探究C++20协程(6)——实现协程之间消息传递

2024-04-25 18:44

本文主要是介绍探究C++20协程(6)——实现协程之间消息传递,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

之前主要关注的是协程与外部调用者的交互,这次也关注一下对等的协程之间的通信。

实现目标

在C++中实现协程的Channel相对复杂,因为C++标准库中并没有内置的协程间通信机制。C++20引入了协程支持,但主要提供了底层的协程操作,如协程的启动和暂停(通过co_await, co_return, 和 co_yield),并未直接提供Channel或其他高级并发原语。因此,实现一个C++的协程Channel需要依赖C++20的协程功能,结合额外的同步机制,如条件变量、互斥锁和原子操作等。

最终的 Channel 的用例如下:

Task<void, LooperExecutor> Producer(Channel<int> &channel) {int i = 0;while (i < 10) {// 写入时调用 write 函数co_await channel.write(i++);// 或者使用 << 运算符co_await (channel << i++);}// 支持关闭channel.close();
}Task<void, LooperExecutor> Consumer(Channel<int> &channel) {while (channel.is_active()) {try {// 读取时使用 read 函数,表达式的值就是读取的值auto received = co_await channel.read();int received;// 或者使用 >> 运算符将读取的值写入变量当中co_await (channel >> received);} catch (std::exception &e) {// 捕获 Channel 关闭时抛出的异常}}
}

co_await 表达式的支持

想要支持 co_await 表达式,只需要为 Channel 读写函数返回的 Awaiter 类型添加相应的 await_transform 函数。假定Channel的read 和 write 两个函数的返回值类型 ReaderAwaiter 和 WriterAwaiter,接下来就添加一个非常简单的 await_transform 的支持:

template<typename ResultType, typename Executor>
struct TaskPromise {template<typename _ValueType>auto await_transform(ReaderAwaiter<_ValueType> reader_awaiter) {reader_awaiter.executor = &executor;return reader_awaiter;}template<typename _ValueType>auto await_transform(WriterAwaiter<_ValueType> writer_awaiter) {writer_awaiter.executor = &executor;return writer_awaiter;}
}

由于 Channel 的 buffer 和对 Channel 的读写本身会决定协程是否挂起或恢复,因此这些逻辑都将在 Channel 当中给出,TaskPromise 能做的就是把调度器传过去,当协程恢复时使用。

Awaiter 的实现

Awaiter 负责在挂起时将自己存入 Channel,并且在需要时恢复协程。因此除了前面看到需要在恢复执行协程时的调度器之外,Awaiter 还需要持有 Channel、需要读写的值。

WriterAwaiter

template<typename ValueType>
struct WriterAwaiter {Channel<ValueType> *channel;// 调度器不是必须的,如果没有,则直接在当前线程执行(等价于 NoopExecutor)AbstractExecutor *executor = nullptr;// 写入 Channel 的值ValueType _value;std::coroutine_handle<> handle;WriterAwaiter(Channel<ValueType> *channel, ValueType value): channel(channel), _value(value) {}bool await_ready() {return false;}auto await_suspend(std::coroutine_handle<> coroutine_handle) {// 记录协程 handle,恢复时用this->handle = coroutine_handle;// 将自身传给 Channel,Channel 内部会根据自身状态处理是否立即恢复或者挂起channel->try_push_writer(this);}void await_resume() {// Channel 关闭时也会将挂起的读写协程恢复// 要检查是否是关闭引起的恢复,如果是,check_closed 会抛出 Channel 关闭异常channel->check_closed();}// Channel 当中恢复该协程时调用 resume 函数void resume() {// 我们将调度器调度的逻辑封装在这里if (executor) {executor->execute([this]() { handle.resume(); });} else {handle.resume();}}
};

ReaderAwaiter

template<typename ValueType>
struct ReaderAwaiter {Channel<ValueType> *channel;AbstractExecutor *executor = nullptr;ValueType _value;// 用于 channel >> received; 这种情况// 需要将变量的地址传入,协程恢复时写入变量内存ValueType* p_value = nullptr;std::coroutine_handle<> handle;explicit ReaderAwaiter(Channel<ValueType> *channel) : channel(channel) {}bool await_ready() { return false; }auto await_suspend(std::coroutine_handle<> coroutine_handle) {this->handle = coroutine_handle;// 将自身传给 Channel,Channel 内部会根据自身状态处理是否立即恢复或者挂起channel->try_push_reader(this);}int await_resume() {// Channel 关闭时也会将挂起的读写协程恢复// 要检查是否是关闭引起的恢复,如果是,check_closed 会抛出 Channel 关闭异常channel->check_closed();return _value;}// Channel 当中正常恢复读协程时调用 resume 函数void resume(ValueType value) {this->_value = value;if (p_value) {*p_value = value;}resume();}// Channel 关闭时调用 resume() 函数来恢复该协程// 在 await_resume 当中,如果 Channel 关闭,会抛出 Channel 关闭异常void resume() {if (executor) {executor->execute([this]() { handle.resume(); });} else {handle.resume();}}
};

Awaiter 的功能就是:负责用协程的调度器在需要时恢复协程,处理读写的值的传递(通过Channel)。

Channel 的实现

接下来给出 Channel 当中根据 buffer 的情况来处理读写两端的挂起和恢复的逻辑。

基本结构

template<typename ValueType>
struct Channel {... struct ChannelClosedException : std::exception {const char *what() const noexcept override {return "Channel is closed.";}};void check_closed() {// 如果已经关闭,则抛出异常if (!_is_active.load(std::memory_order_relaxed)) {throw ChannelClosedException();}}explicit Channel(int capacity = 0) : buffer_capacity(capacity) {_is_active.store(true, std::memory_order_relaxed);}// true 表示 Channel 尚未关闭bool is_active() {return _is_active.load(std::memory_order_relaxed);}// 关闭 Channelvoid close() {bool expect = true;// 判断如果已经关闭,则不再重复操作// 比较 _is_active 为 true 时才会完成设置操作,并且返回 trueif(_is_active.compare_exchange_strong(expect, false, std::memory_order_relaxed)) {// 清理资源clean_up();}}// 不希望 Channel 被移动或者复制Channel(Channel &&channel) = delete;Channel(Channel &) = delete;Channel &operator=(Channel &) = delete;// 销毁时关闭~Channel() {close();}private:// buffer 的容量int buffer_capacity;std::queue<ValueType> buffer;// buffer 已满时,新来的写入者需要挂起保存在这里等待恢复std::list<WriterAwaiter<ValueType> *> writer_list;// buffer 为空时,新来的读取者需要挂起保存在这里等待恢复std::list<ReaderAwaiter<ValueType> *> reader_list;// Channel 的状态标识std::atomic<bool> _is_active;std::mutex channel_lock;std::condition_variable channel_condition;void clean_up() {std::lock_guard lock(channel_lock);// 需要对已经挂起等待的协程予以恢复执行for (auto writer : writer_list) {writer->resume();}writer_list.clear();for (auto reader : reader_list) {reader->resume();}reader_list.clear();// 清空 bufferdecltype(buffer) empty_buffer;std::swap(buffer, empty_buffer);}
};

初始化和运行时:

  • 通道在创建时是开放的,可以进行数据的读写操作。
  • 当数据写入满足或读取可进行时,可能有等待的读写者被恢复执行。

关闭和清理:通道的关闭操作会触发资源的清理,包括清空缓冲区和恢复所有挂起的操作,确保没有线程或协程因通道关闭而无限期等待。

read 和 write

template<typename ValueType>
struct Channel {auto write(ValueType value) {check_closed();return WriterAwaiter<ValueType>(this, value);}auto operator<<(ValueType value) {return write(value);}auto read() {check_closed();return ReaderAwaiter<ValueType>(this);}auto operator>>(ValueType &value_ref) {auto awaiter =  read();// 保存待赋值的变量的地址,方便后续写入awaiter.p_value = &value_ref;return awaiter;}
}

write 方法:

  • 这个方法首先调用 check_closed() 检查通道是否已关闭。如果通道关闭,则会抛出 ChannelClosedException。
  • 若通道未关闭,方法将创建一个 WriterAwaiter 对象,这个对象负责管理写操作的挂起和恢复。WriterAwaiter 构造时接收通道自身的指针和要写入的值。

read 方法:

  • 类似于 write,read 方法首先检查通道是否已关闭,如果关闭,则抛出异常。
  • 如果通道开启,则创建并返回一个 ReaderAwaiter 对象,这个对象负责管理读操作的挂起和恢复。

这些对象会在协程尝试进行不可能立即完成的操作(如写入一个满的缓冲区或从空的缓冲区读取)时挂起协程。当操作变得可行时(如缓冲区有空间可写或有数据可读),相关的 Awaiter 会恢复协程的执行。

try_push_writer 和 try_push_reader

try_push_writer 调用时,意味着有一个新的写入者挂起准备写入值到 Channel 当中,这时候有以下几种情况:

  • Channel 当中有挂起的读取者,写入者直接将要写入的值传给读取者,恢复读取者,恢复写入者。
  • Channel 的 buffer 没满,写入者把值写入 buffer,然后立即恢复执行。
  • Channel 的 buffer 已满,则写入者被存入挂起列表(writer_list)等待新的读取者读取时再恢复。
void try_push_writer(WriterAwaiter<ValueType> *writer_awaiter) {std::unique_lock lock(channel_lock);check_closed();// 检查有没有挂起的读取者,对应情况 1if (!reader_list.empty()) {auto reader = reader_list.front();reader_list.pop_front();lock.unlock();reader->resume(writer_awaiter->_value);writer_awaiter->resume();return;}// buffer 未满,对应情况 2if (buffer.size() < buffer_capacity) {buffer.push(writer_awaiter->_value);lock.unlock();writer_awaiter->resume();return;}// buffer 已满,对应情况 3writer_list.push_back(writer_awaiter);
}

相对应的,try_push_reader 调用时,意味着有一个新的读取者挂起准备从 Channel 当中读取值,这时候有以下几种情况:

  • Channel 的 buffer 非空,读取者从 buffer 当中读取值,如果此时有挂起的写入者,需要去队头的写入者将值写入 buffer,然后立即恢复该写入者和当次的读取者。
  • Channel 当中有挂起的写入者,写入者直接将要写入的值传给读取者,恢复读取者,恢复写入者
  • Channel 的 buffer 为空,则读取者被存入挂起列表(reader_list)等待新的写入者写入时再恢复。
void try_push_reader(ReaderAwaiter<ValueType> *reader_awaiter) {std::unique_lock lock(channel_lock);check_closed();// buffer 非空,对应情况 1if (!buffer.empty()) {auto value = buffer.front();buffer.pop();if (!writer_list.empty()) {// 有挂起的写入者要及时将其写入 buffer 并恢复执行auto writer = writer_list.front();writer_list.pop_front();buffer.push(writer->_value);lock.unlock();writer->resume();} else {lock.unlock();}reader_awaiter->resume(value);return;}// 有写入者挂起,对应情况 2if (!writer_list.empty()) {auto writer = writer_list.front();writer_list.pop_front();lock.unlock();reader_awaiter->resume(writer->_value);writer->resume();return;}// buffer 为空,对应情况 3reader_list.push_back(reader_awaiter);
}

监听协程的提前销毁

观察上述代码,Channel 对象必须在持有 Channel 实例的协程退出之前关闭。在 Channel 当中持有了已经挂起的读写协程的 Awaiter 的指针,一旦协程销毁,这些 Awaiter 也会被销毁,Channel 在关闭时试图恢复这些读写协程时就会出现程序崩溃(访问了野指针)。

为了解决这个问题,需要在 Awaiter 销毁时主动将自己的指针从 Channel 当中移除。

template<typename ValueType>
struct ReaderAwaiter {ReaderAwaiter(ReaderAwaiter&& other) noexcept: channel(std::exchange(other.channel, nullptr)),executor(std::exchange(other.executor, nullptr)),_value(other._value),p_value(std::exchange(other.p_value, nullptr)),handle(other.handle) {}int await_resume() {auto channel = this->channel;this->channel = nullptr;channel->check_closed();return _value;}~ReaderAwaiter() {if (channel) channel->remove_reader(this);}
}

实现了移动构造函数,ReaderAwaiter在被移动后会将原对象的channel指针置为nullptr。原来的Awaiter对象不再与任何Channel关联,从而防止在原Awaiter对象被销毁时误操作已移走的资源。

协程恢复时将自身持有的channel指针置空。这是因为当协程由于await表达式被挂起后恢复执行时,await_resume()被调用以继续执行协程。将channel设置为nullptr之后,如果在后续的执行中再次错误地或意外地引用了channel,这将直接导致访问空指针错误而非进行无效或危险的操作。

在ReaderAwaiter的析构函数中,如果其channel成员变量仍然非空,表明该Awaiter可能在协程尚未恢复执行前被销毁(例如协程的异常退出或提前结束)。在这种情况下,Awaiter负责通知Channel从其等待列表中移除自己,确保Channel不会在未来尝试访问已经销毁的Awaiter。

对应的,Channel 当中也需要增加 remove_reader 函数:

template<typename ValueType>
struct Channel {void remove_reader(ReaderAwaiter<ValueType> *reader_awaiter) {// 并发环境,修改 reader_list 的操作都需要加锁std::lock_guard lock(channel_lock);reader_list.remove(reader_awaiter);}
}

WriterAwaiter 的修改类似,之后即使把正在等待读写 Channel 的协程提前结束销毁,也不会影响 Channel 的继续使用以及后续的正常关闭了。

结果展示

测试代码如下所示

Task<void, LooperExecutor> Producer(Channel<int>& channel) {int i = 0;while (i < 10) {debug("send: ", i);co_await(channel << i++);co_await 50ms;}co_await 5s;channel.close();debug("close channel, exit.");
}Task<void, LooperExecutor> Consumer(Channel<int>& channel) {while (channel.is_active()) {try {int received;co_await(channel >> received);debug("receive: ", received);co_await 500ms;}catch (std::exception& e) {//}}debug("exit.");
}Task<void, LooperExecutor> Consumer2(Channel<int>& channel) {while (channel.is_active()) {try {auto received = co_await channel.read();debug("receive2: ", received);co_await 300ms;}catch (std::exception& e) {//}}debug("exit.");
}
// co_wait 时间也会有run_loop exit.
void test_channel() {debug("test_channel()");auto channel = Channel<int>(5);auto producer = Producer(channel);auto consumer = Consumer(channel);auto consumer2 = Consumer2(channel);std::this_thread::sleep_for(10s);
}int main() {test_channel();return 0;
}

完整代码见个人github的Coroutines项目。

Current time: 19:47.300 18784 send:  0
Current time: 19:47.300 26408 receive2:  0
Current time: 19:47.356 18784 send:  1
Current time: 19:47.357 38656 receive:  1
Current time: 19:47.419 18784 send:  2
Current time: 19:47.482 18784 send:  3
Current time: 19:47.545 18784 send:  4
Current time: 19:47.607 18784 send:  5
Current time: 19:47.607 26408 receive2:  2
Current time: 19:47.669 18784 send:  6
Current time: 19:47.731 18784 send:  7
Current time: 19:47.791 18784 send:  8
Current time: 19:47.869 38656 receive:  3
Current time: 19:47.915 26408 receive2:  4
Current time: 19:47.931 18784 send:  9
Current time: 19:48.224 26408 receive2:  5
Current time: 19:48.379 38656 receive:  6
Current time: 19:48.532 26408 receive2:  7
Current time: 19:48.839 26408 receive2:  8
Current time: 19:48.886 38656 receive:  9

基本符合其中的等待时间和处理逻辑。

这篇关于探究C++20协程(6)——实现协程之间消息传递的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/935485

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

【C++ Primer Plus习题】13.4

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream>#include "port.h"int main() {Port p1;Port p2("Abc", "Bcc", 30);std::cout <<

C++包装器

包装器 在 C++ 中,“包装器”通常指的是一种设计模式或编程技巧,用于封装其他代码或对象,使其更易于使用、管理或扩展。包装器的概念在编程中非常普遍,可以用于函数、类、库等多个方面。下面是几个常见的 “包装器” 类型: 1. 函数包装器 函数包装器用于封装一个或多个函数,使其接口更统一或更便于调用。例如,std::function 是一个通用的函数包装器,它可以存储任意可调用对象(函数、函数

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

06 C++Lambda表达式

lambda表达式的定义 没有显式模版形参的lambda表达式 [捕获] 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 有显式模版形参的lambda表达式 [捕获] <模版形参> 模版约束 前属性 (形参列表) 说明符 异常 后属性 尾随类型 约束 {函数体} 含义 捕获:包含零个或者多个捕获符的逗号分隔列表 模板形参:用于泛型lambda提供个模板形参的名

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount