本文主要是介绍Kotlin 流 Flow,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
挂起函数可以异步地返回一个值,而对于返回多个值,可以使用流,使用 emit(x)
发射多个值,
collect { }
来收集值。
默认 流是冷的,只有 收集(collect) 时才会执行。
1. 流的创建
flow {}
生成流,emit(x)
来发射值;xxx.asFlow()
集合转成Flow;flowOf(1, 2, 3)
生成固定值的流。
1.1 flow {}
flow {}
里的 发射(emissions)默认是可取消的,对应 SafeFlow
,继承自 AbstractFlow
:
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {public final override suspend fun collect(collector: FlowCollector<T>) {val safeCollector = SafeCollector(collector, coroutineContext)try {collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}}
对应 emit()
方法,就是 SafeCollector.emit()
内部 调用了 currentContext.ensureActive()
做 取消检查。
而 其他两种,默认是 不可取消,使用 cancellable()
操作符可取消。
2. 流的操作符
2.1 中间操作符
map
filter
transform
2.2 结束操作符
collect {}
toList()
或toSet()
收集到集合first()
或single()
reduce()
或fold()
合并值
3. 流的上下文
默认 流 执行在 和 收集者(Collector) 相同的 上下文。
更改流的发射的上下文,必须使用 flowOn
,而不是在 flow {}
中使用 withContext()
。
flow {...
}.flowOn(Dispatchers.Default)
4. 缓冲
当 Flow emit 生产者速度 大于 collector 消费者速度时。
buffer()
并发地执行 发射 和 收集,而不是 顺序执行(发射 收集 再 发射 收集);conflate()
丢弃中间值,取最新发射的值;collectLatest { }
收集最新的值,但 如果 发射新值,会 取消 慢的收集。
simple().collectLatest { value -> // cancel & restart on the latest valueprintln("Collecting $value") delay(300) // pretend we are processing it for 300 msprintln("Done $value") }
说明:会执行所有 Collecting,但是 因为慢处理,会被取消。
结果:
Collecting 1
Collecting 2
Collecting 3
Done 3
https://kotlinlang.org/docs/flow.html#processing-the-latest-value
5. 流的组合
zip
1对1的 组合combine
每次上游更新,就会重新计算。两个流 生产速度不一样时,就会 不同的对应组合。更新值即组合
举例:
zip 组合值: 1->one, 2->two 3->three,
而 combine 组合,则可能 1->one 2->one 这样只有一方流 发射值,就会调用计算。
6. flatten 展平
对于 Flow
又对应 Flow<T>
任务,这时候对于 Flow<Flow<T>>
需要展开为Flow<T>
。场景就是 一序列 对应 请求任务。
fun requestFlow(i: Int): Flow<String> = flow {emit("$i: First")delay(500) // wait 500 msemit("$i: Second")
}(1..3).asFlow().map { requestFlow(it) }
flatMapConcat
按顺序,流完成后,才接着下一个流flatMapMerge
支持并发地处理,流 则 出现并发交错地收集值,concurrency
设置并发数flatMapLatest
处理最新的流,当新的流发射值时,取消之前的流
7. 异常
try { flow.collect {} } catch (e: Exception) { }
处理异常,包含收集器里代码异常;flow {}.catch { }.collect { }
处理 上游异常,但不会处理 下游 异常;flow {}.onEach { }.collect()
处理上下游异常。
7.1. try/catch
全部捕获
try {simple().collect {println("value: $it")}
} catch (e: Exception) {// 捕获了 flow发射代码块、中间操作符 和 结束操作符 的所有异常
}
7.2. catch
操作符
simple().catch { e ->// catch 捕获上面的异常,但 不处理 下游 和 结束操作符 的 异常println("exception: $e") }.collect {// 如果 这里异常,则不会被捕捉println("value: $it")}
7.3. 声明式捕获
如果想 捕获 结束操作符的异常,需要 声明式地捕捉。把 collect
的代码部分 上移到 onEach
中,使用无参的 collect()
收集:
simple().onEach {check(it < 2)println("value: $it")}.catch { e -> println("exception: $e")}.collect()
8. 完成
try/finnaly
在结束后处理flow {}.onCompletion { cause -> }
处理
flow {}.onCompletion { cause ->// 完成回调,cause 是空 表示 正常完成if (cause == null) {println("success")}
}
9. 取消
onEach
时检测cancel()
在收集时,调用取消flow {}.cancellable()
设置flow
可取消
// 不加 cancellable() 不会 做取消检查,导致完成收集后 才 报异常
// cancellable() 则会 及时取消
// flowOf(1, 2, 3)
listOf(1, 2, 3).asFlow()
// .cancellable().collect {if (it > 1) {cancel()}println("value: $it")}
cancellable()
的实现:
- CancellableFlowImpl
public fun <T> Flow<T>.cancellable(): Flow<T> =when (this) {is CancellableFlow<*> -> this // Fast-path, already cancellableelse -> CancellableFlowImpl(this)}private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {override suspend fun collect(collector: FlowCollector<T>) {flow.collect {currentCoroutineContext().ensureActive()collector.emit(it)}}
}
文档
- Flow
- 异步流
- Kotlin Flow:掌握基本,征服应用,避开开发陷阱!
这篇关于Kotlin 流 Flow的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!