随笔博文

Flow是如何解决背压问题的

2022-12-16 11:34:41 michael007js 122

前言

随着时间的推移,越来越多的主流应用已经开始全面拥抱Kotlin,协程的引入,Flow的诞生,给予了开发很多便捷,作为协程与响应式编程结合的流式处理框架,一方面它简单的数据转换与操作符,没有繁琐的操作符处理,广受大部分开发的青睐,另一方面它并没有响应式编程带来的背压问题(BackPressure)的困扰;接下来,本文将会就Flow如何解决背压问题进行探讨

关于背压(BackPressure

背压问题是什么

首先我们要明确背压问题是什么,它是如何产生的?简单来说,在一般的流处理框架中,消息的接收处理速度跟不上消息的发送速度,从而导致数据不匹配,造成积压。如果不及时正确处理背压问题,会导致一些严重的问题

  • 比如说,消息拥堵了,系统运行不畅从而导致崩溃

  • 比如说,资源被消耗殆尽,甚至会发生数据丢失的情况

如下图所示,可以直观了解背压问题的产生,它在生产者的生产速率高于消费者的处理速率的情况下出现

背压@2x

定义背压策略

既然我们已经知道背压问题是如何产生的,就要去尝试正确地处理它,大致解决方案策略在于,如果你有一个流,你需要一个缓冲区,以防数据产生的速度快于消耗的速度,所以往往就会针对这个背压策略进行些讨论

  • 定义的中间缓冲区需要多大才比较合适?

  • 如果缓冲区数据已满了,我们怎么样处理新的事件?

对于以上问题,通过学习Flow里的背压策略,相信可以很快就知道答案了

Flow的背压机制

由于Flow是基于协程中使用的,它不需要一些巧妙设计的解决方案来明确处理背压,在Flow中,不同于一些传统的响应式框架,它的背压管理是使用Kotlin挂起函数suspend实现的,看下源码你会发现,它里面所有的函数方法都是使用suspend修饰符标记,这个修饰符就是为了暂停调度者的执行不阻塞线程。因此,Flow<T>在同一个协程中发射和收集时,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。看到这,是不是觉得有点难懂.......

简单举个例子,假设我们拥有一个烤箱,可以用来烤面包,由于烤箱容量的限制,一次只能烤4个面包,如果你试着一次烤8个面包,会大大加大烤箱的承载负荷,这已经远远超过了它的内存使用量,很有可能会因此烧掉你的面包。

模拟背压问题

回顾下之前所说的,当我们消耗的速度比生产的速度慢的时候,就会产生背压,下面用代码来模拟下这个过程

  • 首先先创建一个方法,用来每秒发送元素

    fun currentTime() = System.currentTimeMillis()
    fun threadName() = Thread.currentThread().name
    var start: Long = 0

    fun createEmitter(): Flow<Int> =
     (1..5)
         .asFlow()
         .onStart { start = currentTime() }
         .onEach {
               delay(1000L)
               print("Emit $it (${currentTime() - start}ms) ")
         }

  • 接着需要收集元素,这里我们延迟3秒再接收元素, 延迟是为了夸大缓慢的消费者并创建一个超级慢的收集器。

    fun main() {
       runBlocking {
           val time = measureTimeMillis {
               createEmitter().collect {
                   print("\nCollect $it starts ${start - currentTime()}ms")
                   delay(3000L)
                   println("   Collect $it ends ${currentTime() - start}ms")
             }
         }
           print("\nCollected in $time ms")
     }
    }

    看下输出结果,如下图所示

输出结果

这样整个过程下来,大概需要20多秒才能结束,这里我们模拟了接收元素比发送元素慢的情况,因此就需要一个背压机制,而这正是Flow本质中的,它并不需要另外的设计来解决背压

背压处理方式

使用buffer进行缓存收集

为了使缓冲和背压处理正常工作,我们需要在单独的协程中运行收集器。这就是.buffer()操作符进来的地方,它是将所有发出的项目发送Channel到在单独的协程中运行的收集器

public fun <T> Flow<T>.buffer(
   capacity: Int = BUFFERED,
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

它还为我们提供了缓冲功能,我们可以指定capacity我们的缓冲区和处理策略onBufferOverflow,所以当Buffer溢出的时候,它为我们提供了三个选项

enum BufferOverflow { 
  SUSPEND,
  DROP_OLDEST,
  DROP_LATEST
}

  • 默认使用SUSPEND:会将当前协程挂起,直到缓冲区中的数据被消费了

  • DROP_OLDEST:它会丢弃最老的数据

  • DROP_LATEST: 它会丢弃最新的数据

好的,我们回到上文所展示的模拟示例,这时候我们可以加入缓冲收集buffer,不指定任何参数,这样默认就是使用SUSPEND,它会将当前协程进行挂起

2e237e1156e29a2b6dc3fc7e16f33d4.png

此时当收集器繁忙的时候,程序就开始缓冲,并在第一次收集方法调用结束的时候,两次发射后再次开始收集,此时流程的耗时时长缩短到大约16秒就可以执行完毕,如下图所示输出结果

0e3d1180f304af00553ac592d9c2987.png

使用conflate解决

conflate操作符于Channel中的Conflate模式是一直的,新数据会直接覆盖掉旧数据,它不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST 策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST),可以看下它的源码可以佐证我们的判断

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

在某些情况下,由于根本原因是解决生产消费速率不匹配的问题,我们需要做一些取舍的操作,conflate将丢弃掉旧数据,只有在收集器空闲之前发出的最后一个元素才被收集,将上文的模拟实例改为conflate执行,你会发现我们直接丢弃掉了2和4,或者说新的数据直接覆盖掉了它们,整个流程只需要10秒左右就执行完成了

2ad6092e759f600f0db93397b7b0266.png

使用collectLatest解决

通过官方介绍,我们知道collectLatest作用在于当原始流发出一个新的值的时候,前一个值的处理将被取消,也就是不会被接收, 和conflate的区别在于它不会用新的数据覆盖,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消

suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)

还是上文的模拟实例,这里我们使用collectLatest看下输出结果:

d2555f7e584655901b3ebc6dd086b5c.png

这样也是有副作用的,如果每个更新都非常重要,例如一些视图,状态刷新,这个时候就不必要用collectLatest ; 当然如果有些更新可以无损失的覆盖,例如数据库刷新,就可以使用到collectLatest,具体详细的使用场景,还需要靠开发者自己去衡量选择使用

小结

对于Flow可以说不需要额外提供什么巧妙的方式解决背压问题,Flow的本质,亦或者说Kotlin协程本身就已经提供了相应的解决方案;开发者只需要在不同的场景中选择正确的背压策略即可。总的来说,它们都是通过使用Kotlin挂起函数suspend,当流的收集器不堪重负时,它可以简单地暂停发射器,然后在准备好接受更多元素时恢复它。

关于挂起函数suspend这里就不过多赘述了,只需要明白的一点是它与传统的基于线程的同步数据管道中背压管理非常相似,无非就是,缓慢的消费者通过阻塞生产者的线程自动向生产者施加背压,简单来说,suspend通过透明地管理跨线程的背压而不阻塞它们,将其超越单个线程并进入异步编程领域。


首页
关于博主
我的博客
搜索