继续来,同我一起撸Kotlin Channel 深水区

2023-10-30 11:40

本文主要是介绍继续来,同我一起撸Kotlin Channel 深水区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

协程系列文章:

  • 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
  • 少年,你可知 Kotlin 协程最初的样子?
  • 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇)
  • 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇)
  • Kotlin 协程调度切换线程是时候解开真相了
  • Kotlin 协程之线程池探索之旅(与Java线程池PK)
  • Kotlin 协程之取消与异常处理探索之旅(上)
  • Kotlin 协程之取消与异常处理探索之旅(下)
  • 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用
  • 继续来,同我一起撸Kotlin Channel 深水区
  • Kotlin 协程 Select:看我如何多路复用
  • Kotlin Sequence 是时候派上用场了
  • Kotlin Flow 背压和线程切换竟然如此相似
  • Kotlin Flow啊,你将流向何方?
  • Kotlin SharedFlow&StateFlow 热流到底有多热?

之前文章都是分析单个协程的原理、特性以及使用,本篇文章将着重分析协程间的通信方式。
通过本篇文章,你将了解到:

  1. Channel的引入及简单使用
  2. Channel的原理
  3. Channel四种类型深入解析
  4. produce/actor的使用与原理

1. Channel的引入及简单使用

初级版协程间通信

先来看一个简单的通信Demo:

    fun testChannel() {//协程1var deferred= GlobalScope.async {//假装在加工数据Thread.sleep(2000)"Hello fishforest"}//协程2GlobalScope.launch {var result = deferred.await()println("get result from coroutine1: $result")}}

如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。
现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:

共享一个变量,这个变量可以是个队列。

于是Demo改造如下:

    fun testChannel2() {//阻塞队列var queue = ArrayBlockingQueue<String>(5)//协程1GlobalScope.launch {var count = 0while (true) {//假装在加工数据Thread.sleep(1000)queue.put("fish ${count++}")}}//协程2GlobalScope.launch {while (true) {Thread.sleep(1000)println("get result from coroutine1:${queue.take()}")}}}

通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。
看似美好,实际上此处有个很大的漏洞:

队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。

我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。

升级版协程间通信-Channel

同样的需求,我们用Channel 实现:

    fun testChannel3() {//定义Channelvar channel = Channel<String>()//协程1GlobalScope.launch {var count = 0while (true) {//假装在加工数据Thread.sleep(1000)var sendStr = "fish ${count++}"println("send $sendStr")channel.send("$sendStr")}}//协程2GlobalScope.launch {while (true) {Thread.sleep(1000)println("receive:${channel.receive()}")}}}

与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。
查看打印结果:
image.png
你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列?
要想解开这个谜题,最好的方法是从源码入手深究其原理。

2. Channel的原理

Channel的构造

先从Channel 构造开始:

#Channel.kt
public fun <E> Channel(//Channel 容量/叫做Channel类型更合理一些capacity: Int = Channel.RENDEZVOUS,//缓冲区满后,发送端的处理方式onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,//信息没有传递出去时的回调onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =when (capacity) {//默认是约会模式Channel.RENDEZVOUS -> {//默认挂起if (onBufferOverflow == BufferOverflow.SUSPEND)RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channelelseArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel}//...}

此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。
RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。
image.png

重点在AbstractChannel/AbstractSendChannel及其子类里。

Channel的队列结构

AbstractSendChannel 里有个很重要的成员变量:

    protected val queue = LockFreeLinkedListHead()

LockFreeLinkedListHead 继承自LockFreeLinkedListNode,而这个Node 我们在分析Kotlin 协程之取消与异常处理探索之旅(上) 有提及过,此处再拎出来说说。
先看其定义:

#LockFreeLinkedList.kt
public actual open class LockFreeLinkedListNode {//后驱指针private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor//前驱指针private val _prev = atomic(this) //...
}

很典型的一个链表结构,并且是无锁链表,意思是它的插入/删除是无需上锁的,核心是使用了CAS。
回到Channel里的成员变量queue,初始的链表结构如下:
image.png
可以看出,当前节点的_next、_prev分别指向自己。

当往链表里面添加Node时,形成如下结构:
image.png

链表头为固定节点,通过它构造了双向循环链表。
AbstractSendChannel 里的queue 就是个链表头,通过它我们可以快速找到链表里的第1个节点(_next 指向的节点),也可以快速找到链表的最后一个节点(_prev指向的节点)。
于是形成了一个队列结构,每次往队列里放入数据,就放到链表的尾部,每次从队列里取数据,就从链表头后的第一个节点取。

Channel的send/receive

send 分析

#AbstractChannel.ktpublic final override suspend fun send(element: E) {//快速判断是否可以放入queue 队列//若能成功,则直接返回if (offerInternal(element) === kotlinx.coroutines.channels.OFFER_SUCCESS) return//不能退出,则挂起协程return sendSuspend(element)}protected open fun offerInternal(element: E): Any {while (true) {//先找到队列第一个Node节点,如果存在并且是Receive 类型,说明有接收者在等待val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED//给接收者协程赋值val token = receive.tryResumeReceive(element, null)if (token != null) {kotlinx.coroutines.assert { token === RESUME_TOKEN }//重新恢复接收者协程receive.completeResumeReceive(element)//返回插入的结果return receive.offerResult}}}private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->//suspendCancellableCoroutineReusable 里有挂起协程的逻辑loop@ while (true) {if (isFullImpl) {//构造SendElement,它是Node类型 val send = if (onUndeliveredElement == null)//SendElement 有两个成员变量:1是具体的值,2是当前协程的封装体contSendElement(element, cont) elseSendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)//将Element 加入到队列尾部val enqueueResult = enqueueSend(send)//插入成功,则返回}}}

用图表示以上流程:
image.png

接收者协程被恢复后,重新调度执行协程,而传入的值即为send发送的值,最终recevie返回的即是send过来的值。
对协程的挂起有疑惑请移步:讲真,Kotlin 协程的挂起没那么神秘(原理篇)

receive 分析
与send流程类似,就不贴代码了,仅用图表示:
image.png

可以看出,send/receive 通过判断queue的状态来决定是否挂起当前协程,而queue里的Node 又分为三种类型:
image.png

综合以上得出:

在RENDEZVOUS类型(默认类型)下,发送者协程需要等待接收者就位了(到队列里等待)才会继续往下走。同样的,接收者协程需要等待发送者就位了(到队列里等待)才会继续往下走。因此,形成的现象是发送者/接收者成对出现。

成对出现的场景,我们称RENDEZVOUS 为约会类型。

3. Channel四种类型深入解析

CONFLATED 类型

前面的分析是基于约会类型,实际上Channel还有其它类型,通过其构造过程可看出:

#AbstractChannel.kt
public fun <E> Channel(capacity: Int = Channel.RENDEZVOUS,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =when (capacity) {//约会类型Channel.RENDEZVOUS -> {if (onBufferOverflow == BufferOverflow.SUSPEND)RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channelelse//转为缓冲类型ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel}//混合类型Channel.CONFLATED -> {//此种类型下必须是挂起模式require(onBufferOverflow == BufferOverflow.SUSPEND) {"CONFLATED capacity cannot be used with non-default onBufferOverflow"}ConflatedChannel(onUndeliveredElement)}//无限制类型Channel.UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows//缓冲类型Channel.BUFFERED -> ArrayChannel( // uses default capacity with SUSPENDif (onBufferOverflow == BufferOverflow.SUSPEND) Channel.CHANNEL_DEFAULT_CAPACITY else 1,onBufferOverflow, onUndeliveredElement)//没有指定具体类型是以上4种内的组合}

先看CONFLATED(混合)类型。
ConflatedChannel 继承自AbstractChannel,有个成员变量:value。
重点来看其重写的函数:offerInternal与pollInternal,分别对应send与receive的逻辑。

send 分析

#ConflatedChannel.kt
protected override fun offerInternal(element: E): Any {var receive: ReceiveOrClosed<E>? = null//先上锁lock.withLock {//如果value 为空,也就是之前没有发送过,说明可能有接收者在等待。if (value === kotlinx.coroutines.channels.EMPTY) {loop@ while(true) {//尝试取出接收者receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queuedif (receive is Closed) {//是关闭Node,直接返回return receive!!}//赋值给接收者协程val token = receive!!.tryResumeReceive(element, null)if (token != null) {//跳出锁return@withLock}}}//更新发送值到value里updateValueLocked(element)?.let { throw it }//成功插入return OFFER_SUCCESS}//如果找到接收者,则恢复接收者协程receive!!.completeResumeReceive(element)return receive!!.offerResult
}

receive 分析

#ConflatedChannel.kt
protected override fun pollInternal(): Any? {var result: Any? = null//上锁lock.withLock {//如果value 为空,说明没数据,取数据失败if (value === kotlinx.coroutines.channels.EMPTY) return closedForSend ?: POLL_FAILED//从value 里取数据result = value//恢复到无数据状态value = EMPTY}return result
}

image.png
由此可见:

在 CONFLATED类型下,发送者无需等待接收者就位,它可以一直更新数据。

BUFFERED 类型

此为缓冲类型,与其它类型最大的不同之处在于它内部有数据缓冲区。
ArrayChannel 继承自AbstractChannel,其成员变量:

//数据缓冲区private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }

重点函数还是offerInternal与pollInternal。
send 分析

#ArrayChannel.kt
protected override fun offerInternal(element: E): Any {var receive: ReceiveOrClosed<E>? = nulllock.withLock {//size 为buffer 当前的实际存储数据的个数val size = this.size.value//更新size,此处根据发送策略,有可能会直接退出updateBufferSize(size)?.let { return it }if (size == 0) {//当前缓冲区没有数据loop@ while (true) {//查看是否有接收者等待receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued//给接收者协程赋值val token = receive!!.tryResumeReceive(element, null)if (token != null) {//缓冲区数量不变this.size.value = sizereturn@withLock}}}//加入到缓冲队列enqueueElement(size, element)//插入成功return OFFER_SUCCESS}//恢复接收者协程receive!!.completeResumeReceive(element)return receive!!.offerResult
}private fun updateBufferSize(currentSize: Int): Symbol? {if (currentSize < capacity) {//还可以继续存放数据size.value = currentSize + 1 // tentatively put it into the bufferreturn null // proceed}//缓冲区满return when (onBufferOverflow) {//协程需要挂起BufferOverflow.SUSPEND -> OFFER_FAILED//舍弃最新数据,相当于发送永远是成功的BufferOverflow.DROP_LATEST -> OFFER_SUCCESS//舍弃旧的数据,发送继续走下面的流程BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement}
}

receive 分析
receive 过程与send类似,就不贴源码了,直接上图对比:
image.png
可以看出,对于发送者来说:

先将数据放入数据缓冲队列,当缓冲区满后才会考虑是否需要挂起发送者协程。同样的,对于接收者来说,先从缓冲队列取数据,当缓冲区没数据时才会挂起自身。

UNLIMITED 类型

此类型为无限制类型,网上一些文章将此与BUFFERED类型类比,并归为“无限缓冲类型”,该说法是否正确,接下来一步步印证。
同样的,LinkedListChannel继承自AbstractChannel。
重点函数还是offerInternal与pollInternal。
send 分析

#LinkedListChannel.kt
protected override fun offerInternal(element: E): Any {while (true) {//快速查找是否有接收者等待val result = super.offerInternal(element)when {//找到接收者,插入算是成功result === OFFER_SUCCESS -> return OFFER_SUCCESS//没找到result === OFFER_FAILED -> {//加入到协程缓冲队列when (val sendResult = sendBuffered(element)) {null -> return OFFER_SUCCESSis Closed<*> -> return sendResult}}}}
}protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {//将SendBuffered 加入到queue里(队尾)queue.addLastIfPrev(AbstractSendChannel.SendBuffered(element)) { prev ->if (prev is ReceiveOrClosed<*>) return@sendBuffered prevtrue}//添加成功return null
}

receive 分析
receive 过程完全依靠父类AbstractChannel完成,此处就不再赘述,用图表示:
B
可以看出:

此类型下,发送者不会挂起,会一直往队列里存放数据,理论上是可以无限制存放的。与BUFFERED类型不同的是,UNLIMITED 缓冲数据使用的是queue,它是链表。而BUFFERED 缓冲数据使用的是数组。

Channel 四种类型比对

image.png

对于接收者来说,只有一种逻辑:

有数据则消费数据,没数据则挂起等待。

4. produce/actor的使用与原理

使用

通过上面的分析,我们知道接收者有可能会阻塞,怎样才能让接收者知道数据已经发送完毕了呢?
答案是:Channel.close()。
当调用该函数时,会往queue里加入Closed节点,当send/receive 取出该节点时就知道Channel关闭了。
你说,能不能不手动调用该函数呢?刚好,produce可以解决该问题:

    fun testProduce() {//返回接收者var receiveChannel = GlobalScope.produce<String> {//for (x in 1..5) {var sendStr = "fish $x"println("send $sendStr")send("$sendStr")}}//接收数据GlobalScope.launch {while (true) {println("job2 receive:${receiveChannel.receive()}")}println("job2 end")}GlobalScope.launch {while (true) {println("job3 receive:${receiveChannel.receive()}")}println("job3 end")}}

produce 函数返回Channel,在produce的协程体里可以发送数据,而通过返回的Channel,其它协程可以接收数据。当produce协程执行完毕后,将会主动调用close关闭Channel,其它Receive的Channel就会有感知,从而退出挂起状态。
这是一个典型的单生产者–多消费者的模型。
反之单消费者–多生产者的模型如下:

    fun testActor() {//返回发送者var sendChannel = GlobalScope.actor<String> {//for (x in 1..5) {println("job1 receive:${receive()}")}println("actor end")}//发送者GlobalScope.launch {sendChannel.send("send from job2")}GlobalScope.launch {sendChannel.send("send from job3")}}

原理

produce和actor内部创建了RENDEZVOUS 类型的Channel,它们返回的Channel以及协程体里的Channel都是委托这个内部的Channel来完成功能的,并且Channel绑定了协程的生命周期,当协程取消时将会取消Channel。(由于篇幅原因,就不展开细说了,有兴趣可以自行阅读源码或是留言)。

下篇将着重分析Flow以及与LiveData的PK。

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

这篇关于继续来,同我一起撸Kotlin Channel 深水区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/307681

相关文章

Go Channel的实现

channel作为goroutine间通信和同步的重要途径,是Go runtime层实现CSP并发模型重要的成员。在不理解底层实现时,经常在使用中对channe相关语法的表现感到疑惑,尤其是select case的行为。因此在了解channel的应用前先看一眼channel的实现。 Channel内存布局 channel是go的内置类型,它可以被存储到变量中,可以作为函数的参数或返回值,它在r

跟我一起玩《linux内核设计的艺术》第1章(四)——from setup.s to head.s,这回一定让main滚出来!(已解封)

看到书上1.3的大标题,以为马上就要见着main了,其实啊,还早着呢,光看setup.s和head.s的代码量就知道,跟bootsect.s没有可比性,真多……这确实需要包括我在内的大家多一些耐心,相信见着main后,大家的信心和干劲会上一个台阶,加油! 既然上篇已经玩转gdb,接下来的讲解肯定是边调试边分析书上的内容,纯理论讲解其实我并不在行。 setup.s: 目标:争取把setup.

Kotlin高阶函数与Lambda表达式及内联函数的介绍

目录 1、高阶函数1.1、什么是高阶函数?1.1.1、不带返回值的高阶函数1.1.2、带参数且带返回值的高阶函数1.1.3、与一般的函数进行比较 1.2、如何使用?1.3、高阶函数有什么作用? 2、Lambda表达式2.1、什么是Lambda表达式?2.1.1、无参数的写法2.1.2、有参数的写法2.1.3、有参数且有返回值的写法 2.2、如何使用?2.3、Lambda表达式有什么作用? 3

【go 通道】go语言通道channel

通过使用通道,在多个goroutine发送和接受共享的数据,达到数据同步的目的。 通道,他有点像在两个routine之间架设的管道,一个goroutine可以往这个管道里塞数据,另外一个可以从这个管道里取数据,有点类似于我们说的队列。 声明一个通道很简单,我们使用chan关键字即可,除此之外,还要指定通道中发送和接收数据的类型,这样我们才能知道,要发送什么类型的数据给通道,也知道从这个通道里可

android kotlin复习 Anonymous function 匿名函数

1、还是先上个图,新建kt: 2、代码: package com.jstonesoft.myapplication.testfun main(){val count = "helloworld".count()println(count);println("------------------------")var count2 = "helloworld".count(){it ==

社交平台找旅游搭子一起旅行靠谱吗?答案是不要太爽!

哈喽小伙伴们,今天要跟大家分享一个超级棒的小程序——咕哇找搭子!作为一个热爱自由行的人,最头疼的就是找不到志同道合的小伙伴。但自从用了这个咕哇小程序后,一切都变得简单又充满乐趣啦!🎉 上个月,我计划去云南旅行,就试着在咕哇上发布了我的行程信息。没想到很快就收到了几位朋友的回应,其中一位叫小莲的朋友特别投缘。我们不仅目的地一样,就连兴趣爱好都出奇地相似,于是我们就决定一起出发啦!👭

android开发---Kotlin语言基础语法

目录 数据打印 变量 函数 程序逻辑控制   if  when 循环 数据打印 IDE采用的androidStudio 可自行官网下载 https://developer.android.google.cn/studio/archive?hl=zh-cn 新建项目 添加一个main方法,main()函数的左边出现了一个运行标志的小箭头。现在我们只要点击一下这个

python打包exe如何把浏览器和geckodriver一起打包进去

一、目录结构:main.py同级目录下有一个浏览器包 二、调用浏览器的py修改:根据开发环境和打包环境选择浏览器和webdriver的路径 if getattr(sys, 'frozen', False):# 如果是打包的应用程序application_path = sys._MEIPASSelse:# 如果是开发环境application_path = os.path.dirna

猛兽财经:股价暴跌37.6%后,超微电脑股票还能继续投资吗?

来源:猛兽财经 作者:猛兽财经 S&P Global Market Intelligence的数据显示,超微电脑(SMCI)的股价在8月份遭受了两次重创(下跌了37.6%),目前的股价已经较3月份的高点低了64%。   超微电脑股价下跌的原因 第一个原因是,8月6日,超微电脑公布的2024财年第四季度业绩。由于超微电脑的销售成本增长高于收入增长,其收益也远低于华尔街的普遍预期和管理层的

eclipse中设置中文字体变大,注释字体变大,不跟代码字体一起变大

windows-preferences-general-appearance-colours and fonts 在basic里面找到最后TEXT FONT,点edit,在右下角脚本里面将西欧语言改成中欧语言 解决