rust tokio select!宏详解

2023-11-27 08:52
文章标签 rust 详解 select tokio

本文主要是介绍rust tokio select!宏详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

rust tokio select!宏详解

简介

本文介绍Tokioselect!的用法,重点是使用过程中可能遇到的问题,比如阻塞问题、优先级问题、cancel safe问题。在Tokio 中,select! 是一个宏,用于同时等待多个异步任务,并在其中任意一个任务完成时执行相应的逻辑。

基本用法

如下代码演示了如何使用 Tokio 库实现一个异步的消息传递系统,其中包括三个无限通道和一个关闭通道。程序使用了 select! 宏来等待通道和关闭通道的事件,并在事件发生时执行相应的操作。

程序的主要步骤如下:

  1. 创建三个无限通道和一个用于传递关闭信号的通道。
  2. 向三个通道中发送一些数据。
  3. 开启一个异步任务并在两秒后发送关闭信号。
  4. 在主循环中使用 select! 宏等待通道和关闭通道的事件。
  5. 当一个通道接收到数据时,打印出数据。
  6. 当关闭通道接收到信号时,退出循环。

程序中的 select! 宏使用了类似于 match 的语法,但是它可以同时等待多个异步事件。当其中一个事件发生时,宏将执行相应的代码块,并跳出循环。在本例中,当一个通道接收到数据时,打印出数据;当关闭通道接收到信号时,退出循环。
select!经常与loop搭配使用,循环地从多个通道中接收事件并处理。

use std::time::Duration;use tokio::select;#[tokio::main]
async fn main() {let (sender1, mut receiver1) = tokio::sync::mpsc::unbounded_channel::<String>();let (sender2, mut receiver2) = tokio::sync::mpsc::unbounded_channel::<String>();let (sender3, mut receiver3) = tokio::sync::mpsc::unbounded_channel::<String>();let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(());for i in 0..3 {sender1.send(i.to_string()).unwrap();sender2.send(i.to_string()).unwrap();sender3.send(i.to_string()).unwrap();}tokio::spawn(async move {tokio::time::sleep(Duration::from_secs(2)).await;shutdown_sender.send(()).unwrap(); //两秒后关闭});loop {select! {ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}
}

可能遇到的坑

阻塞

select中的各个分支是并行执行的,这里的并行是指分支中的各个future在并行执行。不过一旦某个分支的future完成并进入了分支代码块,如果在分支代码中有一些阻塞的操作,则其他分支是没有机会执行的。
比如下面代码,在receiver1.recv()完成时,sleep了10s,sleep期间其他的分支是不会执行的。即使在2s后发送了shutdown信号,select!因为无法及时处理此信号,实际上循环也无法退出。

 loop {select! {ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);tokio::time::sleep(Duration::from_secs(10)).await;//这里等待期间,其他的分支是无法被执行的},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}

这个坑在网络编程中比较容易踩到,比如select这里是从channel中取出上层应用传来的数据,并将其写入到socket中,而写socket的操作是有可能阻塞的,阻塞期间其他的分支是无法执行的。

顺序

1、默认情况下select中的各个分支执行顺序是随机的,比如上面例子中三个channel都有消息的情况下,具体去执行哪个分支是随机的。执行结果如下:
在这里插入图片描述
2、如果想要区分优先级,可以加标志biased,这样每次select将会按照从上到下的顺序去poll每个future,也就是说优先级顺序是从上往下的。比如某些场景下需要按优先级处理各个channel中的数据时这个特性就很有用。代码如下:

    loop {select! {biased;//按顺序优先执行ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}

运行结果如下:
在这里插入图片描述
3、顺序执行时注意饿死问题
添加了biased标志后,顺序靠前的future总是先被执行,在上述例子中,极端情况下如果靠前的channel总是有数据,那后面的channel就没有机会被执行。比如例子中如果前三个channel中一直有数据,那shutdown_receiver就无法收到shutdown信号,导致程序功能不符合预期。
解决这个问题很简单,就是把更关键的控制性的future放在最前方。

关于cancel safe

select!中如果某个分支future completed了,会将其他分支的future cancel掉,这个cancel操作要格外小心,因为如果future不是cancel safe的可能会丢数据。tokio的官方文档中给出了常见的cancel safe和不safefuture
那么如何判断自己实现的future是否是cancel safe的呢? 很简单、只需要思考如果future中的代码执行到.await时被cancel了,是否是安全的。我们来看下cancel unsafe的代码长啥样:

pub async fn read_and_write(mut message_recevier: UnboundedReceiver<Bytes>, mut file: File) {let message = message_recevier.recv().await.unwrap();file.write(&message).await.unwrap();
}

该方法从一个channel中读取消息,并将此消息写入到文件中,这个future就明显不是cancel safe的。为啥呢?试想一下,此futurechannel中读到消息之后,在写文件时被cancel掉了,那message岂不是就丢了。
实际项目中一定要格外小心这个cancel safe问题,很容易造成丢数据或者数据重复等不良反应,而且一旦出现了还很难复现、不太容易想到是这里的问题。网络编程中尤其要注意tokio::io::AsyncWriteExt::write_all不是cancel safe的,因为它内部可能是多次调用write操作才将所有缓冲区写入。

数量

1、首先select!中的分支仅支持显式地用代码书写,无法动态增减。就是说在写代码时select中的futures数量就固定了,程序运行过程中无法动态删减。
2、目前最多支持64个分支。

这篇关于rust tokio select!宏详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/427393

相关文章

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MySQL数据库中ENUM的用法是什么详解

《MySQL数据库中ENUM的用法是什么详解》ENUM是一个字符串对象,用于指定一组预定义的值,并可在创建表时使用,下面:本文主要介绍MySQL数据库中ENUM的用法是什么的相关资料,文中通过代码... 目录mysql 中 ENUM 的用法一、ENUM 的定义与语法二、ENUM 的特点三、ENUM 的用法1