Spring响应式编程之Reactor操作符

2024-06-23 23:52

本文主要是介绍Spring响应式编程之Reactor操作符,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

操作符

  • 操作符Processo<T,R>
    • (1)创建操作符
    • (2)转换操作符
    • (3)组合操作符
    • (4)条件操作符
    • (5)错误处理操作符

操作符Processo<T,R>

操作符并不是响应式流规范的一部分,但为了改进响应式代码的可读性并降低开发成本,Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了最大的附加值。下面介绍一些常用的操作符。

(1)创建操作符

  • just:创建一个包含单个元素的Mono或多个元素的Flux;
  • empty:创建一个空的Flux或Mono;
  • defer:在订阅时动态创建一个新的Flux或Mono;
  • fromArray:从数组创建Flux;
  • fromIterable:从Iterable对象创建Flux;
  • range:创建一个从start到end的整数序列Flux;
  • interval:创建一个按时间间隔发布数据的Flux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class CreationExample {public static void main(String[] args) {// 示例 1: 使用 Mono 创建操作符Mono<String> monoJust = Mono.just("Hello, Mono");Mono<String> monoEmpty = Mono.empty();Mono<String> monoDefer = Mono.defer(() -> Mono.just("Deferred Mono"));// 订阅 Mono 并打印结果monoJust.subscribe(System.out::println);monoEmpty.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed"));monoDefer.subscribe(System.out::println);// 示例 2: 使用 Flux 创建操作符Flux<String> fluxJust = Flux.just("A", "B", "C");Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"});List<String> list = Arrays.asList("A", "B", "C");Flux<String> fluxFromIterable = Flux.fromIterable(list);Flux<String> fluxFromStream = Flux.fromStream(Stream.of("A", "B", "C"));Flux<Integer> fluxRange = Flux.range(1, 5);Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));Flux<String> fluxDefer = Flux.defer(() -> Flux.just("Deferred Flux"));// 订阅 Flux 并打印结果fluxJust.subscribe(System.out::println);fluxFromArray.subscribe(System.out::println);fluxFromIterable.subscribe(System.out::println);fluxFromStream.subscribe(System.out::println);fluxRange.subscribe(System.out::println);fluxInterval.take(5).subscribe(System.out::println);fluxDefer.subscribe(System.out::println);}
}

(2)转换操作符

  • map:将Mono中的值或Flux中的每个元素转换为另一种类型;

  • flatmap:将Mono中的值或Flux中的每个元素转换为另一个Mono或另一个Publisher,并展平结果;

  • flatMapSequential:类似于flatMap,但保持顺序并并行处理;

  • flatMapMany:将Mono中的值转换为Flux;

  • collectList: 将Flux中的所有元素收集到一个List中,返回Mono<List<T>>;

  • collectMap:将Flux中的元素收集到一个Map中,返回Mono<Map<K,V>>;

  • reduce:聚合Flux中的元素,返回Mono;

  • buffer:将Flux中的元素收集到List中,按指定大小进行分组;

  • window:将Flux中的元素分组到Flux中,每组包含指定数量的元素;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.List;public class ConversionExample {public static void main(String[] args) {// 示例 1: 使用 Mono 转换操作符Mono<Integer> mono = Mono.just("123").map(Integer::parseInt).flatMap(i -> Mono.just(i * 2)).doOnNext(System.out::println);mono.subscribe();// 示例 2: 使用 Flux 转换操作符Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5").map(Integer::parseInt).filter(i -> i % 2 == 0).flatMap(i -> Flux.just(i * 2)).concatMap(i -> Flux.just(i + 1)).buffer(2).doOnNext(System.out::println);flux.subscribe();}
}

(3)组合操作符

  • zipWith:将两个Mono的值组合成一个新的Mono;
  • zip:将多个Flux的元素组合成一个Flux;
  • then:在当前Mono或Flux完成后执行另一个Mono或Flux;
  • thenReturn:在当前Mono或Flux完成后返回一个指定的值;
  • thenMany:在当前Mono完成后返回一个Flux;
  • when:等待多个Publisher完成
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class CombinationExample {public static void main(String[] args) {// 示例 1: 使用 Mono 组合操作符Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("World");Mono<String> combined = mono1.zipWith(mono2, (a, b) -> a + " " + b);combined.subscribe(System.out::println); // 输出: Hello WorldMono<Void> when = Mono.when(mono1, mono2);when.subscribe(null, Throwable::printStackTrace, () -> System.out.println("Completed")); // 输出: Completed// 示例 2: 使用 Flux 组合操作符Flux<String> flux1 = Flux.just("A", "B", "C");Flux<String> flux2 = Flux.just("1", "2", "3");Flux<String> merged = Flux.merge(flux1, flux2);merged.subscribe(System.out::println); // 输出: A 1 B 2 C 3Flux<String> concatenated = Flux.concat(flux1, flux2);concatenated.subscribe(System.out::println); // 输出: A B C 1 2 3Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);zipped.subscribe(System.out::println); // 输出: A1 B2 C3Flux<String> combinedLatest = Flux.combineLatest(flux1, flux2, (a, b) -> a + b);combinedLatest.subscribe(System.out::println); // 输出: C3Flux<String> started = flux1.startWith("Start");started.subscribe(System.out::println); // 输出: Start A B C}
}

(4)条件操作符

  • hasElement:判断Mono是否包含元素;
  • hasElements:判断Flux是否包含元素;
  • hasElementWith:判断Mono是否包含与给定Predicate匹配的元素;
  • all:判断Flux中的所有元素是否都满足给定的条件;
  • any:判断Flux中是否有任意一个元素满足给定的条件;
  • isEmpty:判断Flux是否为空;
  • switchIfEmpty:如果Mono或Flux为空,则切换到另一个Mono或Flux;
  • defaultIfEmpty:如果Mono或Flux为空,则返回默认值;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ConditionalExample {public static void main(String[] args) {// 示例 1: 使用 Mono 条件操作符Mono<String> mono = Mono.just("Hello");Mono<Boolean> hasElement = mono.hasElement();hasElement.subscribe(System.out::println); // 输出: trueMono<String> emptyMono = Mono.<String>empty();Mono<String> switchIfEmptyMono = emptyMono.switchIfEmpty(Mono.just("Default"));switchIfEmptyMono.subscribe(System.out::println); // 输出: DefaultMono<String> defaultIfEmptyMono = emptyMono.defaultIfEmpty("Default");defaultIfEmptyMono.subscribe(System.out::println); // 输出: Default// 示例 2: 使用 Flux 条件操作符Flux<Integer> flux = Flux.just(1, 2, 3, 4);Mono<Boolean> allMatch = flux.all(i -> i > 0);allMatch.subscribe(System.out::println); // 输出: trueMono<Boolean> anyMatch = flux.any(i -> i > 3);anyMatch.subscribe(System.out::println); // 输出: trueMono<Boolean> hasElements = flux.hasElements();hasElements.subscribe(System.out::println); // 输出: trueMono<Boolean> isEmpty = flux.isEmpty();isEmpty.subscribe(System.out::println); // 输出: falseFlux<Integer> emptyFlux = Flux.<Integer>empty();Flux<Integer> switchIfEmptyFlux = emptyFlux.switchIfEmpty(Flux.just(10, 20, 30));switchIfEmptyFlux.subscribe(System.out::println); // 输出: 10 20 30Flux<Integer> defaultIfEmptyFlux = emptyFlux.defaultIfEmpty(999);defaultIfEmptyFlux.subscribe(System.out::println); // 输出: 999}
}

(5)错误处理操作符

  • onErrorResume:当发生错误时,切换到另一个数据流;
  • onErrorReturn:当发生错误时,返回一个默认值;
  • onErrorMap:将错误映射为另一个错误;
  • retry重试操作一定次数;
  • retryWhen:当错误发生时,根据提供的Publisher逻辑重试;
  • doOnError:当发生错误时执行一些额外的逻辑;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ErrorHandlingExample {public static void main(String[] args) {// 示例 1: 使用 Mono 错误处理操作符Mono<String> mono1 = Mono.error(new RuntimeException("Error")).onErrorResume(e -> Mono.just("Recovered"));mono1.subscribe(System.out::println); // 输出: RecoveredMono<String> mono2 = Mono.error(new RuntimeException("Error")).onErrorReturn("Default");mono2.subscribe(System.out::println); // 输出: DefaultMono<String> mono3 = Mono.error(new RuntimeException("Error")).onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));mono3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped ErrorMono<String> mono4 = Mono.error(new RuntimeException("Error")).retry(3);mono4.subscribe(System.out::println, Throwable::printStackTrace);Mono<String> mono5 = Mono.error(new RuntimeException("Error")).retryWhen(companion -> companion.take(3));mono5.subscribe(System.out::println, Throwable::printStackTrace);Mono<String> mono6 = Mono.error(new RuntimeException("Error")).doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));mono6.subscribe(System.out::println, Throwable::printStackTrace);// 示例 2: 使用 Flux 错误处理操作符Flux<String> flux1 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorResume(e -> Flux.just("Recovered"));flux1.subscribe(System.out::println); // 输出: A B RecoveredFlux<String> flux2 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorReturn("Default");flux2.subscribe(System.out::println); // 输出: A B DefaultFlux<String> flux3 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));flux3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped ErrorFlux<String> flux4 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).retry(3);flux4.subscribe(System.out::println, Throwable::printStackTrace);Flux<String> flux5 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).retryWhen(companion -> companion.take(3));flux5.subscribe(System.out::println, Throwable::printStackTrace);Flux<String> flux6 = Flux.just("A", "B").concatWith(Mono.error(new RuntimeException("Error"))).doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));flux6.subscribe(System.out::println, Throwable::printStackTrace);}
}

这篇关于Spring响应式编程之Reactor操作符的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088636

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor