• 作者:老汪软件技巧
  • 发表时间:2024-08-29 21:02
  • 浏览量:

基本概念Flow 数据流(连续),关注的是同一个协程下的数据序列的持续发送与处理。SharedFlow 事件流(独立)关注的是事件序列向多个订阅者的一对多的,跨协程的通知StateFlow 状态流(独立)特殊的SharedFlow,提供状态订阅,本质上也是事件订阅,状态的改变也是事件Flow的功能定位

Flow,可以称之为协程版本的sequence,类似于队列的机制,队头插入数据,队尾取出数据,其实只要有这样的机制的数据结构我们都可以称之为队列,例如LinkedList。Sequence就是按照队列的方式来提供数据,并且按照提供的顺序来获取数据的Api,但和队列的区别在于,它不是数据结构,而是一种机制。因为它让我提供的是数据的规则,而不是数据的本身,我们用Sequence来提供的数据,实际上是动态生产的,用完一条才生产下一条,而不是像队列那样,提前把数据都准备好了,然后一个一个取用。当然队列也可以实现容量是1,就是生产一条取一条。不生产就没有。Sequence是从机制上就限制了,生产一条使用一条,使用完成之后再去生产一条新的数据的机制。

val nums = sequence {
  while (true) {
    yield(1)
  }
}

协程里边有一个yield函数,它是手动的让协程暂停一下,和线程的Thread.yield()是一样的定位。通过强行让自己暂停的方式让出当前暂用的线程。来手动调节线程占用的,本质上就是在当前时间片还没有到时间的时候,就主动放弃时间片。属于很底层的函数。Sequence里边也有yield的函数,它是用来生产数据的,和协程的yield完全无关。

利用sequence{}函数可以创建一个Sequence对象,在大括号里边用yeild()函数生产元素,然后使用遍历的方式来获取元素,这个有点类似List,Kotlin里边有个buildList。

  val list = buildList {
      add(1)
      add(2)
  }
  for (num in list) {
    println("List item: $num")
  }
  val nums = sequence {
      yield(1)
      yield(2)
  }
  for (num in nums) {
    println("sequence item: $num")
  }

Sequence并不是一种数据机构,它并不存在某种内部解结构来保存一堆数据,是一种机制,与List的最大区别就是前者是惰性的,也就是Lazy的,这也就导致他在创建的时候并不会真的把元素填进去,而是在遍历的时候才会真正填入数据,用到的时候才生产,声明的时候不会生产。而且它还是用一条才生产一条。例如上边的循环,如果我们遍历了第一个元素后就退出,那么第二条元素不会生产。List实在返回list对象的时候就已经把所有的数据装好了。由于这个特性,因此我们下边的这种写法,是可以的:

val list = buildList {
  while (true) { // list 永远无法执行完,直到内存溢出。
    add(getData())
  }
}
for (num in list) {
  println("List item: $num")
}
val nums = sequence {// sequence的机制保证了它的数据是使用者需要数据的时候才会生产。
  while (true) {
    yield(1)
  }
}
for (num in nums) {
  println("List item: $num")
}

我们也可以用list来创建Sequence,list.asSequence,一个基于list的Sequence,但本质上,它只是为Sequence的生产过程提供了预先准备好的生产数据,而不是提供了这个sequence的内部数据,Sequence只有生产规则,没有内部数据。,因为这种模式,从这里来看,Sequence相比于list,可以更快的获取与处理数据,开始操作具体数据的时间比list要快。虽然如果元素总数一样,遍历完所有的数据,list和Sequence是差不多的。

进一步澄清:

总结来说,Sequence 的优势在于它的惰性计算模型,可以在需要时逐步生成和处理数据,从而在某些场景下(如处理部分数据)比List 更高效。但这并不意味着Sequence 总是更快。如果需要处理所有元素,并且进行多次操作,Sequence 的性能可能与 List 相差不大,甚至在某些情况下可能会略慢(因为 Sequence 需要逐个元素处理,而 List 可以一次性完成)。因为在 Sequence 中,每个元素都是按需生成的,如果元素生成过程比较复杂或昂贵(比如涉及网络请求、大量计算等),且对于Sequence的元素的这些处理可能重复多次。那么这种按需生成的开销会在每次访问时累积起来。相比之下,List 是在创建时一次性完成所有数据的生成,并将数据保存在内存中,这样后续访问数据时就不会再有这些开销。

可以看出Sequence在有一条就处理一条,而不需要全部数据处理完了才有意义的业务场景,是比List更为适用的。 比如网络数据的持续获取和刷新。

在sequence里边,是不能调用不属于Sequence类的挂起函数,调用其余的挂起函数都会报错。为什么会有这个限制呢?这是因为kotlin团队希望Sequence使用协程来实现业务逻辑,但这套协程的逻辑是完全独立的、与世隔绝的,避免其他环境的协程代码进入执行时,影响Sequence的初始逻辑。从本质上讲,这种只能用自己挂起函数的限制,是提供了一种场景化的特权。

val nums = sequence {
  // delay() 调用就会报错。
  while (true) {
    yield(1)
    yieldAll()
  }
}
@SinceKotlin("1.3")
@Suppress("DEPRECATION")
public fun  sequence(@BuilderInference block: suspend SequenceScope.() -> Unit): Sequence = Sequence { iterator(block) }
@RestrictsSuspension // 这个注解导致SequenceScope只能使用自己的挂起函数。
@SinceKotlin("1.3")
public abstract class SequenceScope<in T> internal constructor() {

那么如何解除这种限制?可以使用Flow,它具有Sequence的逻辑,同时内部还可以使用协程。

val numsFlow = flow {
  while (true) {
    emit(getData())
  }
}.map { "number $it" }
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
  numsFlow.collect { // Flow的遍历使用collect函数。
    println(it)
  }
}

Sequence提供了一个边生产边消费的数据序列,而Flow就是协程版本的Sequence,提供了一个支持挂起函数的数据流,比如持续获取网络数据的数据流。

Flow的工作原理和应用场景

Flow的工作模式:设定好一套逻辑,在每个collect地方都重复执行一遍这个逻辑,在这套逻辑里边安插的一些发送数据的节点,而collect执行这一套逻辑的时候,对于每一条数据都会执行自己设定好的数据处理逻辑,这就是Flow的本质。由Flow对象来提供生产数据流的生产逻辑,然后在收集流程里执行这套生产逻辑,并处理每条数据,所以说Flow就是一个数据流工具。Kotlin官方所说的Channel是冷流,Flow是热流,其实这里的冷和热就是说数据的生成是独立的,还是和数据收集有关的。Channel有他自己独立的生产线,调用send就会生产一条数据,跟是否调用receive无关。这就是所谓的hot,热流;而Flow存的只是生产规则,而真正的生产一定是在每次Collect的时候才开始的,每次collect都会有一次完整的生产流程。这就是所谓的cold,冷流。 channel在不取数据的时候就已经启动生产流程了,他是独立的、统一化的生产流程。而Flow只会在collect的时候才会生产,即便有多个协程,每一个协程内部的collect是互不干扰的、独立的。因此,可以看出,热,就是指没收集数据的时候可以生产数据,而冷就是指开始收集数据的时候才会生产。Flow的执行过程中,虽然我们的代码是这么写的,但是实际上emit的代码执行时机是在scope.launch {}代码块内,不过还是有一定的区别的,emit的时候flow会检查coroutineContext,发送数据的时候不允许切换coroutineContext。

val numsFlow = flow {
  emit(1)
  delay(100)
  emit(2)
}
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
  weatherFlow.collect {
  }
  numsFlow.collect {
    println("A: $it")
  }
}
scope.launch {
  delay(50)
  numsFlow.collect {
    println("B: $it")
  }
}
// 这才是Flow真正执行的效果。当然emit直接放里边会报错,最终效果如(2)所示
scope.launch {
  emit(1)
  delay(100)
  emit(2)
}
// (2)
scope.launch {
  println("A : 1")// emit(1)
  delay(100)
  println("A : 2")// emit(2)
}

包括Sequence也是这样的执行逻辑,只是Sequence是非挂起式的,而Flow的收集是挂起式的,因此Flow收集一定要在协程里边执行,不过创建可以在非协程中。

// 如果是在普通代码中执行下边的逻辑
// Sequence下边这种写法正确
  val nums = sequence {
    while (true) {
      yield(1)
    }
  }
  for (num in nums) {
    println("List item: $num")
  }
// Flow中的Collect方法或报错,因为collect方式是suspend修饰的,必须在协程中执行。
val numsFlow = flow {
  while (true) {
    emit(getData())
  }
}.map { "number $it" }
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
  numsFlow.collect { // Flow的遍历使用collect函数。
    println(it)
  }
}

什么时候需要用到Flow呢?如果想把数据的提供和数据的处理两个流程拆分开来,就需要使用Flow了。这种持续提供数据的形式,就是数据流,而需要把数据流的生产和消费功能差费开的业务场景就是Flow的用途所在。

  scope.launch { 
    showWeather(weatherFlow)
    // 或者像下边这样,只是拆分生产部分。
    // weatherFlow.collect{
    //   println("Weather: $it")
    // }
  }
}
val weatherFlow = flow {
  while (true) {
    emit(getWeather())
    delay(60000)
  }
}
suspend fun showWeather(flow: Flow<String>) {
  flow.collect {
    println("Weather: $it")
  }
}

我们知道,挂起函数执行完之前,它下边的流程是不会执行的。collect并不是订阅,它是收集,不停的收集事件,收集完了就结束了。collect作为挂起函数,全部都收集完了之后再继续后边的流程,也是正确的。事件流与数据流并没有非常清晰的界限,只是事件流通常没有明确的终点,数据流有这个。可是有些事件流也是有限的,也有些数据流的数据是无限的。总之,一般的,我们认为Flow是个数据流,但是要把它当做事件流来使用,也是可以的。怎么做?直接把Collect当做订阅事件的函数来用就可以了。 例如上边的代码,一个个天气数据,我们把它认为是天气更新的事件。我们如果实在不想让flow的collect函数挡住下边的代码,可以把这个逻辑放到一个单独的协程里边,一个协程里边,只有一个Flow,这个也是可以的。

Flow的创建

Flow的创建有以下5种方式:

//(1)最原始的方式
flow {
  emit(1)
}
//(2)
val flow1 = flowOf(1, 2, 3) // 内部也是使用了(1)的方式
//(3)
val flow2 = listOf(1, 2, 3).asFlow()
//(4)
val flow4 = sequenceOf(1, 2, 3).asFlow()
//(5)用channel创建的flow,依然保留了Channel的特性,即:每一条创建的数据只能被接收一次。
val channel = Channel<Int>()
val flow5 = channel.consumeAsFlow()
val flow6 = channel.receiveAsFlow()
// 上游的Channel是自顾自的生产数据的管道,而下游在拿到数据之后,
// 会先卡住这些数据,不继续往下释放,而是直到它的下面索要数据了,再去释放。
scope.launch { 
  flow5.collect{
    println("flow5 =  $it")
  }
  flow6.collect{
    println("flow6 =  $it")
  }
}

这个有个有意思的问题,以Channel创建的Flow是冷流还是热流呢?我们知道Channel是热流(生产数据与消费数据无关),而Flow是冷流(只有需要消费时才生产数据)。上游的Channel是自顾自的生产数据的管道,而下游在拿到数据之后,会先卡住这些数据,不继续往下释放,而是直到它的下面索要数据了,再去释放。因此,说他是分阶段的热流与冷流也可以,是一个双阶段的生产流程。但是,当我们对一个Channel创建出来的Flow对象调用多次Collect的时候,它的生产逻辑和普通的Flow是不一样的。它并不会在多个collect里边生产相同的数据,而是会把Channel里边生产的数据在多个Channel里边瓜分。现在我们再再回头说说 consumeAsFlow 和receiveAsFlow,他们的区别是,consumeAsFlow有个消费的概念,由该方式创建的Flow只能被消费一次,在被调用了collect之后,他就会被内部标记为已消费,这时候如果再次调用collect,他就会抛出异常。例如下边这种方式:

scope.launch { 
  flow5.collect{
    println("flow5 =  $it")
  }
  flow5.collect{
    println("flow6 =  $it")
  }
}

创建Flow还有两种方式,channelFlow和callbackFlow:

val flow7 = channelFlow {
  launch {
    delay(2000)
    // 生产函数
    send(2)
  }
  delay(1000)
  send(1)
}
val flow9 = callbackFlow {
  gitHub.contributorsCall("square", "retrofit")
    .enqueue(object : Callback<List<Contributor>> {
      override fun onResponse(call: Call>, response: Response>) {
        trySend(response.body()!!)
        close()
      }
      override fun onFailure(call: Call>, error: Throwable) {
        cancel(CancellationException(error))
      }
    })
  awaitClose()
}

这两种方式与上边的 consumeAsFlow 和receiveAsFlow的区别是,上边两种方式共用了一个Channel,所有数据的消费都需要通过collect进行,会瓜分channel生产的数据。而后边这两种方式,它创建的Flow是直到Collect的时候才会创建Channel,开始生产。也就是,对此调用就会创建多个Channel,这些Channel的生产流程是互相隔离、各自独立的。如果还用冷热概念来说的话,consumeAsFlow 和receiveAsFlow是热的Flow,而 channelFlow 与callbackFlow 是冷的Flow。

channelFlow有什么作用呢?Channel是用来做协程间通信的组件。使用ChannelFlow的一个好处就是可以在它的代码块里边启动子协程了。而flow{}这种方式是无法使用子协程的。flow的emit是无法切换协程的。例如下边的代码是会报无法从其他协程发送数据的错误:

val flow8 = flow {
  launch {
    delay(2000)
    emit(2)
  }
  delay(1000)
  emit(1)
}
scope.launch {
  flow8.collect {
    println("channelFlow with callback: $it")
  }
}

因此如果有跨协程的生产需求,就用channelFlow来替代flow进行生产。因此跨协程才是channelFlow的用处所在,而Channel本身只是它的底层实现而已。

ChannelFlow中还有一个函数,叫awaitClose(),用来防止协程结束。如果是涉及到回调函数执行的,最好是使用 callbackFlow,它其实也是一个ChannelFlow,只是它会强制要求你调用awaitClose(),否则会崩溃。当然,如果只是单次的回调,其实使用suspendCancellableCoroutine()就足够了,如果是连续的,也就是数据流,就需要使用callbackFlow,我们可以把callbackFlow看成是Flow版本的suspendCancellableCoroutine,他们都负责和回调的api对接。一个负责单次的,一个负责数据流。

这就是ChannelFlow和callbackFlow,内部逻辑几乎一致。应用场景上,ChannelFlow一般适用于跨协程或者是多协程发送数据的场景。而callbackFlow面向的是回调式api的场景。

请思考一个问题,为什么普通的Flow不允许跨协程呢?这是因为我们的collect在处理数据的时候,是在发送数据的各个协程里边处理的,这也就导致了,各个协程可能所在的Dispatcher是不一样的,最终会导致软件的运行所在线程可能与开发者的预期不一致。处理这个问题有两种方式,一种是上层业务方自己使用的时候注意切换写成,另一种就是底层,从api的设计就限制了flow,禁止在别的协程调用flow来发送数据,也就是emit必须在flow自己的代码块中发送,强制保证数据一定是从collect所在的协程发送的。

val flow = flow {
   // 下边这种方式或报错,因为flow的发送与接收一定是在同一个协程。
  /*launch(Dispatchers.IO) {
    delay(2000)
    emit(2)
  }*/
  delay(1000)
  emit(1)
 }
scope.launch(Dispatchers.Default) {
  flow.collect {
    println("flow: $it")
  }
  // 将收到的数据编上序号
  flow.collectIndexed { index, value ->  }
  // 
  flow.collectLatest {  }
}

其实总结下来就一句话,是为了保证 flow.collect {}这个逻辑在collect所在的协程里边被运行。也就是上边的Dispatchers.Default,这样更加符合开发者的直觉。那为什么ChannelFlow可以跨协程生产呢?因为channel就是干的跨协程的事情。他是从另一个协程生产完数据之后,通过send发送过来,然后我们在这个协程接住了数据并进行处理。它本质上是把数据的发送和处理拆分开了,数据的处理依然在从良了collect所在的协程。