本文主要是介绍协程(coroutine)应用实例:计时器过期事件响应,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
协程应用实例:计时器过期事件响应
- 序
- 1. 计时调度中心
- 2.基于协程的事件处理
序
早期我曾把弄过War3 的WE编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行,昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。
1. 计时调度中心
计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)
2.基于协程的事件处理
通常,在这种同时处理多个事件的情况下,一般是用 多线程? 执行事件响应函数(回调函数)的。
考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
基于以上几点考虑,我决定用协程执行事件响应处理,同时也方面了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
当然如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程区执行就可以了。
协程返回值类型 promise_type,用于和协程交互
class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};
计时器对象,保存有协程句柄,如果事件存在等待函数则先行挂起返回一个等待时间,在一段时候后再次被调度,否则直接返回0
class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " << std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};
下面是全部代码
#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>using namespace std;#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " << std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};class _XTimeDisp
{
public:using _Cont_ty = ::std::list<class _XTimer*>;static constexpr int _TVN_BITS = 4;static constexpr int _TVR_BITS = 6;static constexpr int _TVN_SIZE = 1 << _TVN_BITS;static constexpr int _TVR_SIZE = 1 << _TVR_BITS;static constexpr int _TVN_MASK = _TVN_SIZE - 1;static constexpr int _TVR_MASK = _TVR_SIZE - 1;public:_XTimeDisp():_tq_mutex(),tvec(),_sign_mutex(),_run_thread(),_sign(),_thread_state(0){}~_XTimeDisp(){}
public:static constexpr int _INDEX(int expires, int n){return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);}static constexpr int _OFFSET(int n){return (_TVR_SIZE + n*_TVN_SIZE);}
public:void add(_XTimer* timer){unsigned long long expires = timer->expires;unsigned long long index = expires - _Check_time;unsigned int _VecIndex = 0;if (index < _TVR_SIZE) // tvec_1{tvec_1[expires & _TVR_MASK].push_back(timer);}else if (index < (1 << (_TVR_BITS + 1 * _TVN_BITS))) // tvec_2{tvec_2[_INDEX(expires, 0)].push_back(timer);}else if (index <= 0) // 异常处理,视为即将到期的计时器{tvec_1[_Check_time & _TVR_MASK].push_back(timer);}else {if (index > 0xFFFFFFFFUL){index = 0xFFFFFFFFUL;expires = index + _Check_time;}tvec_1[_INDEX(expires, 1)].push_back(timer);}}int cascade(int offset, int index){::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);_Cont_ty& list = tvec[offset + index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){this->add(*it);}return index;}void tick(size_t _Now){::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);while (_Check_time <= _Now){int index = _Check_time & _TVR_MASK;if (!index && // tv1!cascade(_OFFSET(0), _INDEX(_Check_time, 0))) // tv2{cascade(_OFFSET(1), _INDEX(_Check_time, 1)); // tv3}++_Check_time;_Cont_ty& list = tvec[index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){ // 如果有句柄说明处于挂起状态,继续执行if ((*it)->handle){auto ret = (*it)->handle.resume();if (ret != 0){(*it)->expires += ret;add(*it);}}else {// 视为第一次执行auto res = (*it)->on_event(_Now);auto ret = res.resume();if (ret != 0){(*it)->expires += ret;(*it)->handle = ::std::move(res);add(*it);}}}}}void run(){while (_thread_state == 1){tick(_Check_time);::std::unique_lock<::std::mutex> _nofity(_sign_mutex);_sign.wait(_nofity);}}void nofity(size_t now){_Check_time = now;_sign.notify_one();}void start(size_t time){if (_thread_state){return;}_Check_time = time;_thread_state = 1;_run_thread = ::std::thread(&_XTimeDisp::run, this);}void stop(){_thread_state = 0;_sign.notify_one();if (_run_thread.joinable()) {_run_thread.join();}}
public:size_t _Check_time;union{class{public:_Cont_ty tvec_1[_TVR_SIZE];_Cont_ty tvec_2[_TVN_SIZE];_Cont_ty tvec_3[_TVN_SIZE];};_Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];};::std::recursive_mutex _tq_mutex;::std::mutex _sign_mutex;::std::condition_variable _sign;::std::thread _run_thread;int _thread_state;
};using timer = _XTimer;
using tdc = _XTimeDisp;void main()
{timer t1, t2,t3,t4;t1.id = 1;t1.expires = 0;t1.task = 300;t2.id = 2;t2.task = 100;t2.expires = 3;t3.id = 3;t3.task = 50;t3.expires = 3;t4.id = 4;t4.task = 30;t4.expires = 88;tdc tm;tm._Check_time = 0;tm.add(&t1);tm.add(&t2);tm.add(&t3);tm.add(&t4);tm.start(0);int tbegin = 0;while (1){::std::this_thread::sleep_for(::std::chrono::milliseconds(10));tm.nofity(tbegin);++tbegin;}system("pause");
}#undef _COT_WAIT
#undef _COT_NORET
运行截图
由于没有采取多线程Sleep阻塞,CPU利用自然是比较高的
这篇关于协程(coroutine)应用实例:计时器过期事件响应的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!