brpc:WorkStealingQueue

2024-04-12 03:36
文章标签 brpc workstealingqueue

本文主要是介绍brpc:WorkStealingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

代码

#ifndef BTHREAD_WORK_STEALING_QUEUE_H
#define BTHREAD_WORK_STEALING_QUEUE_H#include "butil/macros.h"
#include "butil/atomicops.h"
#include "butil/logging.h"namespace bthread {template <typename T>
class WorkStealingQueue {
public:WorkStealingQueue(): _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}~WorkStealingQueue() {delete [] _buffer;_buffer = NULL;}int init(size_t capacity) {if (_capacity != 0) {LOG(ERROR) << "Already initialized";return -1;}if (capacity == 0) {LOG(ERROR) << "Invalid capacity=" << capacity;return -1;}if (capacity & (capacity - 1)) {LOG(ERROR) << "Invalid capacity=" << capacity<< " which must be power of 2";return -1;}_buffer = new(std::nothrow) T[capacity];if (NULL == _buffer) {return -1;}_capacity = capacity;return 0;}// Push an item into the queue.// Returns true on pushed.// May run in parallel with steal().// Never run in parallel with pop() or another push().bool push(const T& x) {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_acquire);if (b >= t + _capacity) { // Full queue.return false;}_buffer[b & (_capacity - 1)] = x;_bottom.store(b + 1, butil::memory_order_release);return true;}// Pop an item from the queue.// Returns true on popped and the item is written to `val'.// May run in parallel with steal().// Never run in parallel with push() or another pop().bool pop(T* val) {const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);return popped;}// Steal one item from the queue.// Returns true on stolen.// May run in parallel with push() pop() or another steal().bool steal(T* val) {size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;}size_t volatile_size() const {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_relaxed);return (b <= t ? 0 : (b - t));}size_t capacity() const { return _capacity; }private:// Copying a concurrent structure makes no sense.DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue); // 禁用拷贝构造与拷贝赋值butil::atomic<size_t> _bottom;size_t _capacity;T* _buffer;BAIDU_CACHELINE_ALIGNMENT butil::atomic<size_t> _top;
};}  // namespace bthread#endif  // BTHREAD_WORK_STEALING_QUEUE_H

taskflow同款的WorkStealingQueue(但是brpc提供的版本不支持动态扩容),push和pop仅可以在本线程内使用,外部线程通过steal来偷取任务。

因为该队列通过实现了一个循环队列,capacity一定要是2
的次方。

设capacity为C,假如循环队列下标i从0开始,不断增加
C在之前的定义中,必须满足其容量大小为2的倍数,而设置M = C-1
例如:当C = 8时(二进制位1000),M = 7(二进制位0111)
那么,当i = 0时,i & M = 0,此时,i = 0,o会被插入到S[0]
循环往复,直到i = C-1,即i=7,此时,i & M = M,此时,i = 7,o会被插入到S[7]
然后下一次插入,i = 8,i & M = 0,此时,i = 0,o会被插入到S[0],即形成了环形缓冲区
这个计算的目的在于简化对数组下标的处理,确保索引始终在有效范围内循环。

所以,b和t只会增加,不会减小,因为有了上面的逻辑,保证了增加的循环队列逻辑的正确性。

init

int init(size_t capacity) {if (_capacity != 0) {LOG(ERROR) << "Already initialized";return -1;}if (capacity == 0) {LOG(ERROR) << "Invalid capacity=" << capacity;return -1;}if (capacity & (capacity - 1)) {LOG(ERROR) << "Invalid capacity=" << capacity<< " which must be power of 2";return -1;}_buffer = new(std::nothrow) T[capacity];if (NULL == _buffer) {return -1;}_capacity = capacity;return 0;}

init函数非常简单,分配一个容量大小为sizeof(T) * capacity 的空间,并赋值给_buffer,伴随着检查一系列的capacity异常情况。

push

bool push(const T& x) {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_acquire);if (b >= t + _capacity) { // Full queue.return false;}_buffer[b & (_capacity - 1)] = x;_bottom.store(b + 1, butil::memory_order_release);return true;}

push的逻辑也比较简单,每push一个元素,队列向b+1的方向增长。

pop和pop只会在本线程调用,所以不会有竞争。

因为steal可能会修改_top的值,所以需要使用memory_order_acquire内存序。

pop

// Pop an item from the queue.// Returns true on popped and the item is written to `val'.// May run in parallel with steal().// Never run in parallel with push() or another pop().bool pop(T* val) {const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);return popped;}

pop是线程内调用,且每次bthread的调度都会涉及到pop,所以作者设计了一个快速检查,先快速判断当前队列是否为空(不一定准确)

const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}

pop的逻辑是b回退,并判断队列是否为空,如果这次为空,则b恢复原状,并返回false;否则,给val赋值,此时,如果val指向的不是队列中唯一的元素(只有队列元素个数为1时才会和steal出现竞态),则返回true,表示pop成功。

const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}

当队列中只有一个元素的时候,就会和steal出现竞争关系,此时需要判断到底是谁抢到了该元素:

// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);

如果返回true,说明是pop提前执行,t此时也加了1,返回true,表示val的值可用; 如果是steal先pop之前更新了_top, 此时compare_exchange_strong返回false,最终返回false;但_top的值其实是等于 t + 1的(steal在pop之前更新了_top), 所以不管是谁抢到了,_bottom都会更新到newb+1的状态,相当于重设状态,_bottom和_top又再一次相等。

steal

最后讲一下steal, steal会和pop、push产生竞争,也会和其他steal产生竞争,它从队列的另外一个出口steal对象,即通过t++的方式获取对象,这种设计在很大程度上减少了pop和steal之间的竞争关系。

// Steal one item from the queue.// Returns true on stolen.// May run in parallel with push() pop() or another steal().bool steal(T* val) {size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;}

首先,还是有一个处于性能考量,而进行提前判断的一个逻辑,不保证时效性(但是内存序用的挺高的呀。。memory_order_acquire):

		size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}

接下来steal函数会不断从队列中steal对象,直到队列为空,或者steal到对象:

        do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;

这里while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed)); 包含了对t的更新,所以循环中仅需要更新b(细节请查看compare_exchange_strong的用法)。

简单使用

#include <atomic>
#include <iostream>
#include <thread>
#include "bthread/work_stealing_queue.h"
#include "bthread/bthread.h"
#include "butil/fast_rand.h"std::atomic<bool> flag{false};
bthread::WorkStealingQueue<int> queue;
void stealer() {int val;while (true) {if (queue.steal(&val)) {LOG(INFO) << "Stolen value: " << val;}bthread_usleep(1000);if(flag.load()) {break;}}
}
int main() {constexpr int N = 16;// 创建一个容量为N的工作窃取队列if (queue.init(N) != 0) {LOG(ERROR) << "Failed to initialize the work stealing queue.";return 1;}std::thread(stealer).detach();// 随机push 或者 pop 对象int cnt = 0, cur = 0;while (cnt++ < N) {if ( cnt < N && butil::fast_rand() % 2 == 0) {queue.push(++cur);LOG(INFO) << "Push " << cur;} else {int val = 0;if(queue.pop(&val)) {LOG(INFO) << "pop value: " << val;}}bthread_usleep(100);}while(queue.volatile_size() != 0) {int val = 0;if(queue.pop(&val)) {LOG(INFO) << "pop value: " << val;}bthread_usleep(100);}flag.store(true);return 0;
}

这篇关于brpc:WorkStealingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/895990

相关文章

brpc profiler

cpu profiler cpu profiler | bRPC MacOS的额外配置 在MacOS下,gperftools中的perl pprof脚本无法将函数地址转变成函数名,解决办法是: 安装standalone pprof,并把下载的pprof二进制文件路径写入环境变量GOOGLE_PPROF_BINARY_PATH中安装llvm-symbolizer(将函数符号转化为函数

学习brpc:echo服务

Echo同步客户端 server 端 #include <gflags/gflags.h>#include <json2pb/pb_to_json.h>#include <brpc/server.h>#include "butil/endpoint.h"#include "echo.pb.h"// flags,用于配置serverDEFINE_bool(echo_attachment

brpc之ResourcePool

简介 ResourcePool用于管理资源,负责资源的分配以及回收 结构 BlockGroup:资源池中包含多个BlockGroup,最多65536个 Block:一个BlockGroup中包含多个Block,最多(1<<16)个;1个Block中包含BLOCK_NITEM个类型为T的资源,BLOCK_NITEM由类模板ResourcePoolBlockItemNum中的静态成员value

brpc负载均衡load balance和服务发现name servicing

1.SharedLoadBalancer(load_balancer.h):包含LoadBalancer指针_lb,AddServersInBatch 2.LoadBalancerWithNaming:继承SharedLoadBalancer和NamingServiceWatcher 2.1Init函数:SharedLoadBalancer::Init,new一个load balance对象

brpc之InputMessenger

简介 InputMessenger类是客户端处理socket中响应的处理类 类结构 #mermaid-svg-pmitHXejHDdzZmky {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-pmitHXejHDdzZmky .error-icon{fi

brpc: a little source code

之前在https://www.yuque.com/treblez/qksu6c/nqe8ip59cwegl6rk?singleDoc# 《olap/clickhouse-编译器优化与向量化》中我谈过brpc的汇编控制bthread。本文就来看一下brpc作为一个高性能的rpc实现,除了自定义线程栈之外,代码还有什么优秀之处。 因为时间原因,本文不做深入分析,只是解读下几个有意思的模块。 用户态f

【brpc学习实践十三】基于brpc的redis client的实现

brpc支持了redis协议,提供了相关redis访问接口,充分利用了bthread,可以坐到比hiredis更高效。 brpc redis与hiredis的对比 相比使用hiredis(官方client)的优势有: 线程安全。用户不需要为每个线程建立独立的client。支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式。支持多种连接方式。支持超

【brpc学习实践十三】基于brpc的redis client的实现

brpc支持了redis协议,提供了相关redis访问接口,充分利用了bthread,可以坐到比hiredis更高效。 brpc redis与hiredis的对比 相比使用hiredis(官方client)的优势有: 线程安全。用户不需要为每个线程建立独立的client。支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式。支持多种连接方式。支持超

【brpc学习实践】ParallelChannel的使用与并行请求

概览 ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel: 支持同步和异步访问。 发起异步操作后可以立刻删除。 可以取消。 支持超时。 任何brpc::ChannelBas

【brpc学习实践七】dummy server、DynamicPartitionChannel

dummy server 如果你的程序只使用了baidu-rpc的client或根本没有使用baidu-rpc,但你也想使用baidu-rpc的内置服务,只要在程序中启动一个空的server就行了,这种server我们称为dummy server。 Dummy server 可以用于原型设计和开发目的,作为简单的 http 服务器使用,多数场景用不上。 brpc怎么开启dummy server