java的反应式流

2023-11-08 16:30
文章标签 java 反应式

本文主要是介绍java的反应式流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;import java.util.concurrent.Flow.*;public class PublisherSubscriberDemo {public static void main(String[] args) {SimplePublisher<String> publisher = new SimplePublisher<>();SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();publisher.subscribe(subscriber1);publisher.subscribe(subscriber2);publisher.submit("hello");publisher.submit("world");publisher.close();}
}class SimplePublisher<T> implements Publisher<T> {private Subscription subscription;@Overridepublic void subscribe(Subscriber<? super T> subscriber) {subscriber.onSubscribe(new Subscription() {@Overridepublic void request(long n) {}@Overridepublic void cancel() {// nothing to do}});this.subscription = new Subscription() {private boolean cancelled = false;@Overridepublic void request(long n) {// nothing to do}@Overridepublic void cancel() {this.cancelled = true;}public boolean isCancelled() {return this.cancelled;}};subscriber.onSubscribe(this.subscription);}public void submit(T item) {subscriptionLimitedQueue.offer(item);subscription.request(1);}public void close() {while (!subscriptionLimitedQueue.isEmpty()) {subscriptionLimitedQueue.poll();}subscription.cancel();}private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);static class SubscriptionLimitedQueue<T> {private final int limit;private int size = 0;private Node<T> head;private Node<T> tail;public SubscriptionLimitedQueue(int limit) {this.limit = limit;}private static class Node<T> {final T item;Node<T> next;Node(T item, Node<T> next) {this.item = item;this.next = next;}}public void offer(T item) {Node<T> node = new Node<>(item, null);if (head == null) {head = node;tail = head;} else {tail.next = node;tail = tail.next;}size++;if (size > limit) {Node<T> newHead = head.next;head.next = null;head = newHead;size--;}}public boolean isEmpty() {return size == 0;}public T poll() {if (isEmpty()) {return null;}T item = head.item;Node<T> newHead = head.next;head.next = null;head = newHead;size--;return item;}}
}class SimpleSubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;System.out.println("订阅成功");subscription.request(1);}@Overridepublic void onNext(T item) {System.out.println("Received item: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher<T>:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber<T>:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher<T>:继承自Publisher<T>接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.11</version></dependency>

第二步,编写代码:

package com.example.jdk9.react;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactiveStreamExample {public static void main(String[] args) {Flux<Integer> stream = Flux.range(1, 10);stream.map(i -> i * 2).filter(i -> i % 3 == 0).flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3))).subscribe(System.out::println);}
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;import reactor.core.publisher.Flux;public class PublisherSubscriberExample {public static void main(String[] args) {// 创建发布者Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);// 订阅者1:打印每个元素publisher.subscribe(System.out::println);// 订阅者2:计算元素的总和并打印publisher.reduce(0, Integer::sum).subscribe(total -> System.out.println("Sum = " + total));}
}

这篇关于java的反应式流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/371081

相关文章

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三

在Spring Boot中浅尝内存泄漏的实战记录

《在SpringBoot中浅尝内存泄漏的实战记录》本文给大家分享在SpringBoot中浅尝内存泄漏的实战记录,结合实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录使用静态集合持有对象引用,阻止GC回收关键点:可执行代码:验证:1,运行程序(启动时添加JVM参数限制堆大小):2,访问 htt

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

浅析Java中如何优雅地处理null值

《浅析Java中如何优雅地处理null值》这篇文章主要为大家详细介绍了如何结合Lambda表达式和Optional,让Java更优雅地处理null值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录场景 1:不为 null 则执行场景 2:不为 null 则返回,为 null 则返回特定值或抛出异常场景

SpringMVC获取请求参数的方法

《SpringMVC获取请求参数的方法》:本文主要介绍SpringMVC获取请求参数的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下... 目录1、通过ServletAPI获取2、通过控制器方法的形参获取请求参数3、@RequestParam4、@

SpringBoot应用中出现的Full GC问题的场景与解决

《SpringBoot应用中出现的FullGC问题的场景与解决》这篇文章主要为大家详细介绍了SpringBoot应用中出现的FullGC问题的场景与解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录Full GC的原理与触发条件原理触发条件对Spring Boot应用的影响示例代码优化建议结论F

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

SpringBoot条件注解核心作用与使用场景详解

《SpringBoot条件注解核心作用与使用场景详解》SpringBoot的条件注解为开发者提供了强大的动态配置能力,理解其原理和适用场景是构建灵活、可扩展应用的关键,本文将系统梳理所有常用的条件注... 目录引言一、条件注解的核心机制二、SpringBoot内置条件注解详解1、@ConditionalOn

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Spring LDAP目录服务的使用示例

《SpringLDAP目录服务的使用示例》本文主要介绍了SpringLDAP目录服务的使用示例... 目录引言一、Spring LDAP基础二、LdapTemplate详解三、LDAP对象映射四、基本LDAP操作4.1 查询操作4.2 添加操作4.3 修改操作4.4 删除操作五、认证与授权六、高级特性与最佳