Kotlin協程launch啟動流程原理詳解
1.launch啟動流程
已知協程的啟動方式之一是Globalscope.launch
,那麼Globalscope.launch
的流程是怎樣的呢,直接進入launch
的源碼開始看起。
fun main() { coroutineTest() Thread.sleep(2000L) } val block = suspend { println("Hello") delay(1000L) println("Kotlin") } private fun coroutineTest() { CoroutineScope(Job()).launch { withContext(Dispatchers.IO) { block.invoke() } } }
反編譯後的Java代碼
public final class CoroutineDemoKt { @NotNull private static final Function1 block; public static final void main() { coroutineTest(); Thread.sleep(2000L); } // $FF: synthetic method public static void main(String[] var0) { main(); } @NotNull public static final Function1 getBlock() { return block; } private static final void coroutineTest() { BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO(); Function2 var10001 = (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); Function1 var10000 = CoroutineDemoKt.getBlock(); this.label = 1; if (var10000.invoke(this) == var2) { return var2; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }); this.label = 1; if (BuildersKt.withContext(var10000, var10001, this) == var2) { return var2; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 3, (Object)null); } static { Function1 var0 = (Function1)(new Function1((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var2; switch(this.label) { case 0: ResultKt.throwOnFailure($result); var2 = "Hello"; System.out.println(var2); this.label = 1; if (DelayKt.delay(1000L, this) == var3) { return var3; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var2 = "Kotlin"; System.out.println(var2); return Unit.INSTANCE; } @NotNull public final Continuation create(@NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function1 var2 = new <anonymous constructor>(completion); return var2; } public final Object invoke(Object var1) { return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE); } }); block = var0; } }
先分析一下上面代碼的流程:
- 首先聲明瞭一個Function1類型的block變量,這個變量就是demo中的block,然後會在static函數中會被賦值。
- 接下來就是coroutineTest函數的調用。這個函數中的第一行代碼就是CoroutineScope的傳參和一些默認值
- 然後通過89行的invoke進入到瞭外層狀態機流轉的過程
- 95行的static表示的是內部的掛起函數就是demo中的block.invoke,它是以匿名內部類的方式實現,然後執行內部的狀態機流轉過程,最後給block賦值。
- block被賦值後最終在
Function1 var10000 = CoroutineDemoKt.getBlock();
被調用
那麼這個過程又是如何實現的,進入launch
源碼進行查看:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
這裡的block
指的就是demo中的block代碼段
再來看一下裡面的幾行代碼的含義:
- newCoroutineContext: 通過默認的或者傳入的context創建一個新的Context;
- coroutine: launch 會根據傳入的啟動模式來創建對應的協程對象。這裡有兩種,一種是標準的,一種是懶加載的。
- coroutine.start: 嘗試啟動協程
2.協程是如何被啟動的
通過launch
的源碼可知協程的啟動是通過coroutine.start
啟動的,那麼協程的啟動流程又是怎樣的?
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ... /** * 用給定的代碼塊啟動這個協程並啟動策略。這個函數在這個協程上最多調用一次。 */ public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { start(block, receiver, this) } }
start
函數中傳入瞭三個參數,隻需要關註第一個參數即可。
public enum class CoroutineStart { ... /** * 用這個協程的啟動策略啟動相應的塊作為協程。 */ public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily } }
啟動策略的具體實現有三種方式,這裡隻需要分析startCoroutine
,另外兩個其實就是它的基礎上增加瞭一些功能,其中前者代表啟動協程以後可以在等待調度時取消,後者表示協程啟動後不會被分發。
/** * 創建沒有接收方且結果類型為T的協程,這個函數每次調用時都會創建一個新的可掛起的實例。 */ public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }
createCoroutineUnintercepted
在源代碼中隻是一個聲明,它的具體實現是在IntrinsicsJvm.kt文件中。
//IntrinsicsJvm.kt#createCoroutineUnintercepted /** * 創建沒有接收方且結果類型為T的非攔截協程。這個函數每次調用時都會創建一個新的可掛起的實例。 */ public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
actual
代表瞭 createCoroutineUnintercepted()
在 JVM 平臺的實現。
createCoroutineUnintercepted
是一個擴展函數,接收者類型是一個無參數,返回值為 T 的掛起函數或者 Lambda。
第9行代碼中的this
代表的是(suspend () -> T)
也就是invoke
函數中的block
變量,這個block
變量就是demo中的block
代碼段。
第9行的BaseContinuationImpl
是一個抽象類它實現瞭Continuation
。
關於if (this is BaseContinuationImpl)
的結果暫且不分析,先分析兩種情況下的create函數:
- create(probeCompletion):
//ContinuationImpl.kt#create public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") }
這個create
函數拋出一個異常,意思就是這個create()
沒有被重寫,而這個create()
的重寫就是在反編譯後的Java代碼中的create
函數
@NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; }
- createCoroutineFromSuspendFunction(probeCompletion):
//IntrinsicsJvm.kt#createCoroutineFromSuspendFunction /** * 當一個被suspend修飾的lambda表達式沒有繼承BaseContinuationImpl類時,則通過此方法創建協程。 * * 它發生在兩種情況下: * 1.lambda表達式中調用瞭其他的掛起方法 * 2.掛起方法是通過Java實現的 * * 必須將它封裝到一個擴展[BaseContinuationImpl]的實例中,因為這是所有協程機制的期望。 */ private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // context為空創建一個受限協程 return if (context === EmptyCoroutineContext) //受限協程:隻能調用協程作用域中提供的掛起方式掛起,其他掛起方法不能調用 object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // 如果試圖以異常開始,則重新拋出異常(將被BaseContinuationImpl.resumeWith捕獲) block(this) // 運行塊,可以返回或掛起 } 1 -> { label = 2 result.getOrThrow() // 這是block掛起的結果 } else -> error("This coroutine had already completed") } } else //創建一個正常的協程 object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // 如果試圖以異常開始,則重新拋出異常(將被BaseContinuationImpl.resumeWith捕獲) block(this) // 運行塊,可以返回或掛起 } 1 -> { label = 2 result.getOrThrow() // 這是block掛起的結果 } else -> error("This coroutine had already completed") } } }
createCoroutineFromSuspendFunction就是當一個被suspend修飾的Lambda表達式沒有繼承BaseContinuationImpl是才會被調用,然後根據上下文是否為空創建不同類型的協程。
兩種情況都已經分析完瞭,那麼現在if (this is BaseContinuationImpl)
會執行哪一個呢,首先這裡的this
所指的就是demo中的block代碼段,Kotlin編譯器編譯後會自動生成一個類就是上面的static
,它會繼承SuspendLambda類,而這個SuspendLambda類繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,因此可以得到判斷結果為true,
createCoroutineUnintercepted
的過程就是協程創建的過程。
然後就是intercepted函數,這個函數的具體實現也在IntrinsicsJvm.kt中,那麼intercepted
又做瞭什麼呢
public expect fun <T> Continuation<T>.intercepted(): Continuation<T> //具體實現 //IntrinsicsJvm.kt#intercepted public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
首先有個強轉,通過上面的分析這個強轉是一定會成功的,到這裡intercepted
就進入到瞭ContinuationImpl
中瞭
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { ... @Transient private var intercepted: Continuation<Any?>? = null //如果沒有緩存,則從上下文獲取攔截器,調用interceptContinuation進行攔截 //將獲取到的內容保存到全局變量 public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } }
這裡的ContinuationInterceptor
指的就是Demo中傳輸的Dispatcher.IO
,默認值時Dispatcher.Default
。
再回到startContinue
中還剩最後一個resume
:
/** * 恢復執行相應的協程傳遞值作為最後一個掛起點的返回值。 */ public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value)) public interface Continuation<in T> { /** * 與此延續相對應的協程的上下文。 */ public val context: CoroutineContext /** * 恢復執行相應的協程傳遞值作為最後一個掛起點的返回值。 */ public fun resumeWith(result: Result<T>) }
這裡的resume(Unit)
作用就相當與啟動瞭一個協程。
上面的啟動流程中為瞭方便分析的是CoroutineStart.ATOMIC
,而默認的是CoroutineStart.DEFAULT
,下面分析一下DEFAULT
的流程
//Cancellable.kt#startCoroutineCancellable public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) }
startCoroutineCancellable
對於協程的創建和攔截與ATOMIC
是一樣的,區別就在於resumeCancellableWith
//DispatchedContinuation#resumeCancellableWith public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result) } // 我們內聯它來保存堆棧上的一個條目,在它顯示的情況下(無限制調度程序) // 它隻在Continuation<T>.resumeCancellableWith中使用 @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //是否需要分發 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //將可運行塊的執行分派給給定上下文中的另一個線程 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { //協程未被取消 if (!resumeCancelled(state)) { // 恢復執行 resumeUndispatchedWith(result) } } } } //恢復執行前判斷協程是否已經取消執行 inline fun resumeCancelled(state: Any?): Boolean { //獲取當前協程任務 val job = context[Job] //如果不為空且不活躍 if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) //拋出異常 resumeWithException(cause) return true } return false } //我們需要內聯它來在堆棧中保存一個條目 inline fun resumeUndispatchedWith(result: Result<T>) { withContinuationContext(continuation, countOrElement) { continuation.resumeWith(result) } }
以上就是Kotlin協程launch啟動流程原理詳解的詳細內容,更多關於Kotlin協程launch啟動流程的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Kotlin協程launch原理詳解
- Kotlin協程啟動createCoroutine及創建startCoroutine原理
- Kotlin Job啟動流程源碼層深入分析
- Kotlin協程到底是如何切換線程的
- Kotlin掛起函數原理示例剖析