本文主要是介绍继续来,同我一起撸Kotlin Channel 深水区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
协程系列文章:
- 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
- 少年,你可知 Kotlin 协程最初的样子?
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇)
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇)
- Kotlin 协程调度切换线程是时候解开真相了
- Kotlin 协程之线程池探索之旅(与Java线程池PK)
- Kotlin 协程之取消与异常处理探索之旅(上)
- Kotlin 协程之取消与异常处理探索之旅(下)
- 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用
- 继续来,同我一起撸Kotlin Channel 深水区
- Kotlin 协程 Select:看我如何多路复用
- Kotlin Sequence 是时候派上用场了
- Kotlin Flow 背压和线程切换竟然如此相似
- Kotlin Flow啊,你将流向何方?
- Kotlin SharedFlow&StateFlow 热流到底有多热?
之前文章都是分析单个协程的原理、特性以及使用,本篇文章将着重分析协程间的通信方式。
通过本篇文章,你将了解到:
- Channel的引入及简单使用
- Channel的原理
- Channel四种类型深入解析
- produce/actor的使用与原理
1. Channel的引入及简单使用
初级版协程间通信
先来看一个简单的通信Demo:
fun testChannel() {//协程1var deferred= GlobalScope.async {//假装在加工数据Thread.sleep(2000)"Hello fishforest"}//协程2GlobalScope.launch {var result = deferred.await()println("get result from coroutine1: $result")}}
如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。
现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:
共享一个变量,这个变量可以是个队列。
于是Demo改造如下:
fun testChannel2() {//阻塞队列var queue = ArrayBlockingQueue<String>(5)//协程1GlobalScope.launch {var count = 0while (true) {//假装在加工数据Thread.sleep(1000)queue.put("fish ${count++}")}}//协程2GlobalScope.launch {while (true) {Thread.sleep(1000)println("get result from coroutine1:${queue.take()}")}}}
通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。
看似美好,实际上此处有个很大的漏洞:
队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。
我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。
升级版协程间通信-Channel
同样的需求,我们用Channel 实现:
fun testChannel3() {//定义Channelvar channel = Channel<String>()//协程1GlobalScope.launch {var count = 0while (true) {//假装在加工数据Thread.sleep(1000)var sendStr = "fish ${count++}"println("send $sendStr")channel.send("$sendStr")}}//协程2GlobalScope.launch {while (true) {Thread.sleep(1000)println("receive:${channel.receive()}")}}}
与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。
查看打印结果:
你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列?
要想解开这个谜题,最好的方法是从源码入手深究其原理。
2. Channel的原理
Channel的构造
先从Channel 构造开始:
#Channel.kt
public fun <E> Channel(//Channel 容量/叫做Channel类型更合理一些capacity: Int = Channel.RENDEZVOUS,//缓冲区满后,发送端的处理方式onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,//信息没有传递出去时的回调onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =when (capacity) {//默认是约会模式Channel.RENDEZVOUS -> {//默认挂起if (onBufferOverflow == BufferOverflow.SUSPEND)RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channelelseArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel}//...}
此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。
RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。
重点在AbstractChannel/AbstractSendChannel及其子类里。
Channel的队列结构
AbstractSendChannel 里有个很重要的成员变量:
protected val queue = LockFreeLinkedListHead()
LockFreeLinkedListHead 继承自LockFreeLinkedListNode,而这个Node 我们在分析Kotlin 协程之取消与异常处理探索之旅(上) 有提及过,此处再拎出来说说。
先看其定义:
#LockFreeLinkedList.kt
public actual open class LockFreeLinkedListNode {//后驱指针private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor//前驱指针private val _prev = atomic(this) //...
}
很典型的一个链表结构,并且是无锁链表,意思是它的插入/删除是无需上锁的,核心是使用了CAS。
回到Channel里的成员变量queue,初始的链表结构如下:
可以看出,当前节点的_next、_prev分别指向自己。
当往链表里面添加Node时,形成如下结构:
链表头为固定节点,通过它构造了双向循环链表。
AbstractSendChannel 里的queue 就是个链表头,通过它我们可以快速找到链表里的第1个节点(_next 指向的节点),也可以快速找到链表的最后一个节点(_prev指向的节点)。
于是形成了一个队列结构,每次往队列里放入数据,就放到链表的尾部,每次从队列里取数据,就从链表头后的第一个节点取。
Channel的send/receive
send 分析
#AbstractChannel.ktpublic final override suspend fun send(element: E) {//快速判断是否可以放入queue 队列//若能成功,则直接返回if (offerInternal(element) === kotlinx.coroutines.channels.OFFER_SUCCESS) return//不能退出,则挂起协程return sendSuspend(element)}protected open fun offerInternal(element: E): Any {while (true) {//先找到队列第一个Node节点,如果存在并且是Receive 类型,说明有接收者在等待val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED//给接收者协程赋值val token = receive.tryResumeReceive(element, null)if (token != null) {kotlinx.coroutines.assert { token === RESUME_TOKEN }//重新恢复接收者协程receive.completeResumeReceive(element)//返回插入的结果return receive.offerResult}}}private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->//suspendCancellableCoroutineReusable 里有挂起协程的逻辑loop@ while (true) {if (isFullImpl) {//构造SendElement,它是Node类型 val send = if (onUndeliveredElement == null)//SendElement 有两个成员变量:1是具体的值,2是当前协程的封装体contSendElement(element, cont) elseSendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)//将Element 加入到队列尾部val enqueueResult = enqueueSend(send)//插入成功,则返回}}}
用图表示以上流程:
接收者协程被恢复后,重新调度执行协程,而传入的值即为send发送的值,最终recevie返回的即是send过来的值。
对协程的挂起有疑惑请移步:讲真,Kotlin 协程的挂起没那么神秘(原理篇)
receive 分析
与send流程类似,就不贴代码了,仅用图表示:
可以看出,send/receive 通过判断queue的状态来决定是否挂起当前协程,而queue里的Node 又分为三种类型:
综合以上得出:
在RENDEZVOUS类型(默认类型)下,发送者协程需要等待接收者就位了(到队列里等待)才会继续往下走。同样的,接收者协程需要等待发送者就位了(到队列里等待)才会继续往下走。因此,形成的现象是发送者/接收者成对出现。
成对出现的场景,我们称RENDEZVOUS 为约会类型。
3. Channel四种类型深入解析
CONFLATED 类型
前面的分析是基于约会类型,实际上Channel还有其它类型,通过其构造过程可看出:
#AbstractChannel.kt
public fun <E> Channel(capacity: Int = Channel.RENDEZVOUS,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =when (capacity) {//约会类型Channel.RENDEZVOUS -> {if (onBufferOverflow == BufferOverflow.SUSPEND)RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channelelse//转为缓冲类型ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel}//混合类型Channel.CONFLATED -> {//此种类型下必须是挂起模式require(onBufferOverflow == BufferOverflow.SUSPEND) {"CONFLATED capacity cannot be used with non-default onBufferOverflow"}ConflatedChannel(onUndeliveredElement)}//无限制类型Channel.UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows//缓冲类型Channel.BUFFERED -> ArrayChannel( // uses default capacity with SUSPENDif (onBufferOverflow == BufferOverflow.SUSPEND) Channel.CHANNEL_DEFAULT_CAPACITY else 1,onBufferOverflow, onUndeliveredElement)//没有指定具体类型是以上4种内的组合}
先看CONFLATED(混合)类型。
ConflatedChannel 继承自AbstractChannel,有个成员变量:value。
重点来看其重写的函数:offerInternal与pollInternal,分别对应send与receive的逻辑。
send 分析
#ConflatedChannel.kt
protected override fun offerInternal(element: E): Any {var receive: ReceiveOrClosed<E>? = null//先上锁lock.withLock {//如果value 为空,也就是之前没有发送过,说明可能有接收者在等待。if (value === kotlinx.coroutines.channels.EMPTY) {loop@ while(true) {//尝试取出接收者receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queuedif (receive is Closed) {//是关闭Node,直接返回return receive!!}//赋值给接收者协程val token = receive!!.tryResumeReceive(element, null)if (token != null) {//跳出锁return@withLock}}}//更新发送值到value里updateValueLocked(element)?.let { throw it }//成功插入return OFFER_SUCCESS}//如果找到接收者,则恢复接收者协程receive!!.completeResumeReceive(element)return receive!!.offerResult
}
receive 分析
#ConflatedChannel.kt
protected override fun pollInternal(): Any? {var result: Any? = null//上锁lock.withLock {//如果value 为空,说明没数据,取数据失败if (value === kotlinx.coroutines.channels.EMPTY) return closedForSend ?: POLL_FAILED//从value 里取数据result = value//恢复到无数据状态value = EMPTY}return result
}
由此可见:
在 CONFLATED类型下,发送者无需等待接收者就位,它可以一直更新数据。
BUFFERED 类型
此为缓冲类型,与其它类型最大的不同之处在于它内部有数据缓冲区。
ArrayChannel 继承自AbstractChannel,其成员变量:
//数据缓冲区private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }
重点函数还是offerInternal与pollInternal。
send 分析
#ArrayChannel.kt
protected override fun offerInternal(element: E): Any {var receive: ReceiveOrClosed<E>? = nulllock.withLock {//size 为buffer 当前的实际存储数据的个数val size = this.size.value//更新size,此处根据发送策略,有可能会直接退出updateBufferSize(size)?.let { return it }if (size == 0) {//当前缓冲区没有数据loop@ while (true) {//查看是否有接收者等待receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued//给接收者协程赋值val token = receive!!.tryResumeReceive(element, null)if (token != null) {//缓冲区数量不变this.size.value = sizereturn@withLock}}}//加入到缓冲队列enqueueElement(size, element)//插入成功return OFFER_SUCCESS}//恢复接收者协程receive!!.completeResumeReceive(element)return receive!!.offerResult
}private fun updateBufferSize(currentSize: Int): Symbol? {if (currentSize < capacity) {//还可以继续存放数据size.value = currentSize + 1 // tentatively put it into the bufferreturn null // proceed}//缓冲区满return when (onBufferOverflow) {//协程需要挂起BufferOverflow.SUSPEND -> OFFER_FAILED//舍弃最新数据,相当于发送永远是成功的BufferOverflow.DROP_LATEST -> OFFER_SUCCESS//舍弃旧的数据,发送继续走下面的流程BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement}
}
receive 分析
receive 过程与send类似,就不贴源码了,直接上图对比:
可以看出,对于发送者来说:
先将数据放入数据缓冲队列,当缓冲区满后才会考虑是否需要挂起发送者协程。同样的,对于接收者来说,先从缓冲队列取数据,当缓冲区没数据时才会挂起自身。
UNLIMITED 类型
此类型为无限制类型,网上一些文章将此与BUFFERED类型类比,并归为“无限缓冲类型”,该说法是否正确,接下来一步步印证。
同样的,LinkedListChannel继承自AbstractChannel。
重点函数还是offerInternal与pollInternal。
send 分析
#LinkedListChannel.kt
protected override fun offerInternal(element: E): Any {while (true) {//快速查找是否有接收者等待val result = super.offerInternal(element)when {//找到接收者,插入算是成功result === OFFER_SUCCESS -> return OFFER_SUCCESS//没找到result === OFFER_FAILED -> {//加入到协程缓冲队列when (val sendResult = sendBuffered(element)) {null -> return OFFER_SUCCESSis Closed<*> -> return sendResult}}}}
}protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {//将SendBuffered 加入到queue里(队尾)queue.addLastIfPrev(AbstractSendChannel.SendBuffered(element)) { prev ->if (prev is ReceiveOrClosed<*>) return@sendBuffered prevtrue}//添加成功return null
}
receive 分析
receive 过程完全依靠父类AbstractChannel完成,此处就不再赘述,用图表示:
可以看出:
此类型下,发送者不会挂起,会一直往队列里存放数据,理论上是可以无限制存放的。与BUFFERED类型不同的是,UNLIMITED 缓冲数据使用的是queue,它是链表。而BUFFERED 缓冲数据使用的是数组。
Channel 四种类型比对
对于接收者来说,只有一种逻辑:
有数据则消费数据,没数据则挂起等待。
4. produce/actor的使用与原理
使用
通过上面的分析,我们知道接收者有可能会阻塞,怎样才能让接收者知道数据已经发送完毕了呢?
答案是:Channel.close()。
当调用该函数时,会往queue里加入Closed节点,当send/receive 取出该节点时就知道Channel关闭了。
你说,能不能不手动调用该函数呢?刚好,produce可以解决该问题:
fun testProduce() {//返回接收者var receiveChannel = GlobalScope.produce<String> {//for (x in 1..5) {var sendStr = "fish $x"println("send $sendStr")send("$sendStr")}}//接收数据GlobalScope.launch {while (true) {println("job2 receive:${receiveChannel.receive()}")}println("job2 end")}GlobalScope.launch {while (true) {println("job3 receive:${receiveChannel.receive()}")}println("job3 end")}}
produce 函数返回Channel,在produce的协程体里可以发送数据,而通过返回的Channel,其它协程可以接收数据。当produce协程执行完毕后,将会主动调用close关闭Channel,其它Receive的Channel就会有感知,从而退出挂起状态。
这是一个典型的单生产者–多消费者的模型。
反之单消费者–多生产者的模型如下:
fun testActor() {//返回发送者var sendChannel = GlobalScope.actor<String> {//for (x in 1..5) {println("job1 receive:${receive()}")}println("actor end")}//发送者GlobalScope.launch {sendChannel.send("send from job2")}GlobalScope.launch {sendChannel.send("send from job3")}}
原理
produce和actor内部创建了RENDEZVOUS 类型的Channel,它们返回的Channel以及协程体里的Channel都是委托这个内部的Channel来完成功能的,并且Channel绑定了协程的生命周期,当协程取消时将会取消Channel。(由于篇幅原因,就不展开细说了,有兴趣可以自行阅读源码或是留言)。
下篇将着重分析Flow以及与LiveData的PK。
本文基于Kotlin 1.5.3,文中完整Demo请点击
您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力
持续更新中,和我一起步步为营系统、深入学习Android/Kotlin
这篇关于继续来,同我一起撸Kotlin Channel 深水区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!