Disruptor系列3:Disruptor样例实战

2024-04-29 09:18
文章标签 实战 系列 样例 disruptor

本文主要是介绍Disruptor系列3:Disruptor样例实战,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

章节回顾:
- Disruptor系列1:初识Disruptor
- Disruptor系列2:Disruptor原理剖析

本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系列2:Disruptor原理剖析。通过本章节,希望让大家对如何使用Disruptor有个初步认识,看看它能够解决哪些情况。

具体而言,它可以解决如下方面:
- 并行计算实现;
- 串行依次执行;
- 菱形方式执行;
- 链式并行计算。
并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

本样例是一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

事件类

public class LongEvent {private Long number;public Long getNumber() {return number;}public void setNumber(Long number) {this.number = number;}
}

事件工厂类

public class LongEventFactory implements EventFactory<LongEvent> {@Overridepublic LongEvent newInstance() {return new LongEvent();}
}

事件转换类

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {@Overridepublic void translateTo(LongEvent event, long sequence, Long arg0) {event.setNumber(arg0);}
}

C1-1消费者类

该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandlerWorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类,当然,也没啥问题。

public class C11EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 10;System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);}
}

C1-2消费者类

该消费者类执行将数值乘以10的操作。

public class C12EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 10;System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);}
}

c2-1消费者类

该消费者类负责将数值+20.

public class C21EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number += 20;System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);}
}

C2-2消费者类

该消费者类负责将数值*20

public class C22EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}@Overridepublic void onEvent(LongEvent event) throws Exception {long number = event.getNumber();number *= 20;System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);}
}

主函数

public class Main {public static void main(String[] args) {int bufferSize = 1024*1024;//环形队列长度,必须是2的N次方EventFactory<LongEvent> eventFactory = new LongEventFactory();/*** 定义Disruptor,基于单生产者,阻塞策略*/Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());/////////////////////////////////////////////////////////////////////XXX(disruptor);//这里是调用各种不同方法的地方./////////////////////////////////////////////////////////////////////RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();/*** 输入10*/ringBuffer.publishEvent(new LongEventTranslator(),10L);ringBuffer.publishEvent(new LongEventTranslator(),100L);}
}

并行计算实现

这里写图片描述
并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

/*** 并行计算实现,c1,c2互相不依赖* <br/>* p --> c11*   --> c21*/public static void parallel(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C21EventHandler());disruptor.start();}

串行计算,依次执行

这里写图片描述

/*** 串行依次执行* <br/>* p --> c11 --> c21* @param disruptor*/public static void serial(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());disruptor.start();}

菱形方式执行

这里写图片描述

/*** 菱形方式执行* <br/>*   --> c11* p          --> c21*   --> c12* @param disruptor*/public static void diamond(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());disruptor.start();}

链式并行计算

这里写图片描述

/*** 链式并行计算* <br/>*   --> c11 --> c12* p*   --> c21 --> c22* @param disruptor*/public static void chain(Disruptor<LongEvent> disruptor){disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());disruptor.start();}

上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
- 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
- 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
- 最后,实例化多个事件消费类。

并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

这里写图片描述

/*** 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例* <br/>* p --> c11*   --> c21*/public static void parallelWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());disruptor.start();}

串行依次执行,同时C11,C21分别有2个实例

这里写图片描述

/*** 串行依次执行,同时C11,C21分别有2个实例* <br/>* p --> c11 --> c21* @param disruptor*/public static void serialWithPool(Disruptor<LongEvent> disruptor){disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());disruptor.start();}

这篇关于Disruptor系列3:Disruptor样例实战的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945690

相关文章

MySQL 多列 IN 查询之语法、性能与实战技巧(最新整理)

《MySQL多列IN查询之语法、性能与实战技巧(最新整理)》本文详解MySQL多列IN查询,对比传统OR写法,强调其简洁高效,适合批量匹配复合键,通过联合索引、分批次优化提升性能,兼容多种数据库... 目录一、基础语法:多列 IN 的两种写法1. 直接值列表2. 子查询二、对比传统 OR 的写法三、性能分析

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

PowerShell中15个提升运维效率关键命令实战指南

《PowerShell中15个提升运维效率关键命令实战指南》作为网络安全专业人员的必备技能,PowerShell在系统管理、日志分析、威胁检测和自动化响应方面展现出强大能力,下面我们就来看看15个提升... 目录一、PowerShell在网络安全中的战略价值二、网络安全关键场景命令实战1. 系统安全基线核查

从原理到实战深入理解Java 断言assert

《从原理到实战深入理解Java断言assert》本文深入解析Java断言机制,涵盖语法、工作原理、启用方式及与异常的区别,推荐用于开发阶段的条件检查与状态验证,并强调生产环境应使用参数验证工具类替代... 目录深入理解 Java 断言(assert):从原理到实战引言:为什么需要断言?一、断言基础1.1 语

Java MQTT实战应用

《JavaMQTT实战应用》本文详解MQTT协议,涵盖其发布/订阅机制、低功耗高效特性、三种服务质量等级(QoS0/1/2),以及客户端、代理、主题的核心概念,最后提供Linux部署教程、Sprin... 目录一、MQTT协议二、MQTT优点三、三种服务质量等级四、客户端、代理、主题1. 客户端(Clien

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实

MySQL中的索引结构和分类实战案例详解

《MySQL中的索引结构和分类实战案例详解》本文详解MySQL索引结构与分类,涵盖B树、B+树、哈希及全文索引,分析其原理与优劣势,并结合实战案例探讨创建、管理及优化技巧,助力提升查询性能,感兴趣的朋... 目录一、索引概述1.1 索引的定义与作用1.2 索引的基本原理二、索引结构详解2.1 B树索引2.2

从入门到精通MySQL 数据库索引(实战案例)

《从入门到精通MySQL数据库索引(实战案例)》索引是数据库的目录,提升查询速度,主要类型包括BTree、Hash、全文、空间索引,需根据场景选择,建议用于高频查询、关联字段、排序等,避免重复率高或... 目录一、索引是什么?能干嘛?核心作用:二、索引的 4 种主要类型(附通俗例子)1. BTree 索引(