协程(coroutine)应用实例:计时器过期事件响应

2024-06-16 22:08

本文主要是介绍协程(coroutine)应用实例:计时器过期事件响应,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

协程应用实例:计时器过期事件响应

    • 1. 计时调度中心
    • 2.基于协程的事件处理

  早期我曾把弄过War3 的WE编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行,昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。

1. 计时调度中心

  计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
  详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)

2.基于协程的事件处理

  通常,在这种同时处理多个事件的情况下,一般是用 多线程? 执行事件响应函数(回调函数)的。
  考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
  当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
  基于以上几点考虑,我决定用协程执行事件响应处理,同时也方面了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
  当然如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程区执行就可以了。

协程返回值类型 promise_type,用于和协程交互

class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};

计时器对象,保存有协程句柄,如果事件存在等待函数则先行挂起返回一个等待时间,在一段时候后再次被调度,否则直接返回0

class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};

下面是全部代码


#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>using namespace std;#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};class _XTimeDisp
{
public:using _Cont_ty = ::std::list<class _XTimer*>;static constexpr int _TVN_BITS = 4;static constexpr int _TVR_BITS = 6;static constexpr int _TVN_SIZE = 1 << _TVN_BITS;static constexpr int _TVR_SIZE = 1 << _TVR_BITS;static constexpr int _TVN_MASK = _TVN_SIZE - 1;static constexpr int _TVR_MASK = _TVR_SIZE - 1;public:_XTimeDisp():_tq_mutex(),tvec(),_sign_mutex(),_run_thread(),_sign(),_thread_state(0){}~_XTimeDisp(){}
public:static constexpr int _INDEX(int expires, int n){return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);}static constexpr int _OFFSET(int n){return (_TVR_SIZE + n*_TVN_SIZE);}
public:void add(_XTimer* timer){unsigned long long expires = timer->expires;unsigned long long index = expires - _Check_time;unsigned int _VecIndex = 0;if (index < _TVR_SIZE)		// tvec_1{tvec_1[expires & _TVR_MASK].push_back(timer);}else if (index < (1 << (_TVR_BITS + 1 * _TVN_BITS)))	// tvec_2{tvec_2[_INDEX(expires, 0)].push_back(timer);}else if (index <= 0)	// 异常处理,视为即将到期的计时器{tvec_1[_Check_time & _TVR_MASK].push_back(timer);}else {if (index > 0xFFFFFFFFUL){index = 0xFFFFFFFFUL;expires = index + _Check_time;}tvec_1[_INDEX(expires, 1)].push_back(timer);}}int cascade(int offset, int index){::std::unique_lock<::std::recursive_mutex>  _lock(_tq_mutex);_Cont_ty& list = tvec[offset + index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){this->add(*it);}return index;}void tick(size_t _Now){::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);while (_Check_time <= _Now){int index = _Check_time & _TVR_MASK;if (!index &&		// tv1!cascade(_OFFSET(0), _INDEX(_Check_time, 0)))	// tv2{cascade(_OFFSET(1), _INDEX(_Check_time, 1));	// tv3}++_Check_time;_Cont_ty& list = tvec[index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){ // 如果有句柄说明处于挂起状态,继续执行if ((*it)->handle){auto ret = (*it)->handle.resume();if (ret != 0){(*it)->expires += ret;add(*it);}}else {// 视为第一次执行auto res = (*it)->on_event(_Now);auto ret = res.resume();if (ret != 0){(*it)->expires += ret;(*it)->handle = ::std::move(res);add(*it);}}}}}void run(){while (_thread_state == 1){tick(_Check_time);::std::unique_lock<::std::mutex> _nofity(_sign_mutex);_sign.wait(_nofity);}}void nofity(size_t now){_Check_time = now;_sign.notify_one();}void start(size_t time){if (_thread_state){return;}_Check_time = time;_thread_state = 1;_run_thread = ::std::thread(&_XTimeDisp::run, this);}void stop(){_thread_state = 0;_sign.notify_one();if (_run_thread.joinable()) {_run_thread.join();}}
public:size_t _Check_time;union{class{public:_Cont_ty tvec_1[_TVR_SIZE];_Cont_ty tvec_2[_TVN_SIZE];_Cont_ty tvec_3[_TVN_SIZE];};_Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];};::std::recursive_mutex _tq_mutex;::std::mutex _sign_mutex;::std::condition_variable _sign;::std::thread _run_thread;int _thread_state;
};using timer = _XTimer;
using tdc = _XTimeDisp;void main()
{timer t1, t2,t3,t4;t1.id = 1;t1.expires = 0;t1.task = 300;t2.id = 2;t2.task = 100;t2.expires = 3;t3.id = 3;t3.task = 50;t3.expires = 3;t4.id = 4;t4.task = 30;t4.expires = 88;tdc tm;tm._Check_time = 0;tm.add(&t1);tm.add(&t2);tm.add(&t3);tm.add(&t4);tm.start(0);int tbegin = 0;while (1){::std::this_thread::sleep_for(::std::chrono::milliseconds(10));tm.nofity(tbegin);++tbegin;}system("pause");
}#undef _COT_WAIT
#undef _COT_NORET

运行截图
在这里插入图片描述
由于没有采取多线程Sleep阻塞,CPU利用自然是比较高的

这篇关于协程(coroutine)应用实例:计时器过期事件响应的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067678

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

AI行业应用(不定期更新)

ChatPDF 可以让你上传一个 PDF 文件,然后针对这个 PDF 进行小结和提问。你可以把各种各样你要研究的分析报告交给它,快速获取到想要知道的信息。https://www.chatpdf.com/

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。