深潜Kotlin协程(十六):Channel

2023-11-28 17:59

本文主要是介绍深潜Kotlin协程(十六):Channel,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

系列电子书:传送门


Channel API 用于在协程间的原语通信。许多人把 channel 想象成 pipe(管道)。但我更喜欢一个不同的比喻,你熟悉用于交换书籍的公共书柜吗?一个人会在里面放上另一个人所需要找的书,这与 kotlinx.coroutines 的 Channel 非常相似。

在这里插入图片描述

Channel 支持任意数量的发送方和接收方。并且发送到 Channel 的每个值只会被一个协程接收(一次)。

···图片··

···图片··

Channel 是一个接口,它实现了另外两个接口:

  • SendChannel : 用于发送(添加元素)和关闭管道
  • ReceiveChannel : 用于接收元素
interface SendChannel<in E> {suspend fun send(element: E)fun close(): Boolean//...
}interface ReceiveChannel<out E> {suspend fun receive(): Efun cancel(cause: CancellationException? = null)// ...
}interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

由于这种区别,我们可以只暴露 ReceiveChannelSendChannel 来限制 channel 的入口/出口点。

你可能注意到了,sendreceive 都是挂起函数,这是一个基本特性:

  • 当我们尝试 receive 而 channel 中没有元素时,协程将被挂起,直到该元素可用。就像我们的“书柜”一样,当有人去书架上找一本书,而书架是空的时候,这个人就需挂起,直到有人在那里放了一个他要的书
  • 另一方面,当 channel 达到了容量阈值时,send 将会被挂起。我们很快就会看到,大多数 channel 的容量都是有限的。就像我们的“书柜”一样,当有人想把一本书放在书架上,而书架已经放满了书时,这个人就得挂起,直到有人拿走一本书,从而腾出空间

如果需要从非挂起函数中发送或接收信息,可以使用 trySendtryReceive。这两个操作都是即时的,并返回 ChannelResult,包含了有关操作成功或失败的结果信息。我们只能对容量有限的 channel 使用 trySendtryReceive,因为它们不适用于交会的 channel。

一个 channel 可以有任意数量的发送方和接收方,然而, channel 最常见的情况是两端只有一个协程。

··· 图片 ···

···图片··

想要看看 channel 的简单示例,我们需要在单独的协程中有一个生产者(发送方)和一个消费者(接收方)。生产者将发送元素,消费者将接收它们,以下是它们的实现方式:

suspend fun main(): Unit = coroutineScope {val channel = Channel<Int>()launch {repeat(5) { index ->delay(1000)println("Producing next one")channel.send(index * 2)}}launch {repeat(5) {val received = channel.receive()println(received)}}
}
// (1 sec)
// Producing next one
// 0
// (1 sec)
// Producing next one
// 2
// (1 sec)
// Producing next one
// 4
// (1 sec)
// Producing next one
// 6
// (1 sec
// Producing next one
// 8

这样的实现并不好。接收方需要知道发送方发送了多少个元素,所以上述的情况很少会发生,我们宁愿一直监听,直到发送者发送。要接收 channel 上的元素,可以使用 for循环 或 consumeEach 函数,它会一直监听发送直到 channel 关闭。

suspend fun main(): Unit = coroutineScope {val channel = Channel<Int>()launch {repeat(5) { index ->println("Producing next one")delay(1000)channel.send(index * 2)}channel.close()}launch {for (element in channel) {println(element)}// 或者// channel.consumeEach { element ->//    println(element)// }}
}

使用这种方式发送元素的常见问题是:很容易忘记关闭 channel,特别是在异常情况下。如果一个协程因为异常而停止生产,另一个协程将会永远的等待元素。使用 produce 函数要方便的多,它是一个返回 ReceiveChannel 的协程构建器。

// 这个函数将会创建一个 channel,并且一直在
// 上面生产正整数,直至输入的最大值
fun CoroutineScope.produceNumbers(max: Int
): ReceiveChannel<Int> = produce {var x = 0while (x < max) send(x++)
}

当协程以任何方式结束(完成、停止、取消)时, produce 函数都会关闭通道。多亏了这一点,我们永远不会忘记调用 closeproduce 构建器是一个非常受欢迎创建 channel 的方式,理由很充分:它提供了很多安全保障,并且方便。

suspend fun main(): Unit = coroutineScope {val channel = produce {repeat(5) { index ->println("Producing next one")delay(1000)send(index * 2)}}for (element in channel) {println(element)}
}

Channel 的类型

根据 channel 设置的容量大小,我们区分了四种类型的 channel:

  • 无限制 —— 容量设置为 Channel.UNLIMITED 的 channel,拥有一个无限的缓冲区,并且 send 永远不会挂起
  • 缓冲 —— 有一个具体数量的缓冲区,或者设置为 Channel.BUFFERED (默认为64,可以通过在 JVM 中设置 kotlinx.coruoutines.channels.defaultBuffer 系统属性来重写)的 channel
  • 交会(默认) —— 容量为0,或者设置为 Channel.RENDEZVOUS(等于0) 的 channel。这意味着只有在发送者和接收者相遇时,才会发生数据通信。(所以它像一个图书交换点,而不是一个书柜)
  • 合并 —— 容量为1,或者设置为 Channel.CONFLATED 的缓冲区,每个新元素会替换前一个元素

现在让我们来看看这些能力的实际应用。我们可以在 Channel 上设置它们,也可以在调用 produce 时设置。

我们将生产者快起来,接收者慢起来。在无限容量的情况下,channel 应该可以容纳所有的元素,然后让它们一个个的被接收处理。

suspend fun main(): Unit = coroutineScope {val channel = produce(capacity = Channel.UNLIMITED) {repeat(5) { index ->send(index * 2)delay(100)println("发送")}}delay(1000)for (element in channel) {println(element)delay(1000)}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 0
// (1 sec)
// 2
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)

如果 channel 有具体的容量,我们首先生产到缓冲区满了,之后生产者将开始等待接收。

suspend fun main(): Unit = coroutineScope {val channel = produce(capacity = 3) {repeat(5) { index ->send(index * 2)delay(100)println("发送")}}delay(1000)for (element in channel) {println(element)delay(1000)}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 2 * 0.1 = 0.8 sec)
// 0
// 发送
// (1 sec)
// 2
// 发送
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)

对于默认容量(Channel.RENDEZVOUS)的 channel,生产者将始终等待接收者。

suspend fun main(): Unit = coroutineScope {val channel = produce {// 或者 produce(capacity = Channel.RENDEZVOUS) {repeat(5) { index ->send(index * 2)delay(100)println("发送")}}delay(1000)for (element in channel) {println(element)delay(1000)}
}// 发送
// (1 sec)
// 2
// 发送
// (1 sec)
// 4
// 发送
// (1 sec)
// 6
// 发送
// (1 sec)
// 8
// 发送
// (1 sec)

最后,在使用 Channel.CONFLATED 时,我们不会存储过去的元素。新元素将会替换之前的元素,因此我们将只能接收最后一个元素,丢失之前发送的元素。

suspend fun main(): Unit = coroutineScope {val channel = produce(capacity = Channel.CONFLATED) {repeat(5) { index ->send(index * 2)delay(100)println("发送")}}delay(1000)for (element in channel) {println(element)delay(1000)}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 8

onBufferOverflow

为了进一步定制 channel,我们可以控制缓冲区堆满时发生的情况(onBufferOverflow 参数),有以下选项:

  • SUSPEND(默认) —— 当缓冲区塞满时,send 函数挂起
  • DROP_OLDEST —— 当缓冲区塞满时,删除最老的元素
  • DROP_LATEST —— 当缓冲区塞满时,删除最新的元素

正如你所想那样,channel 设置容量为 Channel.CONFLATED 的效果就等于设置容量数为1并且 onBufferOverFlowDROP_OLDEST。目前, produce 函数不允许我们自定义 onBufferOverflow,因此要设置它,我们需要使用 Channel 函数来定义一个 channel。

suspend fun main(): Unit = coroutineScope {val channel = Channel<Int>(capacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)launch {repeat(5) { index ->channel.send(index * 2)delay(100)println("发送")}channel.close()}delay(1000)for (element in channel) {println(element)delay(1000)}
}
// 发送
// (0.1 sec)  
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 6
// (1 sec)
// 8

onUndeleliveredElement

我们应该知道的另一个 Channel 函数的参数是 onUndeleliveredElement。当某个元素由于某些原因无法处理时会调用它。大多数情况下,它意味着 channel 被关闭或者取消后,有可能在 sendreceivereceiveOrNullhasNext 抛出错误时发生。我们通常使用它来关闭由该通道发送的资源。

val channel = Channel<Resource>(capacity) { resource ->resource.close()
}
// 或者
// val channel = Channel<Resource>(
//     capacity,
//     onUndeliveredElement = { resource ->
//         resource.close()
//     }
// )// 生产者代码
val resourceToSend = openResource()
channel.send(resourceToSend)// 消费者代码
val resourceReceived = channel.receive()
try {// 接收工作
} finally {resourceReceived.close()
}

Fan-out

多个协程可以从单个 channel 接收元素,然而,为了正确地接收它们,我们应该使用 for 循环(多个协程使用 consumeEach 是不安全的)。

···图片··

fun CoroutineScope.produceNumbers() = produce {repeat(10) {delay(100)send(it)}
}fun CoroutineScope.launchProcessor(id: Int,channel: ReceiveChannel<Int>
) = launch {for (msg in channel) {println("#$id received $msg")}
}suspend fun main(): Unit = coroutineScope {val channel = produceNumbers()repeat(3) { id ->delay(10)launchProcessor(id, channel)}
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// #2 received 5
// #0 received 6
// ...

元素均匀分布。 channel 有一个 FIFO(先进先出)的协程队列等待一个元素。这就是为什么在上面的例子中,可以看到每个元素被下一个协程接收(0,1,2,0,1,2…)。

为了更好地理解为什么,想象一下幼儿园的孩子们在排队买糖果,一旦他们得到一些,他们就会立即吃掉它们,然后走到队列的最后一个位置。这样的分配是公平的(假设糖果的数量是孩子数量的倍数,并且假设他们的父母对孩子吃糖果没有什么意见)。

Fan-in

多个协程可以发送到同一个 channel。在下面的例子中,你可以看到两个协程将元素发送到同一个 channel。
···图片··

suspend fun sendString(channel: SendChannel<String>,text: String,time: Long
) {while (true) {delay(time)channel.send(text)}
}fun main() = runBlocking {val channel = Channel<String>()launch { sendString(channel, "foo", 200L) }launch { sendString(channel, "BAR!", 500L) }repeat(50) {println(channel.receive())}coroutineContext.cancelChildren()
}

有时候,我们需要将多个渠道合并为一个渠道,为此,你可能会发现下面的函数很有用,因为它使用 produce 合并多个 channel。

fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {for (channel in channels) {launch {for (elem in channel) {send(elem)}}}
}

Pipelines

有时我们设置两个 channel,其中一个产生的元素是基于从另一个接收到元素。在这种情况下,我们称之为管道。

// 一个 Channel 发送从 1 到 3
fun CoroutineScope.numbers(): ReceiveChannel<Int> =produce {repeat(3) { num ->send(num + 1)}}fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =produce {for (num in numbers) {send(num * num)}}suspend fun main() = coroutineScope {val numbers = numbers()val squared = square(numbers)for (num in squared) {println(num)}
}
// 1
// 4
// 9

Channel 是原语通信的

当不同的协程需要互相通信时, Channel 很有用,它们保证没有冲突(例如,共享状态没有问题)并且公平。

想象一下不同的咖啡师正在冲咖啡。每个咖啡师都应该是独立工作的协程。不同的咖啡类型需要不同准备时间,但我们希望按照顺序处理订单。解决这个问题最简单的方案是在 Channel 中同时发送订单和生成的咖啡结果,可以使用 produce 生成器定义咖啡师:

suspend fun CoroutineScope.serveOrders(orders: ReceiveChannel<Order>,baristaName: String
): ReceiveChannel<CoffeeResult> = produce {for (order in orders) {val coffee = prepareCoffee(order.type)send(CoffeeResult(coffee = coffee,customer = order.customer,baristaName = baristaName))}
}

当我们设置一个管道时,我们可以使用前面定义的 fanIn 函数将不同的咖啡师产生的结果合并为一个:

val coffeeResults = fanIn(serveOrders(ordersChannel, "Alex"),serveOrders(ordersChannel, "Bob"),serveOrders(ordersChannel, "Celine"),
)

在下一章中,你将会看到更多实际的例子。

Practical usage

实际使用情况

我们使用 channel 的一个典型情况是:一端产生值,另一端处理。这些例子包括响应用户点击、来自服务器的新通知或随着时间推移更新搜索结果(一个很好的例子是 SkyScanner,它通过查询多个航空公司网站来搜索最便宜的航班)。然而,在大多数情况下,最好使用 channelFlowcallbackFlow,它们都是 ChannelFlow 的混合体(我们将在构建 Flow 的章节中介绍它们)。

···图片···

纯粹形式而言,我发现 channel 在一些更复杂的情况下很有用。例如,假设我们正在维护一个线上商店,比如亚马逊。让我们假设你的服务器收到了大量卖家的更改产品信息的提交。对于每个更改,我们首先需要找到最新的报价列表,然后逐个更新它们。

···图片···

传统方法并不是最佳的,一个卖家甚至可能有成千上万的售价更改。在一个漫长的过程中完成这一切并不是一个好的主意。

首先,内部异常或服务器重启可能会让我们不知道停在哪里。其次,一个大卖家可能会阻塞服务器很长一段时间,从而让小卖家等待它们的更改被应用。此外,我们不应该同时发送太多的网络请求,以避免需要处理这些请求的服务(以及我们的网络接口)过载。

这个问题的解决方法可能是建立一个管道。第一个通道包含要处理的卖家,而第二个通道包含要更新的报价。这些通道会有一个缓冲区。当已经有太多的提交在等待时,第二个提交的缓冲区可以防止我们的服务有得更多的提交。因此,我们的服务器将能够平衡我们在同一时间更新的报价数量。

我们还可以很容易地添加一些中间步骤,例如删除重复项。通过定义在每个通道上监听的协程的数量,我们可以决定服务发送多少并发请求。控制这些参数给了我们很大的自由。还可以很容易地添加许多改进,如持久性(用于服务器重启的情况)或元素唯一性(用于卖家在前一个更改被处理之前进行另一个更改的情况)。

···图片···

// 一个简单的实现
suspend fun handleOfferUpdates() = coroutineScope {val sellerChannel = listenOnSellerChanges()val offerToUpdateChannel = produce(capacity = UNLIMITED) {repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {launch {for (seller in sellerChannel) {val offers = offerService.requestOffers(seller.id)offers.forEach { send(it) }}}}}repeat(NUMBER_OF_CONCURRENT_UPDATE_SENDERS) {launch {for (offer in offerToUpdateChannel) {sendOfferUpdate(offer)}}}
}

总结

Channel 是一个强大的协程间原语通信的工具。它支持任意数量的发送方和接收方,并且发送到通道的每个值只能被接收一次。我们通常使用 produce 构建器来创建 channel,在 channel 中可以控制处理某些任务的协程数量。入金,我们最常用的是与 Flow 相关的 channel,这将在本书的后面介绍。

这篇关于深潜Kotlin协程(十六):Channel的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/430059

相关文章

使用协程实现高并发的I/O处理

文章目录 1. 协程简介1.1 什么是协程?1.2 协程的特点1.3 Python 中的协程 2. 协程的基本概念2.1 事件循环2.2 协程函数2.3 Future 对象 3. 使用协程实现高并发的 I/O 处理3.1 网络请求3.2 文件读写 4. 实际应用场景4.1 网络爬虫4.2 文件处理 5. 性能分析5.1 上下文切换开销5.2 I/O 等待时间 6. 最佳实践6.1 使用 as

Go Channel的实现

channel作为goroutine间通信和同步的重要途径,是Go runtime层实现CSP并发模型重要的成员。在不理解底层实现时,经常在使用中对channe相关语法的表现感到疑惑,尤其是select case的行为。因此在了解channel的应用前先看一眼channel的实现。 Channel内存布局 channel是go的内置类型,它可以被存储到变量中,可以作为函数的参数或返回值,它在r

Unity协程搭配队列开发Tips弹窗模块

概述 在Unity游戏开发过程中,提示系统是提升用户体验的重要组成部分。一个设计良好的提示窗口不仅能及时传达信息给玩家,还应当做到不干扰游戏流程。本文将探讨如何使用Unity的协程(Coroutine)配合队列(Queue)数据结构来构建一个高效且可扩展的Tips弹窗模块。 技术模块介绍 1. Unity协程(Coroutines) 协程是Unity中的一种特殊函数类型,允许异步操作的实现

Kotlin高阶函数与Lambda表达式及内联函数的介绍

目录 1、高阶函数1.1、什么是高阶函数?1.1.1、不带返回值的高阶函数1.1.2、带参数且带返回值的高阶函数1.1.3、与一般的函数进行比较 1.2、如何使用?1.3、高阶函数有什么作用? 2、Lambda表达式2.1、什么是Lambda表达式?2.1.1、无参数的写法2.1.2、有参数的写法2.1.3、有参数且有返回值的写法 2.2、如何使用?2.3、Lambda表达式有什么作用? 3

Unity实现自己的协程系统

概述:自定义Unity协程调度器(不依赖Mono)           实现了一个协程调度器,允许在程序中以非阻塞的方式调度协程。协程可以在满足特定条件后暂停和恢复,如等待特定的帧数、时间、或等待其他协程执行完毕。它的设计思想与Unity的协程机制类似,但它不依赖Unity的YieldInstruction,因此适用于非Unity环境。 协程可以在以下情况下暂停: "yield null ;

【go 通道】go语言通道channel

通过使用通道,在多个goroutine发送和接受共享的数据,达到数据同步的目的。 通道,他有点像在两个routine之间架设的管道,一个goroutine可以往这个管道里塞数据,另外一个可以从这个管道里取数据,有点类似于我们说的队列。 声明一个通道很简单,我们使用chan关键字即可,除此之外,还要指定通道中发送和接收数据的类型,这样我们才能知道,要发送什么类型的数据给通道,也知道从这个通道里可

android kotlin复习 Anonymous function 匿名函数

1、还是先上个图,新建kt: 2、代码: package com.jstonesoft.myapplication.testfun main(){val count = "helloworld".count()println(count);println("------------------------")var count2 = "helloworld".count(){it ==

android开发---Kotlin语言基础语法

目录 数据打印 变量 函数 程序逻辑控制   if  when 循环 数据打印 IDE采用的androidStudio 可自行官网下载 https://developer.android.google.cn/studio/archive?hl=zh-cn 新建项目 添加一个main方法,main()函数的左边出现了一个运行标志的小箭头。现在我们只要点击一下这个

Flink实战案例(十六):Flink 异步IO 简介

1 Aysnc I/O是啥? Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入。  主要目的:是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。  场景: 流计算系统中经常需要与外部系统进行交互,比如需要查询外部数据库以关联上用户的额外信息。通常,我们的实现方式是向数据库发送用户a的查询请求(例如在MapFunction中),然后等待结果返回,在这

【硬刚ES】ES基础(十六)基于词项和基于全文的搜索

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的ES部分补充。 DELETE productsPUT products{"settings": {"number_of_shards": 1}}POST /products/_bulk{ "index": { "_id": 1 }}{ "productID" : "XHDK-