随笔博文

日常思考,目前Kotlin协程能完全取代Rxjava吗

2022-12-16 11:32:02 michael007js 124

前言

自从jetbrains公司提出Kotlin协程用来解决异步线程问题,并且衍生出来了Flow作为响应式框架,引来了大量Android开发者的青睐;而目前比较稳定的响应式库当属Rxjava,这样以来目的就很明显了,旨在用Kotlin协程来逐步替代掉Rxjava;

仔细思考下,真的可以完全替代掉Rxjava么,它的复杂性和多样化的操作符,而协程的许多API仍然是实验性的,目前为止,随着kt不断地进行版本迭代,越来越趋于稳定,对此我不能妄下断言;当然Rxjava无疑也是一个非常优秀的框架,值得我们不断深入思考,但是随着协程的出现,就个人而言我会更喜欢使用协程来作为满足日常开发的异步解决方案。

协程的本质和Rxjava是截然不同的,所以直接拿它们进行对比是比较棘手的;换一种思路,本文我们从日常开发中的异步问题出发,分别观察协程与Rxjava是如何提供相应的解决方案,依次来进行比对,探讨下 Kotlin协程是否真的足以取代Rxjava 这个话题吧

流类型的比较

现在我们来看下Rxjava提供的流类型有哪些,我们可以使用的基本流类型操作符如下图所示

Rxjava流类型@2x.png

它们的基本实现在下文会提及到,这里我们简单来讨论下在协程中是怎么定义这些流操作符的

  • Single<T>其实就是一个返回不可空值的suspend函数

  • Maybe<T>恰好相反,是一个返回可空的supspend函数

  • Completable不会发送事件,所以在协程中就是一个不返回任何东西的简单挂起函数

  • 对于ObservableFlowable,两者都可以发射多个事件,不同在于前者是没有背压管理的,后者才有,而他们在协程中我们可以直接使用Flow来完成,在异步数据流中按顺序发出值,所以只需要一个返回当前Data数据类型的Flow<T>

    值得注意的是,该函数本身是不需要supsend修饰符的,由于Flow是冷流,在进行收集\订阅之前是不会发射数据,只要在collect的时候才需要协程作用域中执行。为什么说Flow足以替代ObservableFlowable原因在与它处理背压(backpressure)的方式。这自然而然来源于协程中的设计与理念,不需要一些巧妙设计的解决方案来处理显示背压,Flow中所有Api基本上都带有suspend修复符,它也成为了解决背压的关键先生。其目的就是在不阻塞线程的情况下暂停调用者的执行,因此,当Flow<T>在同一个协程中发射和收集的时候,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。

流类型比较的基本实现

好的小伙伴们,上文我们简单用协程写出Rxjava的几个基本流类型,现在让我们用几个详细的实例来看看他们的不同之处吧

Completable ---- 异步任务完成没有结果,可能会抛出错误

Rxjava中,我们使用Completable.create去创建,里面的CompletableEmitter中有onComplete表示完成的方法和一个onError传递异常的方法,如下代码所示

//completable in Rxjava
   fun completableRequest(): Completable {
       return Completable.create { emitter->
           try {
               emitter.onComplete()
         }catch (e:Exception) {
               emitter.onError(e)
         }
     }
 }
   fun main() {
       completableRequest()
         .subscribe {
               println("I,am done")
               println()
         }
 }
复制代码

在协程当中,我们对应的就是调用一个不返回任何内容的挂起函数(returns Unit),就类似于我们调用一个普通函数一样

 fun completableCoroutine() = runBlocking {
       try {
           delay(500L)
           println("I am done")
     } catch (e: Exception) {
           println("Got an exception")
     }
 }
复制代码

注意不要在生产环境代码使用runBlocking,你应该有一个合适的CoroutineScope,由于是测试代码本文都将使用runBlocking来辅助说明测试场景

Single ---- 必须返回或抛出错误的异步任务

RxJava 中,我们使用一个Single ,它里面有一个onSuccess传递返回值的方法和一个onError传递异常的方法。

```kotlin
/**
* Single in RxJava
*/
fun main() {
   singleResult()
     .subscribe(
         { result -> println(result) },
         { println("Got an exception") }
     )
}

fun singleResult(): Single<String> {
   return Single.create { emitter ->
       try {
           // process a request
           emitter.onSuccess("Some result")
     } catch (e: Exception) {
           emitter.onError(e)
     }
 }

```
复制代码

而在协程中,我们调用一个返回非空值的挂起函数:

/**
* Single equivalent in coroutines
*/
fun main() = runBlocking {
   try {
       val result = getResult()
       println(result)
 } catch (e: Exception) {
       println("Got an exception")
 }
}

suspend fun getResult(): String {
   // process a request
   delay(100)
   return "Some result"
}
复制代码
Maybe --- 可能返回结果或抛出错误的异步任务

RxJava 中,我们使用一个Maybe. 它里面有一个onSuccess传递返回值的方法onComplete,一个在没有值的情况下发出完成信号的方法,以及一个onError传递异常的方法。

/**
* Maybe in RxJava
*/
fun main() {
   maybeResult()
     .subscribe(
         { result -> println(result) },
         { println("Got an exception") },
         { println("Completed without a value!") }
     )
}

fun maybeResult(): Maybe<String> {
   return Maybe.create { emitter ->
       try {
           // process a request
           if (Random.nextBoolean()) {
               emitter.onSuccess("Some value")
         } else {
               emitter.onComplete()
         }
     } catch (e: Exception) {
           emitter.onError(e)
     }
 }
}
复制代码

在协程中,我们调用一个返回可空值得挂起函数

/**
* Maybe equivalent in coroutines
*/
fun main() = runBlocking {
   try {
       val result = getNullableResult()
       if (result != null) {
           println(result)
     } else {
           println("Completed without a value!")
     }
 } catch (e: Exception) {
       println("Got an exception")
 }
}

suspend fun getNullableResult(): String? {
   // process a request
   delay(100)
   return if (Random.nextBoolean()) {
       "Some value"
 } else {
       null
 }
}
复制代码
0..N事件的异步流

由于在Rxjava中,FlowableObservable都是属于0..N事件的异步流,但是Observable几乎没有做相应的背压管理,所以这里我们主要以Flowable为例子,onNext发出下一个流值的方法,一个onComplete表示流完成的方法,以及一个onError传递异常的方法。

/**
* Flowable in RxJava
*/
fun main() {
   flowableValues()
     .subscribe(
         { value -> println(value) },
         { println("Got an exception") },
         { println("I'm done") }
     )
}

fun flowableValues(): Flowable<Int> {
   val flowableEmitter = { emitter: FlowableEmitter<Int> ->
       try {
           for (i in 1..10) {
               emitter.onNext(i)
         }
     } catch (e: Exception) {
           emitter.onError(e)
     } finally {
           emitter.onComplete()
     }
 }

   return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}
复制代码

在协程中,我们只是创建一个Flow就可以完成这个方法

/**
* Flow in Kotlin
*/
fun main() = runBlocking {
   try {
       eventFlow().collect { value ->
           println(value)
     }
       println("I'm done")
 } catch (e: Exception) {
       println("Got an exception")
 }
}

fun eventFlow() = flow {
   for (i in 1..10) {
       emit(i)
 }
}
复制代码

在惯用的 Kotlin 中,创建上述流程的方法之一是:fun eventFlow() = (1..10).asFlow()

如上面这些代码所见,我们基本可以使用协程涵盖Rxjava所有的主要基本用法,此外,协程的设计允许我们使用所有标准的Kotlin功能编写典型的顺序代码 ,它还消除了对onCompleteonError回调的需要。我们可以像在普通代码中那样捕获错误或设置协程异常处理程序。并且,考虑到当挂起函数完成时,协程继续按顺序执行,我们可以在下一行继续编写我们的“完成逻辑”。

值得注意的是,当我们进行调用collect收集的时候也是如此,在收集完所有元素后才会执行下一行代码

eventFlow().collect { value ->
    println(value)
}
println("I'm done")
复制代码
Flow`收集完所有元素后,才会调用打印`I'm done

操作符的比较

总所周知,Rxjava的主要优势在于它拥有非常多的操作符,基本上可以应对日常开发中出现的各种情况,由于它种类特别繁多又比较难记忆,这里我只简单举些常见的操作符进行比较

COMPLETABLE,SINGLE, MAYBE

这里需要强调的是,在RxjavaCompletable,SingleMaybe都有许多相同的操作符,然而在协程中任何类型的操作符其实都是多余的,我们以Single中的map()简单操作符为例来看下:

/**
 * Maps Single<String> to
 * Single<User> synchronously
 */
fun main() {
    getUsername()
        .map { username ->
            User(username)
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}
复制代码

map作为Rxjava中最常用的操作符,获取一个值并将其转换为另一个值,但是在协程中我们不需要.map()操作符就可以实现这种操作

fun main() = runBlocking {
    try {
        val username = getUsername() // suspend fun
        val user = User(username)
        println(user)
    } catch (e: Exception) {
        println("Got an exception")
    }
}
复制代码

使用suspend挂起函数可以挂起当前函数,当执行完毕后在按顺序执行接下来的代码

Flow操作符与Rxjava操作符

现在让我们看看Flow中有哪些操作符,它们与Rxjava相比有什么不同,由于篇幅原因,这里我简单比较下日常开发中最常用的操作符

map()

对于map操作符,Flow中也具有相同的操作符

/**
 * Maps Flow<String> to Flow<User>
 */
fun main() = runBlocking {
    usernameFlow()
        .map { username ->
            User(username)
        }
        .collect { user ->
            println(user)
        }
}
复制代码

Flow中的map操作符 相当于Rxjava做了一定的简化处理,这是它的一个主要优势,可以看下它的源码

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
    collect { value -> emit(transform(value)) }
}
复制代码

是不是非常简单,只是重新创建一个新的flow,它从从上游收集值transform并在当前函数应用后发出这些值;事实上大多数Flow的操作符都是这样工作的,不需要遵循严格的协议;对于大多数应用场景,标准Flow操作符就已经足够了,当然编写自定义操作符也是非常简单容易的;相对于Rxjava,如果想要编写自定义操作符,你必须非常了解Rxjava

Reactive Streams协议

flatmap()

另外,在Rxjava中我们经常使用的操作符还有flatmap(),同时还有很多种变体,例如.flatMapSingle()flatMapObservable(),flatMapIterable()等,简单来说,在Rxjava中我们如果需要对一个值进行同步转换,就使用map,进行异步转换的时候就需要使用flatMap();对此,Flow进行同步或者异步转换的时候不需要不同的操作符,仅仅使用map就足够了,由于它们都有supsend挂起函数进行修饰,不用担心同步性

可以看下在Rxjava中的示例

fun compareFlatMap() {
    getUsernames() //Flowable<String>
        .flatMapSingle { username ->
            getUserFromNetwork(username) // Single<User>
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}
复制代码

好的,我们使用Flow来转换下上述的这一段代码,只需要使用map就可以以任何方式进行转换值,如下代码所示:

    runBlocking {
        flow {
            emit(User("Jacky"))
        }.map {
            getUserFromName(it) //suspend
        }.collect {
            println(it)
        }
    }

    suspend fun getUserFromName(user: User): String {
        return user.userName
    }
复制代码

实际上使用Flow中的map操作符,就可以将上游流发出的值转换为新流,然后将所有流扁平化为一个,这和flatMap的功能几乎可以达到同样的效果

filter()

对于filter操作符,我们在Rxjava中并没有直接的方法进行异步过滤,这需要我们自己编写代码来进行过滤判断,如下所示

fun getUsernames(): Flowable<String> {
   val flowableEmitter = { emitter: FlowableEmitter<String> ->
       emitter.onNext("Jacky")
 }
   return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}

fun isCorrectUserName(userName: String): Single<Boolean> {
   return Single.create { emitter ->
       runCatching {
           //名字判断....
           if (userName.isNotEmpty()) {
               emitter.onSuccess(true)
         } else {
               emitter.onSuccess(false)
         }
     }.onFailure {
           emitter.onError(it)
     }
 }
}

fun compareFilter() {
   getUsernames()//Flowable<String>
     .flatMapSingle { userName ->
           isCorrectUserName(userName)
             .flatMap { isCorrect ->
                   if (isCorrect) {
                       Single.just(userName)
                 } else {
                       Single.never()
                 }
             }
     }.subscribe {
           println(it)
     }

}
复制代码

乍一看,是不是感觉有点麻烦,事实上这确实需要我们使用些小手段才能达到目的;而在Flow中,我们能够轻松地根据同步和异步调用过滤流

runBlocking {
       userNameFlow().filter { user ->
           isCorrectName(user.userName)
     }.collect { user->
           println(user)
     }
 }

suspend fun isCorrectName(userName: String): Boolean {
   return userName.isNotEmpty()
}

复制代码

### 结语

由于篇幅原因,Rxjava和协程都是一个非常庞大的思考话题,它们之间的不同比较可以永远进行下去;事实上,在Kotlin协程被广泛使用之前,Rxjava作为项目中主要的异步解决方案,以至于到现在工作上还有很多项目用着Rxjava, 所以即使切换到Kotlin协程之后,还有相当长一段时间还在用着Rxjava;这并不代表Rxjava不够好,而是协程让代码变得更易读,更易于使用;

暂时先告一段落了,事实上证明协程确实能够满足我们日常开发的主要需求,下次将会对Rxjava中的背压和之前所讨论的Flow背压问题进行比较探讨,还有非常多的东西要学,共勉!!!!

本文主要内容译至 -> www.javaadvent.com/2021/12/are…


首页
关于博主
我的博客
搜索