本文主要是介绍brpc:WorkStealingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
代码
#ifndef BTHREAD_WORK_STEALING_QUEUE_H
#define BTHREAD_WORK_STEALING_QUEUE_H#include "butil/macros.h"
#include "butil/atomicops.h"
#include "butil/logging.h"namespace bthread {template <typename T>
class WorkStealingQueue {
public:WorkStealingQueue(): _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}~WorkStealingQueue() {delete [] _buffer;_buffer = NULL;}int init(size_t capacity) {if (_capacity != 0) {LOG(ERROR) << "Already initialized";return -1;}if (capacity == 0) {LOG(ERROR) << "Invalid capacity=" << capacity;return -1;}if (capacity & (capacity - 1)) {LOG(ERROR) << "Invalid capacity=" << capacity<< " which must be power of 2";return -1;}_buffer = new(std::nothrow) T[capacity];if (NULL == _buffer) {return -1;}_capacity = capacity;return 0;}// Push an item into the queue.// Returns true on pushed.// May run in parallel with steal().// Never run in parallel with pop() or another push().bool push(const T& x) {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_acquire);if (b >= t + _capacity) { // Full queue.return false;}_buffer[b & (_capacity - 1)] = x;_bottom.store(b + 1, butil::memory_order_release);return true;}// Pop an item from the queue.// Returns true on popped and the item is written to `val'.// May run in parallel with steal().// Never run in parallel with push() or another pop().bool pop(T* val) {const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);return popped;}// Steal one item from the queue.// Returns true on stolen.// May run in parallel with push() pop() or another steal().bool steal(T* val) {size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;}size_t volatile_size() const {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_relaxed);return (b <= t ? 0 : (b - t));}size_t capacity() const { return _capacity; }private:// Copying a concurrent structure makes no sense.DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue); // 禁用拷贝构造与拷贝赋值butil::atomic<size_t> _bottom;size_t _capacity;T* _buffer;BAIDU_CACHELINE_ALIGNMENT butil::atomic<size_t> _top;
};} // namespace bthread#endif // BTHREAD_WORK_STEALING_QUEUE_H
taskflow同款的WorkStealingQueue(但是brpc提供的版本不支持动态扩容),push和pop仅可以在本线程内使用,外部线程通过steal来偷取任务。
因为该队列通过实现了一个循环队列,capacity一定要是2
的次方。
设capacity为C,假如循环队列下标i从0开始,不断增加
C在之前的定义中,必须满足其容量大小为2的倍数,而设置M = C-1
例如:当C = 8时(二进制位1000),M = 7(二进制位0111)
那么,当i = 0时,i & M = 0,此时,i = 0,o会被插入到S[0]
循环往复,直到i = C-1,即i=7,此时,i & M = M,此时,i = 7,o会被插入到S[7]
然后下一次插入,i = 8,i & M = 0,此时,i = 0,o会被插入到S[0],即形成了环形缓冲区
这个计算的目的在于简化对数组下标的处理,确保索引始终在有效范围内循环。
所以,b和t只会增加,不会减小,因为有了上面的逻辑,保证了增加的循环队列逻辑的正确性。
init
int init(size_t capacity) {if (_capacity != 0) {LOG(ERROR) << "Already initialized";return -1;}if (capacity == 0) {LOG(ERROR) << "Invalid capacity=" << capacity;return -1;}if (capacity & (capacity - 1)) {LOG(ERROR) << "Invalid capacity=" << capacity<< " which must be power of 2";return -1;}_buffer = new(std::nothrow) T[capacity];if (NULL == _buffer) {return -1;}_capacity = capacity;return 0;}
init函数非常简单,分配一个容量大小为sizeof(T) * capacity 的空间,并赋值给_buffer,伴随着检查一系列的capacity异常情况。
push
bool push(const T& x) {const size_t b = _bottom.load(butil::memory_order_relaxed);const size_t t = _top.load(butil::memory_order_acquire);if (b >= t + _capacity) { // Full queue.return false;}_buffer[b & (_capacity - 1)] = x;_bottom.store(b + 1, butil::memory_order_release);return true;}
push的逻辑也比较简单,每push一个元素,队列向b+1的方向增长。
pop和pop只会在本线程调用,所以不会有竞争。
因为steal可能会修改_top的值,所以需要使用memory_order_acquire内存序。
pop
// Pop an item from the queue.// Returns true on popped and the item is written to `val'.// May run in parallel with steal().// Never run in parallel with push() or another pop().bool pop(T* val) {const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);return popped;}
pop是线程内调用,且每次bthread的调度都会涉及到pop,所以作者设计了一个快速检查,先快速判断当前队列是否为空(不一定准确)
const size_t b = _bottom.load(butil::memory_order_relaxed);size_t t = _top.load(butil::memory_order_relaxed);if (t >= b) {// fast check since we call pop() in each sched.// Stale _top which is smaller should not enter this branch.return false;}
pop的逻辑是b回退,并判断队列是否为空,如果这次为空,则b恢复原状,并返回false;否则,给val赋值,此时,如果val指向的不是队列中唯一的元素(只有队列元素个数为1时才会和steal出现竞态),则返回true,表示pop成功。
const size_t newb = b - 1;_bottom.store(newb, butil::memory_order_relaxed);butil::atomic_thread_fence(butil::memory_order_seq_cst);t = _top.load(butil::memory_order_relaxed);if (t > newb) {_bottom.store(b, butil::memory_order_relaxed);return false;}*val = _buffer[newb & (_capacity - 1)];if (t != newb) {return true;}
当队列中只有一个元素的时候,就会和steal出现竞争关系,此时需要判断到底是谁抢到了该元素:
// Single last element, compete with steal()const bool popped = _top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);_bottom.store(b, butil::memory_order_relaxed);
如果返回true,说明是pop提前执行,t此时也加了1,返回true,表示val的值可用; 如果是steal先pop之前更新了_top, 此时compare_exchange_strong返回false,最终返回false;但_top的值其实是等于 t + 1的(steal在pop之前更新了_top), 所以不管是谁抢到了,_bottom都会更新到newb+1的状态,相当于重设状态,_bottom和_top又再一次相等。
steal
最后讲一下steal, steal会和pop、push产生竞争,也会和其他steal产生竞争,它从队列的另外一个出口steal对象,即通过t++的方式获取对象,这种设计在很大程度上减少了pop和steal之间的竞争关系。
// Steal one item from the queue.// Returns true on stolen.// May run in parallel with push() pop() or another steal().bool steal(T* val) {size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;}
首先,还是有一个处于性能考量,而进行提前判断的一个逻辑,不保证时效性(但是内存序用的挺高的呀。。memory_order_acquire):
size_t t = _top.load(butil::memory_order_acquire);size_t b = _bottom.load(butil::memory_order_acquire);if (t >= b) {// Permit false negative for performance considerations.return false;}
接下来steal函数会不断从队列中steal对象,直到队列为空,或者steal到对象:
do {butil::atomic_thread_fence(butil::memory_order_seq_cst);b = _bottom.load(butil::memory_order_acquire);if (t >= b) {return false;}*val = _buffer[t & (_capacity - 1)];} while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));return true;
这里while (!_top.compare_exchange_strong(t, t + 1,butil::memory_order_seq_cst,butil::memory_order_relaxed));
包含了对t的更新,所以循环中仅需要更新b(细节请查看compare_exchange_strong的用法)。
简单使用
#include <atomic>
#include <iostream>
#include <thread>
#include "bthread/work_stealing_queue.h"
#include "bthread/bthread.h"
#include "butil/fast_rand.h"std::atomic<bool> flag{false};
bthread::WorkStealingQueue<int> queue;
void stealer() {int val;while (true) {if (queue.steal(&val)) {LOG(INFO) << "Stolen value: " << val;}bthread_usleep(1000);if(flag.load()) {break;}}
}
int main() {constexpr int N = 16;// 创建一个容量为N的工作窃取队列if (queue.init(N) != 0) {LOG(ERROR) << "Failed to initialize the work stealing queue.";return 1;}std::thread(stealer).detach();// 随机push 或者 pop 对象int cnt = 0, cur = 0;while (cnt++ < N) {if ( cnt < N && butil::fast_rand() % 2 == 0) {queue.push(++cur);LOG(INFO) << "Push " << cur;} else {int val = 0;if(queue.pop(&val)) {LOG(INFO) << "pop value: " << val;}}bthread_usleep(100);}while(queue.volatile_size() != 0) {int val = 0;if(queue.pop(&val)) {LOG(INFO) << "pop value: " << val;}bthread_usleep(100);}flag.store(true);return 0;
}
这篇关于brpc:WorkStealingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!