java9反应式编程 SubmissionPublisher实现发布订阅

2023-10-28 14:20

本文主要是介绍java9反应式编程 SubmissionPublisher实现发布订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 准备工作

JAVA9 中新增了Ract Stream API 支持反应编程,今天就来自己玩一下JAVA的原生反应API。
首先需要下载jdk13 , 编译版本需要是12,因为spring5 不支持12以上的版本编译。如图所示,另外IEDA中modules 的编译也要改成12,还有setting中的也要同步改成12 。

 IEDA的编译配置

二 反应模型简介 && 代码示例

任务订阅的过程
 订阅的过程
1- new 一个SubmissionPublisher
2- 新建订阅者 Subscriberss ,编写订阅者代码
3- 提交任务

Subscription 的作用
只有 发布者 和 订阅者 是不行的,就好比订阅报纸,是谁把报纸给你送到家的呢,这里就是Subscription
,他发起请求,请求从发布者那里获取几条消息。几个item。如果发布者发布了10条,但是请求只要3条,最终也只会消费3条。 demo代码如下

  public static void main(String[] args) throws InterruptedException {// 错误处理器,可以不写BiConsumer<Subscriberss, Exception> handler = (a,b) -> {System.out.println(Throwables.getStackTraceAsString(b));a.onError(b);};//线程池也不用,默认自带FJ 的线程池ExecutorService executorService = Executors.newFixedThreadPool(20);//可以直接new 不用构造方法。2 是buffer数组的容量大小,最大32,此处抛出一个问题,如果32个item都占满了,会怎么样?SubmissionPublisher<String> sp  = new SubmissionPublisher(executorService,2,handler);//添加一个订阅者(消费者),这个方法里会初始化BufferedSubscription 也是实际上消费者消费的类sp.subscribe(new Subscriberss());//发送4笔数据sp.submit("1");sp.submit("2");sp.submit("3");sp.submit("4");//关闭stream sp.close();//此处代码无关while (true){Thread.sleep(100000);}}//实现一个消费者static class Subscriberss<String> implements Flow.Subscriber{@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");//拉取10个元素(消息)subscription.request(10);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());}@Overridepublic void onError(Throwable throwable) {System.out.println("error");}@Overridepublic void onComplete() {System.out.println("牛逼");}}

执行结果如下
结果

代码跟踪

onSubscribe方法的描述 ,翻译成人话就是 onSubscribe在订阅者执行任何方法之前执行。所以首先打印
“订阅消息” 。

  Method invoked prior to invoking any other Subscriber methods for the given Subscription. If this method throws  an exception, resulting behavior is not guaranteed, but may  cause the Subscription not to be established or to be cancelled.

由于是异步的,不能跟着main线程,需要跟着订阅者的异步线程,BufferedSubscription默认使用的是FJ线程池,当我们提交完成后submit 方法结束后,任务就给到了 FJ线程池 或者 自己的线程池 里执行。
FORKJOIN 线程池
从BufferedSubscription 的 consume方法我们可以看到依次从 subscribeOnOpen(onSubscribe), takeItems (onNext),closeOnComplete(onComplete) 逐步调用。在等待执行的数组里,看到了我们的消息, array就是存放item的地方。

消息
注:这里不是demo里的 1,2,3,4, 这只之前的代码的跟踪

重点看一下:takeItems方法

//s 是订阅者,d是需求的数量,h是 头数组的下标final int takeItems(Subscriber<? super T> s, long d, int h) {Object[] a;int k = 0, cap;if ((a = array) != null && (cap = a.length) > 0) {//  这里提交任务的数量/8 +1 不清楚int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)  // 如果需求数量 <  这里提交任务的数量/8 +1  就用需求数量否则就用 /8的结果int n = (d < (long)b) ? (int)d : b;//从计算的下标开始逐个执行onNext 方法 for (; k < n; ++h, ++k) { Object x = QA.getAndSet(a, h & m, null);if (waiting != 0)signalWaiter();  //线程uppark if (x == null)break;else if (!consumeNext(s, x)) // 执行onNext方法break;}}return k;}

如果request请求了3次,但是submit了4次 ,最终会消费几次? demand 也就是我们的需求次数,takeitem里是取小的那个数就是3次,为什么 除以8 就不知道 了。。

如何实现 “反应”

如果是数组把item都存储下来逐个调用,请求次数就不能写死,所以参考HTTP里的写法改造无论发多少次都可以消费完。

			static class Subscriberss<String> implements Flow.Subscriber{//保存订阅者的引用private Flow.Subscription ss;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");ss=subscription;subscription.request(1);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());//消费完成后再次请求ss.request(1);}}

接下来我们来研究java9 是如何用反应式API 改造HTTP模块的

这篇关于java9反应式编程 SubmissionPublisher实现发布订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293802

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了