无锁队列Disruptor使用笔记

2023-10-20 12:28

本文主要是介绍无锁队列Disruptor使用笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 一. 背景

        Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。

二.  生产与消费模式

Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。

实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。

在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。

// file: work_handler.h/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
#define DISRUPTOR4CPP_WORK_HANDLER_H_#include <cstdint>
#include <exception>namespace disruptor4cpp
{template<class TEvent>class work_handler{public:virtual ~work_handler(){};virtual void on_start() = 0;virtual void on_shutdown() = 0;virtual void on_event(TEvent& event, int64_t sequence) = 0;virtual void on_timeout(int64_t sequence) = 0;virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;virtual void on_start_exception(const std::exception& ex) = 0;virtual void on_shutdown_exception(const std::exception& ex) = 0;};
}#endif
//file: race_work_processor.h
/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
#define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>#include "work_handler.h"
#include "exceptions/alert_exception.h"
#include "exceptions/timeout_exception.h"
#include "sequence.h"namespace disruptor4cpp
{template <typename TRingBuffer>class race_work_processor{public:race_work_processor(TRingBuffer& ring_buffer,typename TRingBuffer::sequence_barrier_type& sequence_barrier,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(sequence_barrier),work_handler_(wkr_handler),work_sequence_(worksequence),running_(false){}race_work_processor(TRingBuffer& ring_buffer,std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(*sequence_barrier_ptr),work_handler_(wkr_handler),sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),work_sequence_(worksequence),running_(false){}typename TRingBuffer::sequence_type& get_sequence(){return sequence_;}void halt(){running_.store(false, std::memory_order_release);sequence_barrier_.alert();}bool is_running() const{return running_.load(std::memory_order_acquire);}void run(){bool expected_running_state = false;if (!running_.compare_exchange_strong(expected_running_state, true))throw std::runtime_error("Thread is already running");sequence_barrier_.clear_alert();notify_start();typename TRingBuffer::event_type* event=nullptr;int64_t next_sequence;int64_t cached_available_sequence = LLONG_MIN;bool processedSequence = true;try{while (true){try{if (processedSequence){processedSequence = false;do{ next_sequence = work_sequence_.get() + 1;sequence_.set(next_sequence-1);} while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));}if (cached_available_sequence >= next_sequence){event = &ring_buffer_[next_sequence];work_handler_.on_event(*event, next_sequence);processedSequence = true;}else{cached_available_sequence = sequence_barrier_.wait_for(next_sequence);}}catch (timeout_exception& timeout_ex){notify_timeout(sequence_.get());}catch (alert_exception& alert_ex){if (!running_.load(std::memory_order_acquire))break;}catch (std::exception& ex){work_handler_.on_event_exception(ex, next_sequence, event);processedSequence = true;}}}catch (...){notify_shutdown();running_.store(false, std::memory_order_release);throw;}notify_shutdown();running_.store(false, std::memory_order_release);}private:void notify_timeout(int64_t available_sequence){try{work_handler_.on_timeout(available_sequence);}catch (std::exception& ex){work_handler_.on_event_exception(ex, available_sequence, nullptr);}}void notify_start(){try{work_handler_.on_start();}catch (std::exception& ex){work_handler_.on_start_exception(ex);}}void notify_shutdown(){try{work_handler_.on_shutdown();}catch (std::exception& ex){work_handler_.on_shutdown_exception(ex);}}typename TRingBuffer::sequence_type sequence_;TRingBuffer& ring_buffer_;typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据work_handler<typename TRingBuffer::event_type>& work_handler_;typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;std::atomic<bool> running_;};
}#endif

 三. 测试

在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。在竞争消费模式下,每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可以看到打印顺序是乱的,这也是这种消费模式的重要特性。

//main.cpp#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
//将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include <race_work_processor.h>#include <disruptor4cpp/disruptor4cpp.h>class int_event_handler : public disruptor4cpp::event_handler<int>
{
public:int_event_handler() = default;virtual ~int_event_handler() = default;virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence, bool end_of_batch){std::cout << "Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
};class int_work_handler :public disruptor4cpp::work_handler<int>
{
public:int_work_handler() = default;virtual ~int_work_handler() = default;void SetWorkerId(int id){ worker_id = id; }virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence){std::cout << worker_id << " Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
private:int worker_id = -1;
};void TestEventProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();int_event_handler handler;//基于队列的写屏障,创建一个消费者batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);//将消费者的读屏障加入到监听序列,以便控制生产者写入时,避免覆盖未消费的数据ring_buffer.add_gating_sequences({ &processor.get_sequence(),});//启动消费者线程,开始消费std::thread processor_thread([&processor] { processor.run(); });// 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));processor.halt();processor_thread.join();return;
}void TestWorkProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();sequence worksequence;int_work_handler wkr1;wkr1.SetWorkerId(1);race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);int_work_handler wkr2;wkr2.SetWorkerId(2);race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });std::thread wkr1_thread([&work1] { work1.run(); });std::thread wkr2_thread([&work2] { work2.run(); });// 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));work1.halt();work2.halt();wkr1_thread.join();wkr2_thread.join();return;
}int main(int argc, char* argv[])
{//TestWorkProcessor();TestEventProcessor();return 0;
}

 

这篇关于无锁队列Disruptor使用笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/247079

相关文章

在Android中使用WebView在线查看PDF文件的方法示例

《在Android中使用WebView在线查看PDF文件的方法示例》在Android应用开发中,有时我们需要在客户端展示PDF文件,以便用户可以阅读或交互,:本文主要介绍在Android中使用We... 目录简介:1. WebView组件介绍2. 在androidManifest.XML中添加Interne

Java Stream流与使用操作指南

《JavaStream流与使用操作指南》Stream不是数据结构,而是一种高级的数据处理工具,允许你以声明式的方式处理数据集合,类似于SQL语句操作数据库,本文给大家介绍JavaStream流与使用... 目录一、什么是stream流二、创建stream流1.单列集合创建stream流2.双列集合创建str

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1