Kotlin協程Job生命周期結構化並發詳解

引言

前面在學習協程啟動方式的時候在launch的源碼中有一個返回值是Jobasync的返回Deferred也是實現瞭Job,那麼而也就是說launchasync在創建一個協程的時候也會創建一個對應的Job對象。還提到過Job是協程的句柄,那麼Job到底是什麼?它有什麼用?

1.Job的生命周期

先看一下Job的源碼,這裡隻保留瞭跟標題相關的內容

public interface Job : CoroutineContext.Element {
    // ------------ 狀態查詢API ------------
    /**
    * 當該Job處於活動狀態時,返回true——它已經開始,沒有完成,也沒有取消。
    * 如果沒有取消或失敗,等待其子任務完成的Job仍被認為是活動的。
    */
    public val isActive: Boolean
    /**
    * 當Job因任何原因完成時返回true。作業被取消或失敗並已完成其執行也被視為完成。
    * Job隻有在所有子任務完成後才算完成。
    */
    public val isCompleted: Boolean
    /**
    *如果該作業因任何原因被取消,無論是通過顯式調用cancel,還是因為它失敗或其子或父作業被取消,
    * 則返回true。在一般情況下,它並不意味著任務已經完成,因為它可能仍然在完成它正在做的事情,
    * 並等待它的子任務完成。
    */
    public val isCancelled: Boolean
    // ------------ 操控狀態API ------------
    /**
    * 如果Job所在的協程還沒有被啟動那麼調用這個方法就會啟動協程
    * 如果這個協程被啟動瞭返回true,如果已經啟動或者執行完畢瞭返回false
    */
    public fun start(): Boolean
    /**
    * 取消此Job,可用於指定錯誤消息或提供有關取消原因的其他詳細信息
    */
    public fun cancel(cause: CancellationException? = null)
    /**
    * 取消此Job
    */
    public fun cancel(): Unit = cancel(null)
    public fun cancel(cause: Throwable? = null): Boolean
    // ------------ 等待狀態API ------------
    /**
    * 掛起協程,知道任務完成再恢復
    */
    public suspend fun join()
    // ------------ 完成狀態回調API ------------
    /**
    * 註冊Job完成時同步調用的處理程序.
    * 當Job已經完成時,將處理程序將立即調用Job的異常或取消原因或null
    * 否則,該處理程序將在此Job完成時調用一次。
    */
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
    /**
    * 註冊在取消或完成此Job時同步調用的處理程序。
    * 當Job已經被取消並完成執行時,處理程序將立即調用Job的取消原因或null,
    * 除非將invokeImmediately設置為false。否則,
    * 當Job取消或完成時將調用一次handler。
    */
    public fun invokeOnCompletion(
        onCancelling: Boolean = false,
        invokeImmediately: Boolean = true,
        handler: CompletionHandler): DisposableHandle
}

從源碼中可以發現這幾個函數和變量跟Actviity或者Fragment非常像,所以我們可以總結出兩個結論:

  • Job可以監測協程的生命周期
  • Job可以操控協程

在例子中使用這幾個函數和變量再來校驗一下上面的結論:

fun main() = runBlocking {
    val job = launch {
        delay(1000L)
    }
    job.log()
    job.cancel()
    job.log()
}
fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}  
        ================================
    """.trimIndent()
    )
}
//輸出結果
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1  
//================================

Job.log用瞭擴展函數,方便調用Job中的狀態監測返回值。

上面的代碼通過launch創建瞭一個協程,接收瞭Job的返回值,這裡用這個job對象做瞭三件事:

  • 第一個job.log() launch的創建標志著協程已經被啟動所以在第一個job.log()的日志中isActive返回值是true;
  • job.cancel() 這裡調用瞭job的取消函數將協程任務取消;
  • 第二個job.log() 上面的代碼將協程任務取消瞭,然後再次獲取協程狀態發現isActivte返回false,isCancelled返回true。

上面的代碼也印證瞭前面提出的結論,還有一個函數start沒使用,再來調用它之後輸出的日志:

fun main() = runBlocking {
    //變化1
    val job = launch(start = CoroutineStart.LAZY) {
        delay(1000L)
    }
    job.log()
    //變化2
    job.start()
    job.log()
    job.cancel()
    job.log()
}
fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}  
        ================================
    """.trimIndent()
    )
}
//輸出結果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1  
//================================

上面的代碼增加瞭兩處修改:

  • 變化1:協程在創建出來的時候就已經被啟動,因此為瞭查看調用Job.start()前的日志需要加上懶啟動
  • 變化2:調用start函數啟動協程

從輸出結果來看沒有調用start函數前isActive返回true,調用後就返回瞭true,當使用懶啟動後在調用cancel函數與前面使用cancel函數輸出的日志是一樣的,可以得知懶啟動後對協程的生命周期並沒有設麼影響(這可能是句廢話)。

現在還有最後一個變量沒有看isCompleted,在上面的代碼中添加一個延時函數,等協程任務結束再打印日志

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(1000L)
    }
    job.log()
    job.start()
    job.log()
    job.cancel()
    delay(2000L)		//變化在這裡
    job.log()
}
fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}  
        ================================
    """.trimIndent()
    )
}
//輸出結果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:false
//isCompleted:true
//isCancelled:true
//Thread:main @coroutine#1  
//================================

從輸出結果中看到當調用isCancelisCompleted也返回瞭true,也就是說任務結束瞭。

上面的代碼為瞭監測isCompleted的狀態加瞭一個延時函數delay,但是這種方式並不建議使用,因為這個時間他不是固定的,例如從後臺請求數據或者下載文件,這種情況下的時間是完全無法預知的。

現在假設已經知道協程執行完畢需要delay(1000L)的時間,如果將協程內的delay時長設置的大於外部的delay時長,會帶來什麼問題?

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(4000L)
    }
    job.log()
    job.start()
    job.log()
    delay(1000L)
    job.log()
    println("Process end!")
}
fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}  
        ================================
    """.trimIndent()
    )
}
//輸出結果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//Process end!

由輸出結果可知isCompleted狀態是false,協程任務是否執行完畢不得而知。另外當println("Process end!")執行完畢後程序並沒有立即輸出Process finished with exit code 0,這是因為runBlocking 會一直阻塞,等到 job 任務執行完畢以後才真正退出。

那要如何解決這個問題?

//Job#join 
/**
* 掛起協程,知道任務完成再恢復
*/
public suspend fun join()

joinJob中的一個掛起函數,調用後會掛起當前程序的執行流程,等待job當中的協程任務執行完畢然後再恢復當前程序的執行流程。

join將任務掛起後再恢復,那要如何知道任務是否執行完畢瞭?invokeOnCompletion可以監聽任務執行的狀態

//Job#invokeOnCompletion
/**
* 註冊Job完成時同步調用的處理程序.
* 當Job已經完成時,將處理程序將立即調用Job的異常或取消原因或null
* 否則,該處理程序將在此Job完成時調用一次。
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
//Job#invokeOnCompletion
/**
* 註冊在取消或完成此Job時同步調用的處理程序。
* 當Job已經被取消並完成執行時,處理程序將立即調用Job的取消原因或null,
* 除非將invokeImmediately設置為false。否則,
* 當Job取消或完成時將調用一次handler。
*/
public fun invokeOnCompletion(
        onCancelling: Boolean = false,
        invokeImmediately: Boolean = true,
        handler: CompletionHandler): DisposableHandle

joininvokeOnCompletion的使用如下:

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        delay(4000L)
    }
    job.log()
    job.start()
    job.log()
    //新增
    job.join()
    //新增
    job.invokeOnCompletion {
        println("==========Task status==========")
        job.log()
    }
    println("Process end!")
}
fun Job.log() {
    println(
        """
        isActive:$isActive
        isCompleted:$isCompleted
        isCancelled:$isCancelled
        Thread:${Thread.currentThread().name}  
        ================================
    """.trimIndent()
    )
}
//輸出結果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//==========Task status==========
//isActive:false
//isCompleted:true
//isCancelled:false
//Thread:main @coroutine#1  
//================================
//Process end!

可以看到加入joininvokeOnCompletion之後isCompleted的狀態就正確瞭,同時Process end!輸出後Process finished with exit code 0也會很快的輸出,這說明任務確實執行完畢瞭。

在講協程的啟動方式的時候提出一個觀點:launch的返回值Job代表的是協程的句柄。那麼Job是協程的句柄該怎麼理解?

句柄: 是指一個中間媒介,可以操控一個東西。就類似於遙控器操作空調場景中遙控器就是句柄,開關控制燈具場景中開關就是句柄。

所以Job和協程的關系就類似於遙控器和空調,開關和燈具。Job可以監測協程的運行狀態也可以控制協程的運行狀態。那麼Job就和遙控器、開關一樣看做是一個句柄。

2.Deffered

launch直接創建瞭Jobasync通過Deffered間接創建瞭Job對象,但是它並沒有在 Job 的基礎上擴展出很多其他功能,而接收一個返回值是依靠 await() 方法,那await方法是如何實現的?

fun main() = runBlocking {
    val deferred = async {
        logX("Coroutine start!")
        delay(1000L)
        logX("Coroutine end!")
        "Coroutine result!"
    }
    val result = deferred.await()
    println("Result = $result")
    logX("Process end!")
}
fun logX(any: Any?) {
    println(
        """
================================
$any 
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
    )
}
//輸出結果:
//Coroutine start! 
//Thread:main @coroutine#2
//================================
//================================
//Coroutine end! 
//Thread:main @coroutine#2
//================================
//Result = Coroutine result!
//================================
//Process end! 
//Thread:main @coroutine#1

從輸出結果來看,await方法可以獲取協程執行結果外,好像還會阻塞協程的執行流程,直到協程任務執行完畢。看一下await的源碼

//Deferred#await
public interface Deferred<out T> : Job {
    ...
    public suspend fun await(): T
    ...
}

從源碼來看await也是一個掛起函數,它跟join是一樣的,看似阻塞的過程其實是協程的掛起恢復能力。

所以,總的來說,Deferred 隻是比 Job 多瞭一個 await() 掛起函數而已,通過這個掛起函數,就可以等待協程執行完畢的同時,還可以直接拿到協程的執行結果。

3.Job與結構化並發

在其他地方看過這麼一句話:協程的優勢在於結構化並發, 這句話該如何理解?

這句話可以理解為帶有結構和層級的並發,用代碼表現就像這樣:

fun main() = runBlocking {
    val parentJob: Job
    var childJob1: Job? = null
    var childJob2: Job? = null
    var childJob3: Job? = null
    parentJob = launch {
        childJob1 = launch {
            delay(1000L)
        }
        childJob2 = launch {
            delay(3000L)
        }
        childJob3 = launch {
            delay(5000L)
        }
    }
    delay(500L)
    parentJob.children.forEachIndexed { index, job ->
        when (index) {
            0 -> println("childJob1 === childJob1 is ${childJob1 === job}")
            1 -> println("childJob2 === childJob2 is ${childJob2 === job}")
            2 -> println("childJob3 === childJob3 is ${childJob3 === job}")
        }
    }
    parentJob.join()
    logX("Process end!")
}
//輸出結果:
//childJob1 === childJob1 is true
//childJob2 === childJob2 is true
//childJob3 === childJob3 is true
//================================
//Process end! 
//Thread:main @coroutine#1

上面的代碼是父子層級,父Job使用launch啟動瞭協程同時它的內部還有三個Job,三個子Job是並發執行的,同時也是用過launch啟動的協程,調用瞭parentJob.join()那麼掛起的時間就是childJob3的時長—5秒,因為它要等待所有任務都執行完畢才會恢復執行,然後通過children.forEachIndexed進行遍歷並分別對比他們與三個子Job的引用是否相等“===”代表瞭引用相等,即是否是同一個對象)。圖示如下

前面講過,Job可以調用cancel方法取消執行,那麼當調用parentJob.cancel會有什麼樣的情況?

fun main() = runBlocking {
    val parentJob: Job
    var childJob1: Job? = null
    var childJob2: Job? = null
    var childJob3: Job? = null
    parentJob = launch {
        childJob1 = launch {
            println("childJob1 start")
            delay(1000L)
            println("childJob1 end")
        }
        childJob2 = launch {
            println("childJob2 start")
            delay(3000L)
            println("childJob2 start")
        }
        childJob3 = launch {
            println("childJob3 start")
            delay(5000L)
            println("childJob3 start")
        }
    }
    delay(500L)
    parentJob.cancel()
    logX("Process end!")
}
//輸出結果:
//childJob1 start
//childJob2 start
//childJob3 start
//================================
//Process end! 
//Thread:main @coroutine#1

parentJob.cancel調用後,每個子Job隻是輸出瞭start,這就可以得出一個結論:父Job取消後子Job也會依次跟著取消。如果調用任何一個子Jobcancel則不會對父Job和其他子Job產生影響。

到這裡對於開頭的那句協程的優勢在於結構化並發就有更更好的理解瞭,這是Kotlin協程的第二大優勢。

4.launch和async的使用場景

  • launch: 主要用來發起一些不需要任何結果的耗時任務,這個任務在執行中可以改變它的執行狀態。
  • async: 主要用來發起一些需要結果的耗時任務,以及與掛起函數結合,優化並發。

以上就是Kotlin協程Job生命周期結構化並發詳解的詳細內容,更多關於Kotlin協程Job的資料請關註WalkonNet其它相關文章!

推薦閱讀: