用Disruptor框架实现生产者-消费者模式

2023-12-26 01:28

本文主要是介绍用Disruptor框架实现生产者-消费者模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ConcurrentLinkedQueue队列的秘诀就在于大量使用了无锁CAS操作。
现成的Disruptor框架实现CAS进行编程。
无锁的缓存框架:Disruptor
它使用无锁的方式实现了一个环形队列,非常适合实现生产者-消费者模式,
比如事件和消息的发布。如果队列是环形的,则只需要对外提供一个当前位置cursor,
利用这个指针即可用进入队操作,也可用进行出队操作。
由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。
为了能够快速从一个序列对于到数组的实际位置,每次有元素入队,序列就加1.
Disruptor框架要求我们必须将数组的大小设置为2的整数次方。这样通过sequence&(queueSize-1)
就能立即定位到实际的元素位置index,这比取余%操作快得多。
如果搭建不理解上面的sequence&(queueSize-1),那么我在这里再简单说明一下。
如果queueSize是2的整数次幂。则这个数字的二进制表示必然是10、100、1000、10000等形式。

果一个数字是2的整数次幂,其二进制表示的确是形如10、100、1000、10000等的形式。
这是因为2的整数次幂在二进制中只有一个比特位为1,其余都为0。例如:
2^1 = 2,二进制表示为10
2^2 = 4,二进制表示为100
2^3 = 8,二进制表示为1000
2^4 = 16,二进制表示为10000
这种规律一直持续下去。这样的性质在计算机科学和计算机工程中经常会被利用,
特别是在处理队列(Queue)等数据结构的大小时。
Disruptor框架


生产者需要一个 RingBuffer的引用,也就是环形缓冲区。
它有一个重要的方法pushData()将产生的数据推入缓冲区。
方法pushData()接收一个ByteBuffer对象。在ByteBuffer对象中
可用用来包装任何数据类型。这里用来存储long整数,
pushData()方法的功能就是将传入的ByteBuffer对象中的数据提取出来,
并转载到环形缓冲区中。
只有发布后的数据才会真正被消费者看见。

public class Consumer implements WorkHandler<PCData> {@Overridepublic void onEvent(PCData event) throws Exception{System.out.println(Thread.currentThread().getId()+":Event: --"+ event.get() * event.get() +"--");}}
public class PCData {private long value;public void set(long value){this.value = value;}public long get(){return value;}}
public class PCDataFactory implements EventFactory<PCData> {public PCData newInstance(){return new PCData();}
}
public class Producer {private final RingBuffer<PCData> ringBuffer;public Producer(RingBuffer<PCData> ringBuffer){this.ringBuffer = ringBuffer;}public void pushData(ByteBuffer bb){long sequence = ringBuffer.next();try{PCData event = ringBuffer.get(sequence);event.set(bb.getLong(0));}finally {ringBuffer.publish(sequence);}}}
public static void main(String[] args) {Executor executors =  Executors.newCachedThreadPool();PCDataFactory factory = new PCDataFactory();int bufferSize = 1024;Disruptor<PCData> disruptor = new Disruptor<PCData>((EventFactory<PCData>) factory,bufferSize,(Executor) executors,ProducerType.MULTI,new BlockingWaitStrategy());disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer(),new Consumer(),new Consumer());disruptor.start();RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();Producer producer = new Producer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for(long l = 0;true;l++){bb.putLong(0,l);producer.pushData(bb);try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("add data "+ l);}}

main 函数是一个无限循环,因为它包含了一个 for 循环,其中的条件是 true。因此,这个 main 函数会一直执行下去,直到程序被手动中断或出现异常导致程序终止。

在循环中,你不断地向 RingBuffer 中发布数据,由 Producer 类的 pushData 方法完成。同时,RingBuffer 中的数据会被多个消费者(Consumer 类)并发地处理。

你在循环中使用了 Thread.sleep(100),这会导致每次循环执行后线程暂停 100 毫秒。因此,每次数据被发布后,你会看到 "add data" 的输出,并且在消费者的 onEvent 方法中会输出相应的信息。

由于这是一个生产者-消费者模型,生产者生产数据,消费者处理数据,程序在生产和消费之间持续运行。

如果你希望在某个条件下结束程序,你需要在循环中添加相应的退出条件,并在满足条件时使用 break 语句跳出循环。否则,这个程序将一直执行下去。

这篇关于用Disruptor框架实现生产者-消费者模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/537655

相关文章

C++使用栈实现括号匹配的代码详解

《C++使用栈实现括号匹配的代码详解》在编程中,括号匹配是一个常见问题,尤其是在处理数学表达式、编译器解析等任务时,栈是一种非常适合处理此类问题的数据结构,能够精确地管理括号的匹配问题,本文将通过C+... 目录引言问题描述代码讲解代码解析栈的状态表示测试总结引言在编程中,括号匹配是一个常见问题,尤其是在

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import