本文是对发布在 Medium 上的文章 Kotlin Coroutines Flow in a nutshell 的翻译。
请注意,虽然本翻译按照
CC BY-SA 4.0分发,但没有原作者的授权,我本质上不享有翻译权。所以如果想转载本文,你首先应该取得原作者的同意。
在我的上一篇文章中,我阐明了 RxJava 的工作原理。很长一段时间以来,在 Android 中,RxJava 是用于处理流和多线程的事实标准。但现在可以选择受 Google 推荐的,来自 JetBrains 的 Coroutines(协程)。
虽然很多开发者对 Coroutines 仍存疑虑,但新项目很少依赖于 RxJava。Flow 替代了它的位置。
本文关于……
在本文中,我会告诉你 Flow 如何工作。包含使用的基本准则和生命周期(lifecycle)管理,也就是说我们在讨论分配(Dispatching)。
本文的目标读者即包括第一次尝试理解 Flow 的人,也包括有一定经验的协程使用者。
链式调用
CoroutineScope(context = Dispatchers.Main.immediate).launch() { doAction() flowOf("Hey") .onEach { doAction() } .map { it.length } .onStart { doAction() } .flowOn(Dispatchers.Default) .flatMapMerge { doAction() flowOf(1) .flowOn(Dispatchers.Main) .onEach { doAction() } } .flowOn(Dispatchers.IO) .collect { doAction() } } }我们的目标是搞清楚每个动作实际的效果,以何顺序调用,在哪个线程上执行。
基础和生命周期
Flow 的定义只有两个接口:
public interface Flow<out T> { public suspend fun collect(collector: FlowCollector<T>) } public fun interface FlowCollector<in T> { public suspend fun emit(value: T) }这两个接口是消费者-生产者模式(Consumer & Producer pattern)的基础。
在继续之前,我强烈建议你查看基于这两个接口编写的流 API 示例。
每个 Flow 链代表一组特定操作。每次操作会创建新的 Flow 对象,同时也会存储先前调用的 Flow 实例的引用。在调用 collet 方法之前,运算不会开始(冷流)。
Flow 的生命周期历经 5 个重要阶段:
启动 ⬇️
一个协程在使用特定 Dispatcher 的 CoroutineScope 上启动。 之后:Flow 创建、操作收集 & 数据发射。最终结果将在指定的 Dispatcher 上处理。
Flow 创建 ⬇️
在当前线程,运算从上到下创建。(类似于 Builder 模式。)
操作收集 ⬆️
自下而上进行,每个操作符收集上一个。(译注:也就是创建对上一个操作的引用)
数据发射(Emission)⬇️
数据发射开始于所有操作被成功收集,并最终调用了
collect时。数据从上往下依次执行操作。取消或完成 整个链死亡 😵
链执行完毕或取消
让我们仔细看看每个阶段。
启动
// Scope 启动 val job = CoroutineScope(Dispatchers.Main.immediate).launch { doAction() /*.../* }一目了然。我们创建了一个在主线程上运行的协程作用域。doAction() 方法也在这个协程上启动。
Scope 返回了 Job,可以用于管理生命周期。(例如,调用 cancel() 停止全部工作。)
Immediate Dispatcher(调度器)的作用
译注:这里是 Android Only 的内容,对其他平台不一定适用。非 Android 开发者跳过此节没有影响。
在 Android 中,切换到主线程的唯一方式是使用 Handler/Looper/MessageQueue 链。
这个逻辑隐藏在 HandlerContext 里,同时这也是 Dispatcher.Main 的隐藏逻辑。
// 使用 Looper.mainLooper() 创建 handler override fun dispatch(context: CoroutineContext, block: Runnable) { if (!handler.post(block)) { cancelOnRejection(context, block) } }假设我们已经在主线程了,但我们仍尝试用 handler.post 切换。
在这种情况下代码不会立刻执行,因为可能会影响用户体验,比如,造成屏幕闪烁。
代码必须等待 MessageQueue 上的其他命令完成。Dispatcher.Main.immediate 主要作用是跳过该队列并立即执行。
Dispatcher 有一个 isDispatchNeeded 方法以解决问题。在 HandlerContext 中,该方法这样实现:
override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper }Dispatchers.Main.immediate 在 HandlerContext 创建新实例,它将 invokeImmediately 设为 true。因此,主线程的 Looper 将始终与当前线程的 Looper 比较,从而防止对 handler.post 的非必要调用。
Flow 创建
我们所写的 Flow 的第一个链是 flowOf("hey"),在底层,可以看到显式创建了 Flow 的实例,并将值存在 lambda 中。lambda 将会在收集阶段被调用。
import kotlinx.coroutines.flow.internal.unsafeFlow as flow public fun <T> flowOf(value: T): Flow<T> = flow { emit(value) } internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> { return object : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { collector.block() } } }之后,onEach 方法也会以同样的方式创建。
总之,该拓展函数同样会保留对先前 Flow 的引用。
直到 collect() 的所有其他运算符都这样创建,同时不执行任何真正的运算。
在创建阶段链的行为:
flowOf("Hey")→ 缓存传递的值
onEach { doAction() }→ 缓存 lambda 并在发射阶段执行
map {...}→ 通过映射缓存 lambda
onStart { doAction() }→ 缓存 lambda 并在收集阶段执行
flowOn(Dispatchers.Default)→ 缓存赋值到该 Dispatcher 的操作
flatMapMerge {...}→ 缓存 lambda,并在发射阶段执行
flowOn(Dispatchers.IO)→ 缓存赋值到该 Dispatcher 的操作
最终,每个操作符,除了第一个,都保留对先前操作的引用,构成一个 LinkedList
创建操作将会在主线程执行。

收集
该过程自下而上进行,并在终端(terminal)操作符调用后立即开始。
Flow 中的终端操作:
collcet()first()toList()toSet()reduce()fold()
当我们调用 collect 时,不收集整条链,而只收集上方的 flowOn。
然后 flowOn 调用 collect 和自身之间的引用操作符,在这个例子里是 flatMapMerge。这就是为何操作符保留对上游引用的原因。
在收集阶段链的行为:
flowOn(Dispatchers.IO)→ 在 IO Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
flatMapMerge {...}→ 调用上游的 collectflowOn(Dispatchers.Default)→ 在 Default Dispatcher 上创建新的协程,改变创建的协程上下文 → 调用上游的 collect
onStart { doAction() }→ 在 Default Dispatcher 上执行操作 → 调用上游的 collect
map {...}→ 调用上游的 collect
onEach {...}→ 调用上游的 collect

在所有操作之中,只有 onStart 和 flowOn 被执行。
线程切换了两次:第一次到 IO,另一次是 Default。
也就是说 flowOn 将被执行多次,并创建一些协程实例。
⚠️ 然而,flowOn 并不总在底层创建新协程。看下面的例子:
CoroutineScope(Dispatchers.Main.immediate).launch { flowOf("Hey") .onStart { doAction() } .flowOn(Dispatchers.IO) .onStart { doAction() } .flowOn(Dispatchers.IO) .onStart { doAction() } .flowOn(Dispatchers.IO) .collect() }我们有意地写了多次切换到同样 Dispatcher 的 flowOn。
结果
// onStart1 _____________________________________ Job: ProducerCoroutine{Active}@53f45ab) Thread: DefaultDispatcher-worker-1,5,main // onStart2 _____________________________________ Job: ProducerCoroutine{Active}@53f45ab) Thread: DefaultDispatcher-worker-1,5,main // onStart3 _____________________________________ Job: ProducerCoroutine{Active}@53f45ab) Thread: DefaultDispatcher-worker-1,5,main如你所见,协程实例只被创建了一次,并且绑定了一个线程。
发射
一旦到达没有对其他 Flow 引用的 Flow,发射过程就开始了。从根 Flow 到最低。
flowOf("Hey")→ 发射
hey,Default DispatcheronEach { doAction() }→ 执行操作,Default Dispatcher
map {...}→ 映射,Default Dispatcher
onStart { doAction() }→ 发射
3,Default DispatcherflowOn(Dispathers.Default)→ 发射
3,Default DispatcherflatMapMerge { ... }这个操作比较棘手,回想一下它的内容:
// ... flatMapMerge { doAction() flowOf(1) .flowOn(Dispatchers.Main) .onEach { doAction() } }在
flatMapMerge里面的链也会经历创建、收集、发射。之后,最终的值会被发射到下流。
请注意,onEach 将在 IO Dispatcher 上执行。(在块执行之前恢复。)
与 RxJava 不同的是,Kotlin Flow 有上下文保留(Context Preservation)的概念,保证了上层流的上下文不会影响到下层流。
flowOn(Dispatchers.IO)→ 发射
1,IO Dispatchercollect→ 在主线程调用收集器,尽管协程上下文在上游中被改变

结论
collcet()方法被挂起,这迫使我们提前决定在哪个上下文中处理我们的链的结果。所有操作从上到下创建,从下到上收集。然后被一个
LinkedList组织起来。发射过程从根 Flow 开始,从上到下执行。一些操作可能在收集阶段执行。例如,写入多少值,
onStart就执行多少次。flowOn创建一个新的协程,在参数中传递 Dispatcher 并更改上下文。(但是,如果我们有多个 flowOn 使用同一个调度程序,则实际上只会创建一个协程。)它只影响上游,从而保证符合上下文保留原则。
协程的创建和上下文的改变都可以在收集和发射两个阶段执行。
一个线程可能被多个协程共用。 如果你写了
flowOn,如果当前上下文不同,肯定会创建一个新的协程。但是,不能保证线程不同。flatMapMerge/flatMapConcat仅在父链数据发射期间启动链。在根 Flow 收集过程中不执行任何操作。
在我的下一篇文章中,我将展示协程的底层,以及比常规线程更高效的原因。
敬请关注!
感谢 @kost.maksym 对内容的审阅。
