无锁队列Disruptor使用笔记

2023-10-20 12:28

本文主要是介绍无锁队列Disruptor使用笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 一. 背景

        Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。

二.  生产与消费模式

Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。

实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。

在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。

// file: work_handler.h/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
#define DISRUPTOR4CPP_WORK_HANDLER_H_#include <cstdint>
#include <exception>namespace disruptor4cpp
{template<class TEvent>class work_handler{public:virtual ~work_handler(){};virtual void on_start() = 0;virtual void on_shutdown() = 0;virtual void on_event(TEvent& event, int64_t sequence) = 0;virtual void on_timeout(int64_t sequence) = 0;virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;virtual void on_start_exception(const std::exception& ex) = 0;virtual void on_shutdown_exception(const std::exception& ex) = 0;};
}#endif
//file: race_work_processor.h
/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
#define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>#include "work_handler.h"
#include "exceptions/alert_exception.h"
#include "exceptions/timeout_exception.h"
#include "sequence.h"namespace disruptor4cpp
{template <typename TRingBuffer>class race_work_processor{public:race_work_processor(TRingBuffer& ring_buffer,typename TRingBuffer::sequence_barrier_type& sequence_barrier,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(sequence_barrier),work_handler_(wkr_handler),work_sequence_(worksequence),running_(false){}race_work_processor(TRingBuffer& ring_buffer,std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(*sequence_barrier_ptr),work_handler_(wkr_handler),sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),work_sequence_(worksequence),running_(false){}typename TRingBuffer::sequence_type& get_sequence(){return sequence_;}void halt(){running_.store(false, std::memory_order_release);sequence_barrier_.alert();}bool is_running() const{return running_.load(std::memory_order_acquire);}void run(){bool expected_running_state = false;if (!running_.compare_exchange_strong(expected_running_state, true))throw std::runtime_error("Thread is already running");sequence_barrier_.clear_alert();notify_start();typename TRingBuffer::event_type* event=nullptr;int64_t next_sequence;int64_t cached_available_sequence = LLONG_MIN;bool processedSequence = true;try{while (true){try{if (processedSequence){processedSequence = false;do{ next_sequence = work_sequence_.get() + 1;sequence_.set(next_sequence-1);} while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));}if (cached_available_sequence >= next_sequence){event = &ring_buffer_[next_sequence];work_handler_.on_event(*event, next_sequence);processedSequence = true;}else{cached_available_sequence = sequence_barrier_.wait_for(next_sequence);}}catch (timeout_exception& timeout_ex){notify_timeout(sequence_.get());}catch (alert_exception& alert_ex){if (!running_.load(std::memory_order_acquire))break;}catch (std::exception& ex){work_handler_.on_event_exception(ex, next_sequence, event);processedSequence = true;}}}catch (...){notify_shutdown();running_.store(false, std::memory_order_release);throw;}notify_shutdown();running_.store(false, std::memory_order_release);}private:void notify_timeout(int64_t available_sequence){try{work_handler_.on_timeout(available_sequence);}catch (std::exception& ex){work_handler_.on_event_exception(ex, available_sequence, nullptr);}}void notify_start(){try{work_handler_.on_start();}catch (std::exception& ex){work_handler_.on_start_exception(ex);}}void notify_shutdown(){try{work_handler_.on_shutdown();}catch (std::exception& ex){work_handler_.on_shutdown_exception(ex);}}typename TRingBuffer::sequence_type sequence_;TRingBuffer& ring_buffer_;typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据work_handler<typename TRingBuffer::event_type>& work_handler_;typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;std::atomic<bool> running_;};
}#endif

 三. 测试

在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。在竞争消费模式下,每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可以看到打印顺序是乱的,这也是这种消费模式的重要特性。

//main.cpp#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
//将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include <race_work_processor.h>#include <disruptor4cpp/disruptor4cpp.h>class int_event_handler : public disruptor4cpp::event_handler<int>
{
public:int_event_handler() = default;virtual ~int_event_handler() = default;virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence, bool end_of_batch){std::cout << "Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
};class int_work_handler :public disruptor4cpp::work_handler<int>
{
public:int_work_handler() = default;virtual ~int_work_handler() = default;void SetWorkerId(int id){ worker_id = id; }virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence){std::cout << worker_id << " Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
private:int worker_id = -1;
};void TestEventProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();int_event_handler handler;//基于队列的写屏障,创建一个消费者batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);//将消费者的读屏障加入到监听序列,以便控制生产者写入时,避免覆盖未消费的数据ring_buffer.add_gating_sequences({ &processor.get_sequence(),});//启动消费者线程,开始消费std::thread processor_thread([&processor] { processor.run(); });// 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));processor.halt();processor_thread.join();return;
}void TestWorkProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();sequence worksequence;int_work_handler wkr1;wkr1.SetWorkerId(1);race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);int_work_handler wkr2;wkr2.SetWorkerId(2);race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });std::thread wkr1_thread([&work1] { work1.run(); });std::thread wkr2_thread([&work2] { work2.run(); });// 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));work1.halt();work2.halt();wkr1_thread.join();wkr2_thread.join();return;
}int main(int argc, char* argv[])
{//TestWorkProcessor();TestEventProcessor();return 0;
}

 

这篇关于无锁队列Disruptor使用笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/247079

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

AssetBundle学习笔记

AssetBundle是unity自定义的资源格式,通过调用引擎的资源打包接口对资源进行打包成.assetbundle格式的资源包。本文介绍了AssetBundle的生成,使用,加载,卸载以及Unity资源更新的一个基本步骤。 目录 1.定义: 2.AssetBundle的生成: 1)设置AssetBundle包的属性——通过编辑器界面 补充:分组策略 2)调用引擎接口API

Vim使用基础篇

本文内容大部分来自 vimtutor,自带的教程的总结。在终端输入vimtutor 即可进入教程。 先总结一下,然后再分别介绍正常模式,插入模式,和可视模式三种模式下的命令。 目录 看完以后的汇总 1.正常模式(Normal模式) 1.移动光标 2.删除 3.【:】输入符 4.撤销 5.替换 6.重复命令【. ; ,】 7.复制粘贴 8.缩进 2.插入模式 INSERT

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

如何免费的去使用connectedpapers?

免费使用connectedpapers 1. 打开谷歌浏览器2. 按住ctrl+shift+N,进入无痕模式3. 不需要登录(也就是访客模式)4. 两次用完,关闭无痕模式(继续重复步骤 2 - 4) 1. 打开谷歌浏览器 2. 按住ctrl+shift+N,进入无痕模式 输入网址:https://www.connectedpapers.com/ 3. 不需要登录(也就是

《offer来了》第二章学习笔记

1.集合 Java四种集合:List、Queue、Set和Map 1.1.List:可重复 有序的Collection ArrayList: 基于数组实现,增删慢,查询快,线程不安全 Vector: 基于数组实现,增删慢,查询快,线程安全 LinkedList: 基于双向链实现,增删快,查询慢,线程不安全 1.2.Queue:队列 ArrayBlockingQueue:

Toolbar+DrawerLayout使用详情结合网络各大神

最近也想搞下toolbar+drawerlayout的使用。结合网络上各大神的杰作,我把大部分的内容效果都完成了遍。现在记录下各个功能效果的实现以及一些细节注意点。 这图弹出两个菜单内容都是仿QQ界面的选项。左边一个是drawerlayout的弹窗。右边是toolbar的popup弹窗。 开始实现步骤详情: 1.创建toolbar布局跟drawerlayout布局 <?xml vers

操作系统实训复习笔记(1)

目录 Linux vi/vim编辑器(简单) (1)vi/vim基本用法。 (2)vi/vim基础操作。 进程基础操作(简单) (1)fork()函数。 写文件系统函数(中等) ​编辑 (1)C语言读取文件。 (2)C语言写入文件。 1、write()函数。  读文件系统函数(简单) (1)read()函数。 作者本人的操作系统实训复习笔记 Linux

C#中,decimal类型使用

在Microsoft SQL Server中numeric类型,在C#中使用的时候,需要用decimal类型与其对应,不能使用int等类型。 SQL:numeric C#:decimal