java9反应式编程 SubmissionPublisher实现发布订阅

2023-10-28 14:20

本文主要是介绍java9反应式编程 SubmissionPublisher实现发布订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 准备工作

JAVA9 中新增了Ract Stream API 支持反应编程,今天就来自己玩一下JAVA的原生反应API。
首先需要下载jdk13 , 编译版本需要是12,因为spring5 不支持12以上的版本编译。如图所示,另外IEDA中modules 的编译也要改成12,还有setting中的也要同步改成12 。

 IEDA的编译配置

二 反应模型简介 && 代码示例

任务订阅的过程
 订阅的过程
1- new 一个SubmissionPublisher
2- 新建订阅者 Subscriberss ,编写订阅者代码
3- 提交任务

Subscription 的作用
只有 发布者 和 订阅者 是不行的,就好比订阅报纸,是谁把报纸给你送到家的呢,这里就是Subscription
,他发起请求,请求从发布者那里获取几条消息。几个item。如果发布者发布了10条,但是请求只要3条,最终也只会消费3条。 demo代码如下

  public static void main(String[] args) throws InterruptedException {// 错误处理器,可以不写BiConsumer<Subscriberss, Exception> handler = (a,b) -> {System.out.println(Throwables.getStackTraceAsString(b));a.onError(b);};//线程池也不用,默认自带FJ 的线程池ExecutorService executorService = Executors.newFixedThreadPool(20);//可以直接new 不用构造方法。2 是buffer数组的容量大小,最大32,此处抛出一个问题,如果32个item都占满了,会怎么样?SubmissionPublisher<String> sp  = new SubmissionPublisher(executorService,2,handler);//添加一个订阅者(消费者),这个方法里会初始化BufferedSubscription 也是实际上消费者消费的类sp.subscribe(new Subscriberss());//发送4笔数据sp.submit("1");sp.submit("2");sp.submit("3");sp.submit("4");//关闭stream sp.close();//此处代码无关while (true){Thread.sleep(100000);}}//实现一个消费者static class Subscriberss<String> implements Flow.Subscriber{@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");//拉取10个元素(消息)subscription.request(10);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());}@Overridepublic void onError(Throwable throwable) {System.out.println("error");}@Overridepublic void onComplete() {System.out.println("牛逼");}}

执行结果如下
结果

代码跟踪

onSubscribe方法的描述 ,翻译成人话就是 onSubscribe在订阅者执行任何方法之前执行。所以首先打印
“订阅消息” 。

  Method invoked prior to invoking any other Subscriber methods for the given Subscription. If this method throws  an exception, resulting behavior is not guaranteed, but may  cause the Subscription not to be established or to be cancelled.

由于是异步的,不能跟着main线程,需要跟着订阅者的异步线程,BufferedSubscription默认使用的是FJ线程池,当我们提交完成后submit 方法结束后,任务就给到了 FJ线程池 或者 自己的线程池 里执行。
FORKJOIN 线程池
从BufferedSubscription 的 consume方法我们可以看到依次从 subscribeOnOpen(onSubscribe), takeItems (onNext),closeOnComplete(onComplete) 逐步调用。在等待执行的数组里,看到了我们的消息, array就是存放item的地方。

消息
注:这里不是demo里的 1,2,3,4, 这只之前的代码的跟踪

重点看一下:takeItems方法

//s 是订阅者,d是需求的数量,h是 头数组的下标final int takeItems(Subscriber<? super T> s, long d, int h) {Object[] a;int k = 0, cap;if ((a = array) != null && (cap = a.length) > 0) {//  这里提交任务的数量/8 +1 不清楚int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)  // 如果需求数量 <  这里提交任务的数量/8 +1  就用需求数量否则就用 /8的结果int n = (d < (long)b) ? (int)d : b;//从计算的下标开始逐个执行onNext 方法 for (; k < n; ++h, ++k) { Object x = QA.getAndSet(a, h & m, null);if (waiting != 0)signalWaiter();  //线程uppark if (x == null)break;else if (!consumeNext(s, x)) // 执行onNext方法break;}}return k;}

如果request请求了3次,但是submit了4次 ,最终会消费几次? demand 也就是我们的需求次数,takeitem里是取小的那个数就是3次,为什么 除以8 就不知道 了。。

如何实现 “反应”

如果是数组把item都存储下来逐个调用,请求次数就不能写死,所以参考HTTP里的写法改造无论发多少次都可以消费完。

			static class Subscriberss<String> implements Flow.Subscriber{//保存订阅者的引用private Flow.Subscription ss;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");ss=subscription;subscription.request(1);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());//消费完成后再次请求ss.request(1);}}

接下来我们来研究java9 是如何用反应式API 改造HTTP模块的

这篇关于java9反应式编程 SubmissionPublisher实现发布订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293802

相关文章

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand