Kotlin協程Flow生命周期及異常處理淺析

正文

Kotlin協程中的Flow主要用於處理復雜的異步數據,以一種”流“的方式,從上到下依次處理,和RxJava的處理方式類型,但是比後者更加強大。

Flow基本概念

Flow中基本上有三個概念,即 發送方,處理中間層,接收方,可以類比水利發電站中的上遊,發電站,下遊的概念, 數據從上遊開始發送”流淌“至中間站被”處理“瞭一下,又流淌到瞭下遊。

示例代碼如下

flow {         // 發送方、上遊
    emit(1)    // 掛起函數,發送數據
    emit(2)
    emit(3)
    emit(4)
    emit(5)
}
.filter { it > 2 }  // 中轉站,處理數據
.map { it * 2 }
.take(2)
.collect{           // 接收方,下遊
    println(it)
}
輸出內容:
6
8

通過上面代碼我們可以看到,基於一種鏈式調用api的方式,流式的進行處理數據還是很棒的,接下來具體看一下上面的組成:

  • flow{},是個高階函數,主要用於創建一個新的Flow。在其Lambda函數內部使用瞭emit()掛起函數進行發送數據。
  • filter{}、map{}、take{},屬於中間處理層,也是中間數據處理的操作符,Flow最大的優勢,就是它的操作符跟集合操作符高度一致。隻要會用List、Sequence,那麼就可以快速上手 Flow 的操作符。
  • collect{},下遊接收方,也成為終止操作符,它的作用其實隻有一個:終止Flow數據流,並且接收這些數據。

其他創建Flow的方式還是flowOf()函數,示例代碼如下

fun main() = runBlocking{aassssssssaaaaaaaas
    flowOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println("flowof: $it")
    }
}

我們在看一下list集合的操作示例

listOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println("listof: $it")
        }

通過以上對比發現,兩者的基本操作幾乎一致,Kotlin也提供瞭兩者相互轉換的API,Flow.toList()、List.asFlow()這兩個擴展函數,讓數據在 List、Flow 之間來回轉換,示例代碼如下:

//flow 轉list
    flowOf(1,2,3)
        .toList()
        .filter { it > 1 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println(it)
        }
    // list 轉 flow
    listOf(1,2,3).asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }

Flow生命周期

雖然從上面操作看和集合類型,但是Flow還是有些特殊操作符的,畢竟它是協程的一部分,和Channel不同,Flow是有生命周期的,隻是以操作符的形式回調而已,比如onStart、onCompletion這兩個中間操作符。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .collect{
            println("collect: $it")
        }
輸出內容:
onStart
filter: 1
filter: 2
filter: 3
filter: 4
map: 4
collect: 8
filter: 5
map: 5
collect: 10

我們可以看到onStart,它的作用是註冊一個監聽事件:當 flow 啟動以後,它就會被回調。

和filter、map、take這些中間操作符不同,他們的順序會影響數據的處理結果,這也很好理解;onStart和位置沒有關系,它本質上是一個回調,不是一個數據處理的中間站。同樣的還有數據處理完成的回調onCompletion。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .onCompletion { println("onCompletion") }
        .collect{
            println("collect: $it")
        }

Flow中onCompletion{} 在面對以下三種情況時都會進行回調:

  • 1,Flow 正常執行完畢
  • 2,Flow 當中出現異常
  • 3,Flow 被取消。

處理異常

在數據流的處理過程中,很難保證不出現問題,那麼出現異常之後再該怎麼處理呢?

  • 對於發生在上遊、中間操作這兩個階段的異常,我們可以直接使用 catch 這個操作符來進行捕獲和進一步處理。
  • 對於發生在下遊,使用try-catch,把collect{}當中可能出現問題的代碼包裹起來進行捕獲處理。

上遊或者中間異常使用catch

fun main() = runBlocking{
    val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }
    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .collect{
            println("collect: $it")
        }
}
輸出:
collect: 2
collect: 4
catch: java.lang.IllegalStateException

catch 這個操作符的作用是和它的位置強相關的,catch 的作用域,僅限於catch的上遊。換句話說,發生在 catch 上遊的異常,才會被捕獲,發生在 catch 下遊的異常,則不會被捕獲。

val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }
    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .filter { it / 0 > 1 } // catch之後發生異常
        .collect{
            println("collect: $it")
    }
輸出內容:
Exception in thread "main" java.lang.ArithmeticException: / by zero

下遊使用try-catch

flowOf(1,2,3)
        .onCompletion { println("onCompletion $it") }
        .collect{
            try {
                println("collect: $it")
                throw IllegalStateException();
            }catch (e: Exception){
                println("catch $e")
            }
        }
輸出:
collect: 1
catch java.lang.IllegalStateException
collect: 2
catch java.lang.IllegalStateException
collect: 3
catch java.lang.IllegalStateException
onCompletion null

切換執行線程

Flow適合處理復雜的異步任務,大多數情況下耗時任務放在子線程或線程池中處理,對於UI任務放在主線程中進行。

在Flow中可以使用flowOn操作符實現上述場景中的線程切換。

flowOf(1,2,3,4,5)
        .filter {
            logX("filter: $it")
            it > 2 }
        .flowOn(Dispatchers.IO) // 切換線程
        .collect{
            logX("collect: $it")
        }
輸出內容:
================================
filter: 1
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 2
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 3
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 4
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 5
Thread:DefaultDispatcher-worker-1
================================
================================
collect: 3
Thread:main
================================
================================
collect: 4
Thread:main
================================
================================
collect: 5
Thread:main
================================

flowOn 操作符也是和它的位置強相關的。作用域限於它的上遊。在上面的代碼中,flowOn 的上遊,就是 flowOf{}、filter{} 當中的代碼,所以,它們的代碼全都運行在 DefaultDispatcher 這個線程池當中。隻有collect{}當中的代碼是運行在 main 線程當中的。

終止操作符

Flow 裡面,最常見的終止操作符就是collect。除此之外,還有一些從集合中借鑒過來的操作符,也是Flow的終止操作符。比如 first()、single()、fold{}、reduce{},本質上來說說當我們嘗試將 Flow 轉換成集合的時候,已經不屬於Flow的API,也不屬於協程的范疇瞭,它本身也就意味著 Flow 數據流的終止。

"冷的數據流"從何而來

在上面文章《Kotlin協程Channel淺析》中,我們認識到Channel是”熱數據流“,隨時準備好,隨用隨取,就像海底撈裡的服務員。

現在我們看下Flow和Channel的區別

val flow = flow {
        (1..4).forEach{
            println("Flow發送前:$it")
            emit(it)
            println("Flow發送後: $it")
        }
    }
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach{
            println("Channel發送前: $it")
            send(it)
            println("Channel發送後: $it")
        }
    }
輸出內容:
Channel發送前: 1

Flow中的邏輯並未執行,因此我們可以這樣類比,Channel之所以被認為是“熱”的原因,是因為不管有沒有接收方,發送方都會工作。那麼對應的,Flow被認為是“冷”的原因,就是因為隻有調用終止操作符之後,Flow才會開始工作。

除此之外,Flow一次處理一條數據,是個”懶傢夥“。

    val flow = flow {
        (3..6).forEach {
            println("Flow發送前:$it")
            emit(it)
            println("Flow發送後: $it")
        }
    }.filter {
        println("filter: $it")
        it > 3
    }.map {
        println("map: $it")
        it * 2
    }.collect {
        println("結果collect: $it")
    }
輸出內容:
Flow發送前:3
filter: 3
Flow發送後: 3
Flow發送前:4
filter: 4
map: 4
結果collect: 8
Flow發送後: 4
Flow發送前:5
filter: 5
map: 5
結果collect: 10
Flow發送後: 5
Flow發送前:6
filter: 6
map: 6
結果collect: 12
Flow發送後: 6

相比於滿面春風,熱情服務的Channel,Flow更像個冷漠的傢夥,你不找他,他不搭理你。

  • Channel,響應速度快,但數據可能是舊的,占用資源
  • Flow,響應速度慢,但數據是最新的,節省資源

Flow也可以是”熱“的,你知道嗎?

更多關於Kotlin協程Flow生命周期異常處理的資料請關註WalkonNet其它相關文章!

推薦閱讀: