本文主要是介绍无锁队列Disruptor使用笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一. 背景
Disruptor是由英国外汇公司LMAX 于2010年公开的一款用于线程间数据交互的高并发组件,其原型是一个有界的环形队列,通过巧妙的设计使得该队列在多线程环境下无需加锁就能保证消息的安全可靠,为软件系统带来指数级性能提升,可以参考博文 https://blog.csdn.net/21aspnet/article/details/89709221 了解关于disruptor的更多内容。由于Disruptor原版是java开发的,而本人是从事C/C++开发工作的,期望找一个C/C++版本的实现,用到自己的工作中。为了避免令人头疼的第三方依赖问题,优先考虑使用现代C++特性,即C++ 11以上版本的实现(不想引入Boost库<_>)。Disruptor4cpp(GitHub - alexleemanfui/disruptor4cpp: C++ port of LMAX disruptor)正好满足上述需求,虽然该版本只实现到Disruptor 3.3.2版本,并且好像没有继续维护的迹象,但是3.3.2 版本的Disruptor对于大部分系统来说已经够用了,尤其是它对Disruptor一些关键特性的实现,能够很好地帮助我们理解Disruptor的工作原理。所以,最近对Disruptor4cpp展开了一些研究,下面对学习和使用过程中的一些问题做了记录。
二. 生产与消费模式
Disruptor不对生产者和消费者的数量做限制,可以是单生产者->单消费者,单生产者->多消费者,多生产者->单消费者,以及多生产者->多消费者(这里的每个生产者和消费者都指的是一个线程,而不是某个抽象业务的类实例)。
实际上,Disruptor支持两种消费模式:多播模式 和 竞争模式。多播模式下,每个消费者都会处理所有的消息,即一个消息会被多个消费者重复消费;竞争模式下,多个消费者竞争同一批消息,即一个消息仅被消费一次,分别对应了java版本实现中的BatchEventProcessor和WorkProcessor。
在Disruptor4cpp的实现中,只提供了BatchEventProcessor消费模式,给出的示例程序也是多播消费模式,如果想要使用竞争消费模式,则需要自己实现WorkProcessor。以下给出了WorkProcessor模式的实现,是对Disruptor4cpp的一个补充,包括两个文件: work_handler.h和race_work_processor.h。
// file: work_handler.h/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_WORK_HANDLER_H_
#define DISRUPTOR4CPP_WORK_HANDLER_H_#include <cstdint>
#include <exception>namespace disruptor4cpp
{template<class TEvent>class work_handler{public:virtual ~work_handler(){};virtual void on_start() = 0;virtual void on_shutdown() = 0;virtual void on_event(TEvent& event, int64_t sequence) = 0;virtual void on_timeout(int64_t sequence) = 0;virtual void on_event_exception(const std::exception& ex, int64_t sequence, TEvent* event) = 0;virtual void on_start_exception(const std::exception& ex) = 0;virtual void on_shutdown_exception(const std::exception& ex) = 0;};
}#endif
//file: race_work_processor.h
/*
Copyright (c) 2015, Alex Man-fui Lee
All rights reserved.Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:* Redistributions of source code must retain the above copyright notice, thislist of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice,this list of conditions and the following disclaimer in the documentationand/or other materials provided with the distribution.* Neither the name of disruptor4cpp nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/#ifndef DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_
#define DISRUPTOR4CPP_RACE_WORK_PROCESSOR_H_#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>#include "work_handler.h"
#include "exceptions/alert_exception.h"
#include "exceptions/timeout_exception.h"
#include "sequence.h"namespace disruptor4cpp
{template <typename TRingBuffer>class race_work_processor{public:race_work_processor(TRingBuffer& ring_buffer,typename TRingBuffer::sequence_barrier_type& sequence_barrier,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(sequence_barrier),work_handler_(wkr_handler),work_sequence_(worksequence),running_(false){}race_work_processor(TRingBuffer& ring_buffer,std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr,work_handler<typename TRingBuffer::event_type>& wkr_handler,typename TRingBuffer::sequence_type& worksequence): sequence_(),ring_buffer_(ring_buffer),sequence_barrier_(*sequence_barrier_ptr),work_handler_(wkr_handler),sequence_barrier_ptr_(std::move(sequence_barrier_ptr)),work_sequence_(worksequence),running_(false){}typename TRingBuffer::sequence_type& get_sequence(){return sequence_;}void halt(){running_.store(false, std::memory_order_release);sequence_barrier_.alert();}bool is_running() const{return running_.load(std::memory_order_acquire);}void run(){bool expected_running_state = false;if (!running_.compare_exchange_strong(expected_running_state, true))throw std::runtime_error("Thread is already running");sequence_barrier_.clear_alert();notify_start();typename TRingBuffer::event_type* event=nullptr;int64_t next_sequence;int64_t cached_available_sequence = LLONG_MIN;bool processedSequence = true;try{while (true){try{if (processedSequence){processedSequence = false;do{ next_sequence = work_sequence_.get() + 1;sequence_.set(next_sequence-1);} while (!work_sequence_.compare_and_set(next_sequence - 1, next_sequence));}if (cached_available_sequence >= next_sequence){event = &ring_buffer_[next_sequence];work_handler_.on_event(*event, next_sequence);processedSequence = true;}else{cached_available_sequence = sequence_barrier_.wait_for(next_sequence);}}catch (timeout_exception& timeout_ex){notify_timeout(sequence_.get());}catch (alert_exception& alert_ex){if (!running_.load(std::memory_order_acquire))break;}catch (std::exception& ex){work_handler_.on_event_exception(ex, next_sequence, event);processedSequence = true;}}}catch (...){notify_shutdown();running_.store(false, std::memory_order_release);throw;}notify_shutdown();running_.store(false, std::memory_order_release);}private:void notify_timeout(int64_t available_sequence){try{work_handler_.on_timeout(available_sequence);}catch (std::exception& ex){work_handler_.on_event_exception(ex, available_sequence, nullptr);}}void notify_start(){try{work_handler_.on_start();}catch (std::exception& ex){work_handler_.on_start_exception(ex);}}void notify_shutdown(){try{work_handler_.on_shutdown();}catch (std::exception& ex){work_handler_.on_shutdown_exception(ex);}}typename TRingBuffer::sequence_type sequence_;TRingBuffer& ring_buffer_;typename TRingBuffer::sequence_barrier_type& sequence_barrier_;//引用到ring_buffer的序列屏障,用于控制消费者不要消费没有写成功的数据work_handler<typename TRingBuffer::event_type>& work_handler_;typename TRingBuffer::sequence_type& work_sequence_;//引用到一个由多个消费者共同维护的消费序列,避免重复消费std::unique_ptr<typename TRingBuffer::sequence_barrier_type> sequence_barrier_ptr_;std::atomic<bool> running_;};
}#endif
三. 测试
在Discruptor提供的示例代码的基础上,增加竞争消费示例,代码如下。在竞争消费模式下,每条消息的真正被消费的时机是不确定的,有可能先入队的消息后被消费,因此可以看到打印顺序是乱的,这也是这种消费模式的重要特性。
//main.cpp#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
//将work_handler.h和race_work_processor.h放到disruptor4cpp.h同级目录,并在disruptor4cpp.h中添加 #include <race_work_processor.h>#include <disruptor4cpp/disruptor4cpp.h>class int_event_handler : public disruptor4cpp::event_handler<int>
{
public:int_event_handler() = default;virtual ~int_event_handler() = default;virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence, bool end_of_batch){std::cout << "Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
};class int_work_handler :public disruptor4cpp::work_handler<int>
{
public:int_work_handler() = default;virtual ~int_work_handler() = default;void SetWorkerId(int id){ worker_id = id; }virtual void on_start() { }virtual void on_shutdown() { }virtual void on_event(int& event, int64_t sequence){std::cout << worker_id << " Received integer: " << event << std::endl;}virtual void on_timeout(int64_t sequence) { }virtual void on_event_exception(const std::exception& ex, int64_t sequence, int* event) { }virtual void on_start_exception(const std::exception& ex) { }virtual void on_shutdown_exception(const std::exception& ex) { }
private:int worker_id = -1;
};void TestEventProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();int_event_handler handler;//基于队列的写屏障,创建一个消费者batch_event_processor<decltype(ring_buffer)> processor(ring_buffer, *barrier.get(), handler);//将消费者的读屏障加入到监听序列,以便控制生产者写入时,避免覆盖未消费的数据ring_buffer.add_gating_sequences({ &processor.get_sequence(),});//启动消费者线程,开始消费std::thread processor_thread([&processor] { processor.run(); });// 循环写入1000条数据(也可以在多个线程中向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));processor.halt();processor_thread.join();return;
}void TestWorkProcessor()
{using namespace disruptor4cpp;// 创建一个支持多生产者写入的环形队列ring_bufferring_buffer<int, 1024, busy_spin_wait_strategy, producer_type::multi> ring_buffer;// 创建用于控制写入ring_buffer的序列屏障,因为该队列是最上游的一个队列,没有依赖其他的序列屏障,所以无需传参auto barrier = ring_buffer.new_barrier();sequence worksequence;int_work_handler wkr1;wkr1.SetWorkerId(1);race_work_processor<decltype(ring_buffer)> work1(ring_buffer, *barrier.get(), wkr1, worksequence);int_work_handler wkr2;wkr2.SetWorkerId(2);race_work_processor<decltype(ring_buffer)> work2(ring_buffer, *barrier.get(), wkr2, worksequence);ring_buffer.add_gating_sequences({ &work1.get_sequence(), &work2.get_sequence(), &worksequence });std::thread wkr1_thread([&work1] { work1.run(); });std::thread wkr2_thread([&work2] { work2.run(); });// 循环写入1000条数据(也可以在多个线程中,向ring_buffer中写数据)for (int i = 0; i < 1000; i++){int64_t seq = ring_buffer.next();ring_buffer[seq] = i;ring_buffer.publish(seq);}// Stop the consumer.std::this_thread::sleep_for(std::chrono::seconds(1));work1.halt();work2.halt();wkr1_thread.join();wkr2_thread.join();return;
}int main(int argc, char* argv[])
{//TestWorkProcessor();TestEventProcessor();return 0;
}
这篇关于无锁队列Disruptor使用笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!