Disruptor系列3:Disruptor样例实战

2024-04-29 09:18
文章标签 实战 系列 样例 disruptor

本文主要是介绍Disruptor系列3:Disruptor样例实战,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

章节回顾:
- Disruptor系列1:初识Disruptor
- Disruptor系列2:Disruptor原理剖析

本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系列2:Disruptor原理剖析。通过本章节,希望让大家对如何使用Disruptor有个初步认识,看看它能够解决哪些情况。

具体而言,它可以解决如下方面:
- 并行计算实现;
- 串行依次执行;
- 菱形方式执行;
- 链式并行计算。
并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

本样例是一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

事件类

public class LongEvent {private Long number;public Long getNumber() {return number;}public void setNumber(Long number) {this.number = number;}
}

事件工厂类

public class LongEventFactory implements EventFactory<LongEvent> {@Overridepublic LongEvent newInstance() {return new LongEvent();}
}

事件转换类

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {@Overridepublic void translateTo(LongEvent event, long sequence, Long arg0) {event.setNumber(arg0);}
}

C1-1消费者类

该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandlerWorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类,当然,也没啥问题。

public class C11EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}
}

C1-2消费者类

该消费者类执行将数值乘以10的操作。

public class C12EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}
}

c2-1消费者类

该消费者类负责将数值+20.

public class C21EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}
}

C2-2消费者类

该消费者类负责将数值*20

public class C22EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}
}

主函数

public class Main {public static void main(String[] args) {int bufferSize = 1024*1024;//环形队列长度,必须是2的N次方EventFactory<LongEvent> eventFactory = new LongEventFactory();/*** 定义Disruptor,基于单生产者,阻塞策略*/Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());/////////////////////////////////////////////////////////////////////XXX(disruptor);//这里是调用各种不同方法的地方./////////////////////////////////////////////////////////////////////RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();/*** 输入10*/ringBuffer.publishEvent(new LongEventTranslator(),10L);ringBuffer.publishEvent(new LongEventTranslator(),100L);}
}

并行计算实现

这里写图片描述
并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

/*** 并行计算实现,c1,c2互相不依赖* <br/>* p --> c11*   --> c21*/public static void parallel(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C21EventHandler());disruptor.start();}

串行计算,依次执行

这里写图片描述

/*** 串行依次执行* <br/>* p --> c11 --> c21* @param disruptor*/public static void serial(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());disruptor.start();}

菱形方式执行

这里写图片描述

/*** 菱形方式执行* <br/>*   --> c11* p          --> c21*   --> c12* @param disruptor*/public static void diamond(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());disruptor.start();}

链式并行计算

这里写图片描述

/*** 链式并行计算* <br/>*   --> c11 --> c12* p*   --> c21 --> c22* @param disruptor*/public static void chain(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());disruptor.start();}

上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
- 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
- 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
- 最后,实例化多个事件消费类。

并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

这里写图片描述

/*** 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例* <br/>* p --> c11*   --> c21*/public static void parallelWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());disruptor.start();}

串行依次执行,同时C11,C21分别有2个实例

这里写图片描述

/*** 串行依次执行,同时C11,C21分别有2个实例* <br/>* p --> c11 --> c21* @param disruptor*/public static void serialWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());disruptor.start();}

这篇关于Disruptor系列3:Disruptor样例实战的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945690

相关文章

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

滚雪球学Java(87):Java事务处理:JDBC的ACID属性与实战技巧!真有两下子!

咦咦咦,各位小可爱,我是你们的好伙伴——bug菌,今天又来给大家普及Java SE啦,别躲起来啊,听我讲干货还不快点赞,赞多了我就有动力讲得更嗨啦!所以呀,养成先点赞后阅读的好习惯,别被干货淹没了哦~ 🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎大家关注&&收藏!持续更新中,up!up!up!! 环境说明:Windows 10

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0