Kotlin協程啟動createCoroutine及創建startCoroutine原理
createCoroutine 和 startCoroutine
協程到底是怎麼創建和啟動的?本篇文章帶你揭曉。
在Continuation.kt文件中,有2個基礎API,這裡單獨提出來說一下,方便後面我們理解launch。
public fun <T> (suspend () -> T).createCoroutine( completion: Continuation<T> ): Continuation<Unit> = SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED) public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }
createCoroutine和startCoroutine就是用來創建和啟動協程的基礎API,launch、async等在底層一定程度上都使用瞭該基礎API,launch和async隻不過是封裝而已。所以,我們先掌握它們。
這2個函數看起來差別不大,一個調用瞭resume開始瞭協程,一個沒有調用,需要外部去調用resume(createCoroutine會把Continuation返回出去)。
既然launch和async可以用它們來創建和啟動協程,那我們是否可以直接用它們來創建和啟動協程?那當然可以。這裡我舉個startCoroutine的例子,仔細看它的函數聲明,它其實是個擴展函數,擴展的是(suspend () -> T)
這種類型。
(suspend () -> T)
:suspend函數+返回類型是T
它可以有2種寫法:
//方式1----------- val block = suspend { ... "雲天明" } block.startCoroutine(continuation) //方式2-------------- suspend fun getUserName(): String { ... return "雲天明" } (::getUserName).startCoroutine(continuation)
一種是匿名的suspend函數,一種是正常的有名字的suspend函數。現在,我們簡單寫個demo來調一下startCoroutine。
startCoroutine調用
//StartCoroutine.kt fun main() { val continuation = object : Continuation<String> { override val context: CoroutineContext get() = EmptyCoroutineContext override fun resumeWith(result: Result<String>) { println("結果: ${result.getOrNull()}") } } block.startCoroutine(continuation) Thread.sleep(3000L) } val block = suspend { println("start") delay(2000L) println("end") "DX3906" }
調起非常簡單,startCoroutine是(suspend () -> T)
的擴展函數,且需要傳遞一個Continuation參數。我們先反編譯看一下,長什麼樣子。
public final class StartCoroutineKt { //block那塊被轉換成瞭一個類StartCoroutineKt$block$1,這裡創建好一個實例對象,待會兒可以直接使用 private static final Function1<Continuation<? super String>, Object> block = new StartCoroutineKt$block$1((Continuation<? super StartCoroutineKt$block$1>) null); public static final void main() { //調用擴展函數,將block和continuation參數傳入。 ContinuationKt.startCoroutine(block, new StartCoroutineKt$main$continuation$1()); Thread.sleep(3000); } public static final Function1<Continuation<? super String>, Object> getBlock() { return block; } } //對應block那塊 final class StartCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> { int label; StartCoroutineKt$block$1(Continuation<? super StartCoroutineKt$block$1> continuation) { super(1, continuation); } //創建StartCoroutineKt$block$1實例 public final Continuation<Unit> create(Continuation<?> continuation) { return new StartCoroutineKt$block$1(continuation); } public final Object invoke(Continuation<? super String> continuation) { //創建StartCoroutineKt$block$1實例並執行invokeSuspend return ((StartCoroutineKt$block$1) create(continuation)).invokeSuspend(Unit.INSTANCE); } public final Object invokeSuspend(Object $result) { Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED(); //狀態機 switch (this.label) { case 0: //label一開始是0 ResultKt.throwOnFailure($result); System.out.println("start"); this.label = 1; //這裡正常情況會返回COROUTINE_SUSPENDED,label已經改成1瞭,下次走case 1的邏輯 if (DelayKt.delay(2000, this) != coroutine_suspended) { break; } else { return coroutine_suspended; } case 1: //label為1,沒有return,繼續走最後的結束語句 ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } //結束 System.out.println("end"); return "雲天明"; } } //對應Continuation那塊 public final class StartCoroutineKt$main$continuation$1 implements Continuation<String> { StartCoroutineKt$main$continuation$1() { } public CoroutineContext getContext() { return EmptyCoroutineContext.INSTANCE; } public void resumeWith(Object result) { //輸出結果 StringBuilder sb = new StringBuilder(); sb.append("結果: "); sb.append((String) (Result.m29isFailureimpl(result) ? null : result)); System.out.println(sb.toString()); } }
還是比較清晰的,
- 首先
object : Continuation<String>
是肯定會生成一個匿名內部類,在該類中,簡單在resumeWith裡面輸出瞭一下結果 - block那塊代碼,也會生成一個匿名內部類。需要註意的是,它繼承自SuspendLambda,這個沒見過,待會兒分析,裡面有幾個方法:create、invoke、invokeSuspend。其中create是創建該類的實例,invoke是調用create方法並執行invokeSuspend,invokeSuspend裡面是狀態機相關的邏輯。
- main裡面執行瞭
ContinuationKt.startCoroutine(block, continuation)
,調起瞭擴展方法(擴展方法的原理就是這樣的)
反編譯出來的代碼大致結構我們是瞭解瞭,現在需要分析一下startCoroutine具體是怎麼走的瞭,看它是怎麼利用這些反編譯出來的代碼的。
createCoroutineUnintercepted
public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) } //這個函數是expect的,沒有函數體 public expect fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit>
startCoroutine首先是調用瞭createCoroutineUnintercepted函數,而createCoroutineUnintercepted是expect的,它是一種聲明。因為Kotlin是跨平臺的,所以部分邏輯與平臺相關,這個createCoroutineUnintercepted就是這種。
它沒有函數體,我們隻關心JVM平臺,所以需要到JVM平臺上找該函數的實現。在Kotlin源碼地圖文章中,我們提到協程源碼,分為2個倉庫,一個是Kotlin倉庫,一個是Kotlin協程倉庫。
這個createCoroutineUnintercepted是在Kotlin倉庫中,具體位置是:
kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) //走這裡 create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
咦,createCoroutineUnintercepted居然也是(suspend () -> T)
的擴展函數,所以if那裡的this指的就是block,也就是StartCoroutineKt$block$1。它繼承自SuspendLambda。
internal abstract class SuspendLambda( public override val arity: Int, completion: Continuation<Any?>? ) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction { constructor(arity: Int) : this(arity, null) public override fun toString(): String = if (completion == null) Reflection.renderLambdaToString(this) // this is lambda else super.toString() // this is continuation } internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { ...... } //BaseContinuationImpl實現瞭Continuation接口 internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { ... }
SuspendLambda是ContinuationImpl的子類,而ContinuationImpl是BaseContinuationImpl的子類。所以上面的if (this is BaseContinuationImpl)
判斷是ok的,會走到create(probeCompletion)
。
也就是StartCoroutineKt$block$1
的create方法,在裡面會創建StartCoroutineKt$block$1
實例。
public final Continuation<Unit> create(Continuation<?> continuation) { return new StartCoroutineKt$block$1(continuation); }
走到這裡相當於startCoroutine中的createCoroutineUnintercepted(completion)
這一步就走完瞭,它最終返回的是StartCoroutineKt$block$1
的實例,也就是一個Continuation。它標志著協程被創建好瞭。再來看下intercepted是什麼邏輯
intercepted
public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) } //好傢夥,intercepted也是expect的 public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
發現這裡的intercepted擴展函數也是expect的,又得去kotlin倉庫裡面找jvm相關的實現。我找瞭下,路徑在這裡:
kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
intercepted是一個擴展函數,這裡的this也就是前面createCoroutineUnintercepted(completion)
創建出來的StartCoroutineKt$block$1
實例,它本身是SuspendLambda的子類,而SuspendLambda就是ContinuationImpl的子類。
所以這裡的as?
會轉換成功,轉換出來的不是null。也就是說走到瞭ContinuationImpl的intercepted()
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) //這個context其實就是傳入的Continuation中的context public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } } @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
第一次執行這裡時intercepted是null,那麼會從context中取ContinuationInterceptor,而context就是Continuation傳入的context,我們傳入的是EmptyCoroutineContext,取出來是null(ContinuationInterceptor會對Continuation進行攔截,然後將執行邏輯指派到對應的線程之上去,這塊的邏輯後面再細說,就不詳細展開瞭。)
所以這裡intercepted()最終執行結果就是返回this,this也就是StartCoroutineKt$block$1
(block函數生成的類)。
intercepted()
走完後再回到startCoroutine:
public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }
resume
就差最後一個resume(Unit)瞭,前面createCoroutineUnintercepted(completion).intercepted()
創建出來的是StartCoroutineKt$block$1
實例,所以我們需要到這個類裡面去找resume函數。
再提一下類的繼承關系:
StartCoroutineKt$block$1 extends SuspendLambda implements Function1 internal abstract class SuspendLambda( public override val arity: Int, completion: Continuation<Any?>? ) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable public interface Continuation<in T> { public val context: CoroutineContext public fun resumeWith(result: Result<T>) }
StartCoroutineKt$block$1
中沒有該resume函數,其父類SuspendLambda也沒有該函數,再到SuspendLambda的父類ContinuationImpl中,發現也沒有。再到ContinuationImpl的父類BaseContinuationImpl中,也沒有該函數,隻有一個resumeWith,奇瞭怪瞭。後來,我發現這個resume函數是一個擴展函數:
public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
而resume這個擴展函數最終是調用的resumeWith,resumeWidth的實現在BaseContinuationImpl中。
public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { current = completion param = outcome } else { //label等於1時走這裡 completion.resumeWith(outcome) return } } } }
這個開瞭個while(true)
循環,不斷地執行invokeSuspend(),如果遇到invokeSuspend返回結果是COROUTINE_SUSPENDED
則退出while(true)
循環。
public final Object invokeSuspend(Object $result) { Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED(); //狀態機 switch (this.label) { case 0: //label一開始是0 ResultKt.throwOnFailure($result); System.out.println("start"); this.label = 1; //這裡正常情況會返回COROUTINE_SUSPENDED,label已經改成1瞭,下次走case 1的邏輯 if (DelayKt.delay(2000, this) != coroutine_suspended) { break; } else { return coroutine_suspended; } case 1: //label為1,沒有return,繼續走最後的結束語句 ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } //結束 System.out.println("end"); return "雲天明"; }
invokeSuspend實際上就是我們的demo中的StartCoroutineKt$block$1
裡的invokeSuspend函數。在demo中,這個invokeSuspend第一次的時候狀態機那裡,label是0,所以會隨即走到DelayKt.delay(2000, this)
,它是一個掛起函數,此時會拿到結果:COROUTINE_SUSPENDED
。
resumeWith遇到COROUTINE_SUSPENDED
就不會繼續往下走瞭,等到delay執行完成之後,會回調這個resumeWith函數,再繼續走invokeSuspend,此時label已經是1瞭,走到狀態機邏輯那裡,返回結果“雲天明”。
這個結果會被resumeWidth的outcome接收住,resumeWidth中的這個completion其實就是我們demo中的StartCoroutineKt$main$continuation$1
(實現Continuation<String>
的那個類,是通過構造函數傳進來的),最終會走到completion.resumeWith(outcome)
,也就是來到瞭輸出結果的地方:println("結果: ${result.getOrNull()}")
。整個流程就走完瞭。
結語
createCoroutine用來創建協程,startCoroutine用來創建並啟動協程。它們內部的原理是類似的,隻是一個沒有調用resume啟動協程,另一個調用瞭resume啟動協程。編譯的時候,會生成一個SuspendLambda的實現類,該類invokeSuspend用於執行狀態機的邏輯,調用resume後該狀態機會被觸發,狀態機走完,協程也就走完瞭。
以上就是Kotlin協程啟動createCoroutine及創建startCoroutine原理的詳細內容,更多關於Kotlin協程啟動創建的資料請關註WalkonNet其它相關文章!