无锁队列Disruptor使用笔记

2023-10-20 12:28

本文主要是介绍无锁队列Disruptor使用笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 一. 背景

        Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。

二.  生产与消费模式

Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。

实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。

在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。

// file: work_handler.h/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
#define DISRUPTOR4CPP_WORK_HANDLER_H_#include <cstdint>
#include <exception>namespace disruptor4cpp
{template<class TEvent>class work_handler{public:virtual ~work_handler(){};virtual void on_start() = 0;virtual void on_shutdown() = 0;virtual void on_event(TEvent& event, int64_t sequence) = 0;virtual void on_timeout(int64_t sequence) = 0;virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;virtual void on_start_exception(const std::exception& ex) = 0;virtual void on_shutdown_exception(const std::exception& ex) = 0;};
}#endif
//file: race_work_processor.h
/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
#define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>#include "work_handler.h"
#include "exceptions/alert_exception.h"
#include "exceptions/timeout_exception.h"
#include "sequence.h"namespace disruptor4cpp
{template <typename TRingBuffer>class race_work_processor{public:race_work_processor(TRingBuffer& ring_buffer,typename TRingBuffer::sequence_barrier_type& sequence_barrier,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(sequence_barrier),work_handler_(wkr_handler),work_sequence_(worksequence),running_(false){}race_work_processor(TRingBuffer& ring_buffer,std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(*sequence_barrier_ptr),work_handler_(wkr_handler),sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),work_sequence_(worksequence),running_(false){}typename TRingBuffer::sequence_type& get_sequence(){return sequence_;}void halt(){running_.store(false, std::memory_order_release);sequence_barrier_.alert();}bool is_running() const{return running_.load(std::memory_order_acquire);}void run(){bool expected_running_state = false;if (!running_.compare_exchange_strong(expected_running_state, true))throw std::runtime_error("Thread is already running");sequence_barrier_.clear_alert();notify_start();typename TRingBuffer::event_type* event=nullptr;int64_t next_sequence;int64_t cached_available_sequence = LLONG_MIN;bool processedSequence = true;try{while (true){try{if (processedSequence){processedSequence = false;do{ next_sequence = work_sequence_.get() + 1;sequence_.set(next_sequence-1);} while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));}if (cached_available_sequence >= next_sequence){event = &ring_buffer_[next_sequence];work_handler_.on_event(*event, next_sequence);processedSequence = true;}else{cached_available_sequence = sequence_barrier_.wait_for(next_sequence);}}catch (timeout_exception& timeout_ex){notify_timeout(sequence_.get());}catch (alert_exception& alert_ex){if (!running_.load(std::memory_order_acquire))break;}catch (std::exception& ex){work_handler_.on_event_exception(ex, next_sequence, event);processedSequence = true;}}}catch (...){notify_shutdown();running_.store(false, std::memory_order_release);throw;}notify_shutdown();running_.store(false, std::memory_order_release);}private:void notify_timeout(int64_t available_sequence){try{work_handler_.on_timeout(available_sequence);}catch (std::exception& ex){work_handler_.on_event_exception(ex, available_sequence, nullptr);}}void notify_start(){try{work_handler_.on_start();}catch (std::exception& ex){work_handler_.on_start_exception(ex);}}void notify_shutdown(){try{work_handler_.on_shutdown();}catch (std::exception& ex){work_handler_.on_shutdown_exception(ex);}}typename TRingBuffer::sequence_type sequence_;TRingBuffer& ring_buffer_;typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据work_handler<typename TRingBuffer::event_type>& work_handler_;typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;std::atomic<bool> running_;};
}#endif

 三. 测试

在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。在竞争消费模式下,每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可以看到打印顺序是乱的,这也是这种消费模式的重要特性。

//main.cpp#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
//将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include <race_work_processor.h>#include <disruptor4cpp/disruptor4cpp.h>class int_event_handler : public disruptor4cpp::event_handler<int>
{
public:int_event_handler() = default;virtual ~int_event_handler() = default;virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence, bool end_of_batch){std::cout << "Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
};class int_work_handler :public disruptor4cpp::work_handler<int>
{
public:int_work_handler() = default;virtual ~int_work_handler() = default;void SetWorkerId(int id){ worker_id = id; }virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence){std::cout << worker_id << " Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
private:int worker_id = -1;
};void TestEventProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();int_event_handler handler;//基于队列的写屏障,创建一个消费者batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);//将消费者的读屏障加入到监听序列,以便控制生产者写入时,避免覆盖未消费的数据ring_buffer.add_gating_sequences({ &processor.get_sequence(),});//启动消费者线程,开始消费std::thread processor_thread([&processor] { processor.run(); });// 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));processor.halt();processor_thread.join();return;
}void TestWorkProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();sequence worksequence;int_work_handler wkr1;wkr1.SetWorkerId(1);race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);int_work_handler wkr2;wkr2.SetWorkerId(2);race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });std::thread wkr1_thread([&work1] { work1.run(); });std::thread wkr2_thread([&work2] { work2.run(); });// 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));work1.halt();work2.halt();wkr1_thread.join();wkr2_thread.join();return;
}int main(int argc, char* argv[])
{//TestWorkProcessor();TestEventProcessor();return 0;
}

 

这篇关于无锁队列Disruptor使用笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/247079

相关文章

Conda与Python venv虚拟环境的区别与使用方法详解

《Conda与Pythonvenv虚拟环境的区别与使用方法详解》随着Python社区的成长,虚拟环境的概念和技术也在不断发展,:本文主要介绍Conda与Pythonvenv虚拟环境的区别与使用... 目录前言一、Conda 与 python venv 的核心区别1. Conda 的特点2. Python v

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

C#中Guid类使用小结

《C#中Guid类使用小结》本文主要介绍了C#中Guid类用于生成和操作128位的唯一标识符,用于数据库主键及分布式系统,支持通过NewGuid、Parse等方法生成,感兴趣的可以了解一下... 目录前言一、什么是 Guid二、生成 Guid1. 使用 Guid.NewGuid() 方法2. 从字符串创建

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四