协程(coroutine)应用实例:计时器过期事件响应

2024-06-16 22:08

本文主要是介绍协程(coroutine)应用实例:计时器过期事件响应,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

协程应用实例:计时器过期事件响应

    • 1. 计时调度中心
    • 2.基于协程的事件处理

  早期我曾把弄过War3 的WE编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行,昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。

1. 计时调度中心

  计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
  详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)

2.基于协程的事件处理

  通常,在这种同时处理多个事件的情况下,一般是用 多线程? 执行事件响应函数(回调函数)的。
  考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
  当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
  基于以上几点考虑,我决定用协程执行事件响应处理,同时也方面了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
  当然如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程区执行就可以了。

协程返回值类型 promise_type,用于和协程交互

class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};

计时器对象,保存有协程句柄,如果事件存在等待函数则先行挂起返回一个等待时间,在一段时候后再次被调度,否则直接返回0

class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};

下面是全部代码


#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>using namespace std;#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};class _XTimeDisp
{
public:using _Cont_ty = ::std::list<class _XTimer*>;static constexpr int _TVN_BITS = 4;static constexpr int _TVR_BITS = 6;static constexpr int _TVN_SIZE = 1 << _TVN_BITS;static constexpr int _TVR_SIZE = 1 << _TVR_BITS;static constexpr int _TVN_MASK = _TVN_SIZE - 1;static constexpr int _TVR_MASK = _TVR_SIZE - 1;public:_XTimeDisp():_tq_mutex(),tvec(),_sign_mutex(),_run_thread(),_sign(),_thread_state(0){}~_XTimeDisp(){}
public:static constexpr int _INDEX(int expires, int n){return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);}static constexpr int _OFFSET(int n){return (_TVR_SIZE + n*_TVN_SIZE);}
public:void add(_XTimer* timer){unsigned long long expires = timer->expires;unsigned long long index = expires - _Check_time;unsigned int _VecIndex = 0;if (index < _TVR_SIZE)		// tvec_1{tvec_1[expires & _TVR_MASK].push_back(timer);}else if (index < (1 << (_TVR_BITS + 1 * _TVN_BITS)))	// tvec_2{tvec_2[_INDEX(expires, 0)].push_back(timer);}else if (index <= 0)	// 异常处理,视为即将到期的计时器{tvec_1[_Check_time & _TVR_MASK].push_back(timer);}else {if (index > 0xFFFFFFFFUL){index = 0xFFFFFFFFUL;expires = index + _Check_time;}tvec_1[_INDEX(expires, 1)].push_back(timer);}}int cascade(int offset, int index){::std::unique_lock<::std::recursive_mutex>  _lock(_tq_mutex);_Cont_ty& list = tvec[offset + index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){this->add(*it);}return index;}void tick(size_t _Now){::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);while (_Check_time <= _Now){int index = _Check_time & _TVR_MASK;if (!index &&		// tv1!cascade(_OFFSET(0), _INDEX(_Check_time, 0)))	// tv2{cascade(_OFFSET(1), _INDEX(_Check_time, 1));	// tv3}++_Check_time;_Cont_ty& list = tvec[index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){ // 如果有句柄说明处于挂起状态,继续执行if ((*it)->handle){auto ret = (*it)->handle.resume();if (ret != 0){(*it)->expires += ret;add(*it);}}else {// 视为第一次执行auto res = (*it)->on_event(_Now);auto ret = res.resume();if (ret != 0){(*it)->expires += ret;(*it)->handle = ::std::move(res);add(*it);}}}}}void run(){while (_thread_state == 1){tick(_Check_time);::std::unique_lock<::std::mutex> _nofity(_sign_mutex);_sign.wait(_nofity);}}void nofity(size_t now){_Check_time = now;_sign.notify_one();}void start(size_t time){if (_thread_state){return;}_Check_time = time;_thread_state = 1;_run_thread = ::std::thread(&_XTimeDisp::run, this);}void stop(){_thread_state = 0;_sign.notify_one();if (_run_thread.joinable()) {_run_thread.join();}}
public:size_t _Check_time;union{class{public:_Cont_ty tvec_1[_TVR_SIZE];_Cont_ty tvec_2[_TVN_SIZE];_Cont_ty tvec_3[_TVN_SIZE];};_Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];};::std::recursive_mutex _tq_mutex;::std::mutex _sign_mutex;::std::condition_variable _sign;::std::thread _run_thread;int _thread_state;
};using timer = _XTimer;
using tdc = _XTimeDisp;void main()
{timer t1, t2,t3,t4;t1.id = 1;t1.expires = 0;t1.task = 300;t2.id = 2;t2.task = 100;t2.expires = 3;t3.id = 3;t3.task = 50;t3.expires = 3;t4.id = 4;t4.task = 30;t4.expires = 88;tdc tm;tm._Check_time = 0;tm.add(&t1);tm.add(&t2);tm.add(&t3);tm.add(&t4);tm.start(0);int tbegin = 0;while (1){::std::this_thread::sleep_for(::std::chrono::milliseconds(10));tm.nofity(tbegin);++tbegin;}system("pause");
}#undef _COT_WAIT
#undef _COT_NORET

运行截图
在这里插入图片描述
由于没有采取多线程Sleep阻塞,CPU利用自然是比较高的

这篇关于协程(coroutine)应用实例:计时器过期事件响应的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067678

相关文章

nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析(结合应用场景)

《nginx-t、nginx-sstop和nginx-sreload命令的详细解析(结合应用场景)》本文解析Nginx的-t、-sstop、-sreload命令,分别用于配置语法检... 以下是关于 nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析,结合实际应

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

Java MQTT实战应用

《JavaMQTT实战应用》本文详解MQTT协议,涵盖其发布/订阅机制、低功耗高效特性、三种服务质量等级(QoS0/1/2),以及客户端、代理、主题的核心概念,最后提供Linux部署教程、Sprin... 目录一、MQTT协议二、MQTT优点三、三种服务质量等级四、客户端、代理、主题1. 客户端(Clien

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试

MySQL数据库的内嵌函数和联合查询实例代码

《MySQL数据库的内嵌函数和联合查询实例代码》联合查询是一种将多个查询结果组合在一起的方法,通常使用UNION、UNIONALL、INTERSECT和EXCEPT关键字,下面:本文主要介绍MyS... 目录一.数据库的内嵌函数1.1聚合函数COUNT([DISTINCT] expr)SUM([DISTIN

CSS中的Static、Relative、Absolute、Fixed、Sticky的应用与详细对比

《CSS中的Static、Relative、Absolute、Fixed、Sticky的应用与详细对比》CSS中的position属性用于控制元素的定位方式,不同的定位方式会影响元素在页面中的布... css 中的 position 属性用于控制元素的定位方式,不同的定位方式会影响元素在页面中的布局和层叠关

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部