Kotlin協程Flow生命周期及異常處理淺析
正文
Kotlin協程中的Flow主要用於處理復雜的異步數據,以一種”流“的方式,從上到下依次處理,和RxJava的處理方式類型,但是比後者更加強大。
Flow基本概念
Flow中基本上有三個概念,即 發送方,處理中間層,接收方,可以類比水利發電站中的上遊,發電站,下遊的概念, 數據從上遊開始發送”流淌“至中間站被”處理“瞭一下,又流淌到瞭下遊。
示例代碼如下
flow { // 發送方、上遊 emit(1) // 掛起函數,發送數據 emit(2) emit(3) emit(4) emit(5) } .filter { it > 2 } // 中轉站,處理數據 .map { it * 2 } .take(2) .collect{ // 接收方,下遊 println(it) } 輸出內容: 6 8
通過上面代碼我們可以看到,基於一種鏈式調用api的方式,流式的進行處理數據還是很棒的,接下來具體看一下上面的組成:
- flow{},是個高階函數,主要用於創建一個新的Flow。在其Lambda函數內部使用瞭emit()掛起函數進行發送數據。
- filter{}、map{}、take{},屬於中間處理層,也是中間數據處理的操作符,Flow最大的優勢,就是它的操作符跟集合操作符高度一致。隻要會用List、Sequence,那麼就可以快速上手 Flow 的操作符。
- collect{},下遊接收方,也成為終止操作符,它的作用其實隻有一個:終止Flow數據流,並且接收這些數據。
其他創建Flow的方式還是flowOf()函數,示例代碼如下
fun main() = runBlocking{aassssssssaaaaaaaas flowOf(1,2,3,4,5).filter { it > 2 } .map { it * 2 } .take(2) .collect{ println("flowof: $it") } }
我們在看一下list集合的操作示例
listOf(1,2,3,4,5).filter { it > 2 } .map { it * 2 } .take(2) .forEach{ println("listof: $it") }
通過以上對比發現,兩者的基本操作幾乎一致,Kotlin也提供瞭兩者相互轉換的API,Flow.toList()、List.asFlow()這兩個擴展函數,讓數據在 List、Flow 之間來回轉換,示例代碼如下:
//flow 轉list flowOf(1,2,3) .toList() .filter { it > 1 } .map { it * 2 } .take(2) .forEach{ println(it) } // list 轉 flow listOf(1,2,3).asFlow() .filter { it > 2 } .map { it * 2 } .take(2) .collect{ println(it) }
Flow生命周期
雖然從上面操作看和集合類型,但是Flow還是有些特殊操作符的,畢竟它是協程的一部分,和Channel不同,Flow是有生命周期的,隻是以操作符的形式回調而已,比如onStart、onCompletion這兩個中間操作符。
flowOf(1,2,3,4,5,6) .filter { println("filter: $it") it > 3 } .map { println("map: $it") it * 2 } .take(2) .onStart { println("onStart") } .collect{ println("collect: $it") } 輸出內容: onStart filter: 1 filter: 2 filter: 3 filter: 4 map: 4 collect: 8 filter: 5 map: 5 collect: 10
我們可以看到onStart,它的作用是註冊一個監聽事件:當 flow 啟動以後,它就會被回調。
和filter、map、take這些中間操作符不同,他們的順序會影響數據的處理結果,這也很好理解;onStart和位置沒有關系,它本質上是一個回調,不是一個數據處理的中間站。同樣的還有數據處理完成的回調onCompletion。
flowOf(1,2,3,4,5,6) .filter { println("filter: $it") it > 3 } .map { println("map: $it") it * 2 } .take(2) .onStart { println("onStart") } .onCompletion { println("onCompletion") } .collect{ println("collect: $it") }
Flow中onCompletion{} 在面對以下三種情況時都會進行回調:
- 1,Flow 正常執行完畢
- 2,Flow 當中出現異常
- 3,Flow 被取消。
處理異常
在數據流的處理過程中,很難保證不出現問題,那麼出現異常之後再該怎麼處理呢?
- 對於發生在上遊、中間操作這兩個階段的異常,我們可以直接使用 catch 這個操作符來進行捕獲和進一步處理。
- 對於發生在下遊,使用try-catch,把collect{}當中可能出現問題的代碼包裹起來進行捕獲處理。
上遊或者中間異常使用catch
fun main() = runBlocking{ val flow = flow { emit(1) emit(2) throw IllegalStateException() emit(3) } flow.map { it * 2 } .catch { println("catch: $it") } .collect{ println("collect: $it") } } 輸出: collect: 2 collect: 4 catch: java.lang.IllegalStateException
catch 這個操作符的作用是和它的位置強相關的,catch 的作用域,僅限於catch的上遊。換句話說,發生在 catch 上遊的異常,才會被捕獲,發生在 catch 下遊的異常,則不會被捕獲。
val flow = flow { emit(1) emit(2) throw IllegalStateException() emit(3) } flow.map { it * 2 } .catch { println("catch: $it") } .filter { it / 0 > 1 } // catch之後發生異常 .collect{ println("collect: $it") } 輸出內容: Exception in thread "main" java.lang.ArithmeticException: / by zero
下遊使用try-catch
flowOf(1,2,3) .onCompletion { println("onCompletion $it") } .collect{ try { println("collect: $it") throw IllegalStateException(); }catch (e: Exception){ println("catch $e") } } 輸出: collect: 1 catch java.lang.IllegalStateException collect: 2 catch java.lang.IllegalStateException collect: 3 catch java.lang.IllegalStateException onCompletion null
切換執行線程
Flow適合處理復雜的異步任務,大多數情況下耗時任務放在子線程或線程池中處理,對於UI任務放在主線程中進行。
在Flow中可以使用flowOn操作符實現上述場景中的線程切換。
flowOf(1,2,3,4,5) .filter { logX("filter: $it") it > 2 } .flowOn(Dispatchers.IO) // 切換線程 .collect{ logX("collect: $it") } 輸出內容: ================================ filter: 1 Thread:DefaultDispatcher-worker-1 ================================ ================================ filter: 2 Thread:DefaultDispatcher-worker-1 ================================ ================================ filter: 3 Thread:DefaultDispatcher-worker-1 ================================ ================================ filter: 4 Thread:DefaultDispatcher-worker-1 ================================ ================================ filter: 5 Thread:DefaultDispatcher-worker-1 ================================ ================================ collect: 3 Thread:main ================================ ================================ collect: 4 Thread:main ================================ ================================ collect: 5 Thread:main ================================
flowOn 操作符也是和它的位置強相關的。作用域限於它的上遊。在上面的代碼中,flowOn 的上遊,就是 flowOf{}、filter{} 當中的代碼,所以,它們的代碼全都運行在 DefaultDispatcher 這個線程池當中。隻有collect{}當中的代碼是運行在 main 線程當中的。
終止操作符
Flow 裡面,最常見的終止操作符就是collect。除此之外,還有一些從集合中借鑒過來的操作符,也是Flow的終止操作符。比如 first()、single()、fold{}、reduce{},本質上來說說當我們嘗試將 Flow 轉換成集合的時候,已經不屬於Flow的API,也不屬於協程的范疇瞭,它本身也就意味著 Flow 數據流的終止。
"冷的數據流"從何而來
在上面文章《Kotlin協程Channel淺析》中,我們認識到Channel是”熱數據流“,隨時準備好,隨用隨取,就像海底撈裡的服務員。
現在我們看下Flow和Channel的區別
val flow = flow { (1..4).forEach{ println("Flow發送前:$it") emit(it) println("Flow發送後: $it") } } val channel: ReceiveChannel<Int> = produce { (1..4).forEach{ println("Channel發送前: $it") send(it) println("Channel發送後: $it") } } 輸出內容: Channel發送前: 1
Flow中的邏輯並未執行,因此我們可以這樣類比,Channel之所以被認為是“熱”的原因,是因為不管有沒有接收方,發送方都會工作。那麼對應的,Flow被認為是“冷”的原因,就是因為隻有調用終止操作符之後,Flow才會開始工作。
除此之外,Flow一次處理一條數據,是個”懶傢夥“。
val flow = flow { (3..6).forEach { println("Flow發送前:$it") emit(it) println("Flow發送後: $it") } }.filter { println("filter: $it") it > 3 }.map { println("map: $it") it * 2 }.collect { println("結果collect: $it") } 輸出內容: Flow發送前:3 filter: 3 Flow發送後: 3 Flow發送前:4 filter: 4 map: 4 結果collect: 8 Flow發送後: 4 Flow發送前:5 filter: 5 map: 5 結果collect: 10 Flow發送後: 5 Flow發送前:6 filter: 6 map: 6 結果collect: 12 Flow發送後: 6
相比於滿面春風,熱情服務的Channel,Flow更像個冷漠的傢夥,你不找他,他不搭理你。
- Channel,響應速度快,但數據可能是舊的,占用資源
- Flow,響應速度慢,但數據是最新的,節省資源
Flow也可以是”熱“的,你知道嗎?
更多關於Kotlin協程Flow生命周期異常處理的資料請關註WalkonNet其它相關文章!