Kotlin協程啟動createCoroutine及創建startCoroutine原理

createCoroutine 和 startCoroutine

協程到底是怎麼創建和啟動的?本篇文章帶你揭曉。

在Continuation.kt文件中,有2個基礎API,這裡單獨提出來說一下,方便後面我們理解launch。

public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

createCoroutine和startCoroutine就是用來創建和啟動協程的基礎API,launch、async等在底層一定程度上都使用瞭該基礎API,launch和async隻不過是封裝而已。所以,我們先掌握它們。

這2個函數看起來差別不大,一個調用瞭resume開始瞭協程,一個沒有調用,需要外部去調用resume(createCoroutine會把Continuation返回出去)。

既然launch和async可以用它們來創建和啟動協程,那我們是否可以直接用它們來創建和啟動協程?那當然可以。這裡我舉個startCoroutine的例子,仔細看它的函數聲明,它其實是個擴展函數,擴展的是(suspend () -> T)這種類型。

(suspend () -> T):suspend函數+返回類型是T

它可以有2種寫法:

//方式1-----------
val block = suspend {
    ...
    "雲天明"
}
block.startCoroutine(continuation)
//方式2--------------
suspend fun getUserName(): String {
    ...
    return "雲天明"
}
(::getUserName).startCoroutine(continuation)

一種是匿名的suspend函數,一種是正常的有名字的suspend函數。現在,我們簡單寫個demo來調一下startCoroutine。

startCoroutine調用

//StartCoroutine.kt
fun main() {
    val continuation = object : Continuation<String> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            println("結果: ${result.getOrNull()}")
        }
    }
    block.startCoroutine(continuation)
    Thread.sleep(3000L)
}
val block = suspend {
    println("start")
    delay(2000L)
    println("end")
    "DX3906"
}

調起非常簡單,startCoroutine是(suspend () -> T)的擴展函數,且需要傳遞一個Continuation參數。我們先反編譯看一下,長什麼樣子。

public final class StartCoroutineKt {
    //block那塊被轉換成瞭一個類StartCoroutineKt$block$1,這裡創建好一個實例對象,待會兒可以直接使用
    private static final Function1<Continuation<? super String>, Object> block = new StartCoroutineKt$block$1((Continuation<? super StartCoroutineKt$block$1>) null);
    public static final void main() {
        //調用擴展函數,將block和continuation參數傳入。  
        ContinuationKt.startCoroutine(block, new StartCoroutineKt$main$continuation$1());
        Thread.sleep(3000);
    }
    public static final Function1<Continuation<? super String>, Object> getBlock() {
        return block;
    }
}
//對應block那塊
final class StartCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    int label;
    StartCoroutineKt$block$1(Continuation<? super StartCoroutineKt$block$1> continuation) {
        super(1, continuation);
    }
    //創建StartCoroutineKt$block$1實例
    public final Continuation<Unit> create(Continuation<?> continuation) {
        return new StartCoroutineKt$block$1(continuation);
    }
    public final Object invoke(Continuation<? super String> continuation) {
        //創建StartCoroutineKt$block$1實例並執行invokeSuspend
        return ((StartCoroutineKt$block$1) create(continuation)).invokeSuspend(Unit.INSTANCE);
    }
    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        //狀態機
        switch (this.label) {
            case 0:
                //label一開始是0
                ResultKt.throwOnFailure($result);
                System.out.println("start");
                this.label = 1;
                //這裡正常情況會返回COROUTINE_SUSPENDED,label已經改成1瞭,下次走case 1的邏輯
                if (DelayKt.delay(2000, this) != coroutine_suspended) {
                    break;
                } else {
                    return coroutine_suspended;
                }
            case 1:
                //label為1,沒有return,繼續走最後的結束語句
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        //結束
        System.out.println("end");
        return "雲天明";
    }
}
//對應Continuation那塊
public final class StartCoroutineKt$main$continuation$1 implements Continuation<String> {
    StartCoroutineKt$main$continuation$1() {
    }
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }
    public void resumeWith(Object result) {
        //輸出結果
        StringBuilder sb = new StringBuilder();
        sb.append("結果: ");
        sb.append((String) (Result.m29isFailureimpl(result) ? null : result));
        System.out.println(sb.toString());
    }
}

還是比較清晰的,

  • 首先object : Continuation<String> 是肯定會生成一個匿名內部類,在該類中,簡單在resumeWith裡面輸出瞭一下結果
  • block那塊代碼,也會生成一個匿名內部類。需要註意的是,它繼承自SuspendLambda,這個沒見過,待會兒分析,裡面有幾個方法:create、invoke、invokeSuspend。其中create是創建該類的實例,invoke是調用create方法並執行invokeSuspend,invokeSuspend裡面是狀態機相關的邏輯。
  • main裡面執行瞭ContinuationKt.startCoroutine(block, continuation),調起瞭擴展方法(擴展方法的原理就是這樣的)

反編譯出來的代碼大致結構我們是瞭解瞭,現在需要分析一下startCoroutine具體是怎麼走的瞭,看它是怎麼利用這些反編譯出來的代碼的。

createCoroutineUnintercepted

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
//這個函數是expect的,沒有函數體
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit>

startCoroutine首先是調用瞭createCoroutineUnintercepted函數,而createCoroutineUnintercepted是expect的,它是一種聲明。因為Kotlin是跨平臺的,所以部分邏輯與平臺相關,這個createCoroutineUnintercepted就是這種。

它沒有函數體,我們隻關心JVM平臺,所以需要到JVM平臺上找該函數的實現。在Kotlin源碼地圖文章中,我們提到協程源碼,分為2個倉庫,一個是Kotlin倉庫,一個是Kotlin協程倉庫。

這個createCoroutineUnintercepted是在Kotlin倉庫中,具體位置是:

kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        //走這裡
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

咦,createCoroutineUnintercepted居然也是(suspend () -> T)的擴展函數,所以if那裡的this指的就是block,也就是StartCoroutineKt$block$1。它繼承自SuspendLambda。

internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    ......
}
//BaseContinuationImpl實現瞭Continuation接口
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    ...
}

SuspendLambda是ContinuationImpl的子類,而ContinuationImpl是BaseContinuationImpl的子類。所以上面的if (this is BaseContinuationImpl)判斷是ok的,會走到create(probeCompletion)

也就是StartCoroutineKt$block$1的create方法,在裡面會創建StartCoroutineKt$block$1實例。

public final Continuation<Unit> create(Continuation<?> continuation) {
    return new StartCoroutineKt$block$1(continuation);
}

走到這裡相當於startCoroutine中的createCoroutineUnintercepted(completion)這一步就走完瞭,它最終返回的是StartCoroutineKt$block$1的實例,也就是一個Continuation。它標志著協程被創建好瞭。再來看下intercepted是什麼邏輯

intercepted

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
//好傢夥,intercepted也是expect的
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>

發現這裡的intercepted擴展函數也是expect的,又得去kotlin倉庫裡面找jvm相關的實現。我找瞭下,路徑在這裡:

kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

intercepted是一個擴展函數,這裡的this也就是前面createCoroutineUnintercepted(completion)創建出來的StartCoroutineKt$block$1實例,它本身是SuspendLambda的子類,而SuspendLambda就是ContinuationImpl的子類。

所以這裡的as?會轉換成功,轉換出來的不是null。也就是說走到瞭ContinuationImpl的intercepted()

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    //這個context其實就是傳入的Continuation中的context
    public override val context: CoroutineContext
        get() = _context!!
    @Transient
    private var intercepted: Continuation<Any?>? = null
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

第一次執行這裡時intercepted是null,那麼會從context中取ContinuationInterceptor,而context就是Continuation傳入的context,我們傳入的是EmptyCoroutineContext,取出來是null(ContinuationInterceptor會對Continuation進行攔截,然後將執行邏輯指派到對應的線程之上去,這塊的邏輯後面再細說,就不詳細展開瞭。)

所以這裡intercepted()最終執行結果就是返回this,this也就是StartCoroutineKt$block$1(block函數生成的類)。

intercepted()走完後再回到startCoroutine:

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

resume

就差最後一個resume(Unit)瞭,前面createCoroutineUnintercepted(completion).intercepted()創建出來的是StartCoroutineKt$block$1實例,所以我們需要到這個類裡面去找resume函數。

再提一下類的繼承關系:

StartCoroutineKt$block$1 extends SuspendLambda implements Function1
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction 
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) 
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable
public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

StartCoroutineKt$block$1中沒有該resume函數,其父類SuspendLambda也沒有該函數,再到SuspendLambda的父類ContinuationImpl中,發現也沒有。再到ContinuationImpl的父類BaseContinuationImpl中,也沒有該函數,隻有一個resumeWith,奇瞭怪瞭。後來,我發現這個resume函數是一個擴展函數:

public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

而resume這個擴展函數最終是調用的resumeWith,resumeWidth的實現在BaseContinuationImpl中。

public final override fun resumeWith(result: Result<Any?>) {
    var current = this
    var param = result
    while (true) {
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                current = completion
                param = outcome
            } else {
                //label等於1時走這裡
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

這個開瞭個while(true)循環,不斷地執行invokeSuspend(),如果遇到invokeSuspend返回結果是COROUTINE_SUSPENDED則退出while(true)循環。

public final Object invokeSuspend(Object $result) {
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    //狀態機
    switch (this.label) {
        case 0:
            //label一開始是0
            ResultKt.throwOnFailure($result);
            System.out.println("start");
            this.label = 1;
            //這裡正常情況會返回COROUTINE_SUSPENDED,label已經改成1瞭,下次走case 1的邏輯
            if (DelayKt.delay(2000, this) != coroutine_suspended) {
                break;
            } else {
                return coroutine_suspended;
            }
        case 1:
            //label為1,沒有return,繼續走最後的結束語句
            ResultKt.throwOnFailure($result);
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    //結束
    System.out.println("end");
    return "雲天明";
}

invokeSuspend實際上就是我們的demo中的StartCoroutineKt$block$1裡的invokeSuspend函數。在demo中,這個invokeSuspend第一次的時候狀態機那裡,label是0,所以會隨即走到DelayKt.delay(2000, this),它是一個掛起函數,此時會拿到結果:COROUTINE_SUSPENDED

resumeWith遇到COROUTINE_SUSPENDED就不會繼續往下走瞭,等到delay執行完成之後,會回調這個resumeWith函數,再繼續走invokeSuspend,此時label已經是1瞭,走到狀態機邏輯那裡,返回結果“雲天明”。

這個結果會被resumeWidth的outcome接收住,resumeWidth中的這個completion其實就是我們demo中的StartCoroutineKt$main$continuation$1(實現Continuation<String>的那個類,是通過構造函數傳進來的),最終會走到completion.resumeWith(outcome),也就是來到瞭輸出結果的地方:println("結果: ${result.getOrNull()}")。整個流程就走完瞭。

結語

createCoroutine用來創建協程,startCoroutine用來創建並啟動協程。它們內部的原理是類似的,隻是一個沒有調用resume啟動協程,另一個調用瞭resume啟動協程。編譯的時候,會生成一個SuspendLambda的實現類,該類invokeSuspend用於執行狀態機的邏輯,調用resume後該狀態機會被觸發,狀態機走完,協程也就走完瞭。

以上就是Kotlin協程啟動createCoroutine及創建startCoroutine原理的詳細內容,更多關於Kotlin協程啟動創建的資料請關註WalkonNet其它相關文章!

推薦閱讀: