Kotlin協程launch啟動流程原理詳解

1.launch啟動流程

已知協程的啟動方式之一是Globalscope.launch,那麼Globalscope.launch的流程是怎樣的呢,直接進入launch的源碼開始看起。

fun main() {
    coroutineTest()
    Thread.sleep(2000L)
}
val block = suspend {
    println("Hello")
    delay(1000L)
    println("Kotlin")
}
private fun coroutineTest() {
    CoroutineScope(Job()).launch {
        withContext(Dispatchers.IO) {
            block.invoke()
        }
    }
}

反編譯後的Java代碼

public final class CoroutineDemoKt {
   @NotNull
   private static final Function1 block;
   public static final void main() {
      coroutineTest();
      Thread.sleep(2000L);
   }
   // $FF: synthetic method
   public static void main(String[] var0) {
      main();
   }
   @NotNull
   public static final Function1 getBlock() {
      return block;
   }
   private static final void coroutineTest() {
      BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
               Function2 var10001 = (Function2)(new Function2((Continuation)null) {
                  int label;
                  @Nullable
                  public final Object invokeSuspend(@NotNull Object $result) {
                     Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                     switch(this.label) {
                     case 0:
                        ResultKt.throwOnFailure($result);
                        Function1 var10000 = CoroutineDemoKt.getBlock();
                        this.label = 1;
                        if (var10000.invoke(this) == var2) {
                           return var2;
                        }
                        break;
                     case 1:
                        ResultKt.throwOnFailure($result);
                        break;
                     default:
                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                     }
                     return Unit.INSTANCE;
                  }
                  @NotNull
                  public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                     Intrinsics.checkNotNullParameter(completion, "completion");
                     Function2 var3 = new <anonymous constructor>(completion);
                     return var3;
                  }
                  public final Object invoke(Object var1, Object var2) {
                     return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                  }
               });
               this.label = 1;
               if (BuildersKt.withContext(var10000, var10001, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }
   static {
      Function1 var0 = (Function1)(new Function1((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            String var2;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               var2 = "Hello";
               System.out.println(var2);
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            var2 = "Kotlin";
            System.out.println(var2);
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }
         public final Object invoke(Object var1) {
            return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
         }
      });
      block = var0;
   }
}

先分析一下上面代碼的流程:

  • 首先聲明瞭一個Function1類型的block變量,這個變量就是demo中的block,然後會在static函數中會被賦值。
  • 接下來就是coroutineTest函數的調用。這個函數中的第一行代碼就是CoroutineScope的傳參和一些默認值
  • 然後通過89行的invoke進入到瞭外層狀態機流轉的過程
  • 95行的static表示的是內部的掛起函數就是demo中的block.invoke,它是以匿名內部類的方式實現,然後執行內部的狀態機流轉過程,最後給block賦值。
  • block被賦值後最終在Function1 var10000 = CoroutineDemoKt.getBlock();被調用

那麼這個過程又是如何實現的,進入launch源碼進行查看:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

這裡的block指的就是demo中的block代碼段

再來看一下裡面的幾行代碼的含義:

  • newCoroutineContext: 通過默認的或者傳入的context創建一個新的Context;
  • coroutine: launch 會根據傳入的啟動模式來創建對應的協程對象。這裡有兩種,一種是標準的,一種是懶加載的。
  • coroutine.start: 嘗試啟動協程

2.協程是如何被啟動的

通過launch的源碼可知協程的啟動是通過coroutine.start啟動的,那麼協程的啟動流程又是怎樣的?

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    ...
    /**
     * 用給定的代碼塊啟動這個協程並啟動策略。這個函數在這個協程上最多調用一次。
     */
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }
}

start函數中傳入瞭三個參數,隻需要關註第一個參數即可。

public enum class CoroutineStart {
    ...
    /**
     * 用這個協程的啟動策略啟動相應的塊作為協程。
     */
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(completion)
        ATOMIC -> block.startCoroutine(completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        LAZY -> Unit // will start lazily
    }
}

啟動策略的具體實現有三種方式,這裡隻需要分析startCoroutine,另外兩個其實就是它的基礎上增加瞭一些功能,其中前者代表啟動協程以後可以在等待調度時取消,後者表示協程啟動後不會被分發。

/**
 * 創建沒有接收方且結果類型為T的協程,這個函數每次調用時都會創建一個新的可掛起的實例。
 */ 
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

createCoroutineUnintercepted在源代碼中隻是一個聲明,它的具體實現是在IntrinsicsJvm.kt文件中。

//IntrinsicsJvm.kt#createCoroutineUnintercepted
/**
 * 創建沒有接收方且結果類型為T的非攔截協程。這個函數每次調用時都會創建一個新的可掛起的實例。
 */
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

actual代表瞭 createCoroutineUnintercepted() 在 JVM 平臺的實現。

createCoroutineUnintercepted是一個擴展函數,接收者類型是一個無參數,返回值為 T 的掛起函數或者 Lambda。

第9行代碼中的this代表的是(suspend () -> T)也就是invoke函數中的block變量,這個block變量就是demo中的block代碼段。

第9行的BaseContinuationImpl是一個抽象類它實現瞭Continuation

關於if (this is BaseContinuationImpl)的結果暫且不分析,先分析兩種情況下的create函數:

  • create(probeCompletion):
//ContinuationImpl.kt#create
public open fun create(completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}

這個create函數拋出一個異常,意思就是這個create()沒有被重寫,而這個create()的重寫就是在反編譯後的Java代碼中的create函數

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function2 var3 = new <anonymous constructor>(completion);
    return var3;
}
  • createCoroutineFromSuspendFunction(probeCompletion):
//IntrinsicsJvm.kt#createCoroutineFromSuspendFunction
/**
 * 當一個被suspend修飾的lambda表達式沒有繼承BaseContinuationImpl類時,則通過此方法創建協程。
 *
 * 它發生在兩種情況下:
 * 1.lambda表達式中調用瞭其他的掛起方法
 * 2.掛起方法是通過Java實現的
 *
 * 必須將它封裝到一個擴展[BaseContinuationImpl]的實例中,因為這是所有協程機制的期望。 
 */
private inline fun <T> createCoroutineFromSuspendFunction(
	completion: Continuation<T>,
	crossinline block: (Continuation<T>) -> Any?
		): Continuation<Unit> {
	val context = completion.context
	// context為空創建一個受限協程
	return if (context === EmptyCoroutineContext)
	//受限協程:隻能調用協程作用域中提供的掛起方式掛起,其他掛起方法不能調用
	object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
		private var label = 0
		override fun invokeSuspend(result: Result<Any?>): Any? =
		when (label) {
			0 -> {
				label = 1
				result.getOrThrow() // 如果試圖以異常開始,則重新拋出異常(將被BaseContinuationImpl.resumeWith捕獲)
				block(this) // 運行塊,可以返回或掛起
			}
			1 -> {
				label = 2
				result.getOrThrow() // 這是block掛起的結果
			}
			else -> error("This coroutine had already completed")
		}
	}
	else
	//創建一個正常的協程
	object : ContinuationImpl(completion as Continuation<Any?>, context) {
		private var label = 0
		override fun invokeSuspend(result: Result<Any?>): Any? =
		when (label) {
			0 -> {
				label = 1
				result.getOrThrow() // 如果試圖以異常開始,則重新拋出異常(將被BaseContinuationImpl.resumeWith捕獲)
				block(this) // 運行塊,可以返回或掛起
			}
			1 -> {
				label = 2
				result.getOrThrow() // 這是block掛起的結果
			}
			else -> error("This coroutine had already completed")
		}
	}
}

createCoroutineFromSuspendFunction就是當一個被suspend修飾的Lambda表達式沒有繼承BaseContinuationImpl是才會被調用,然後根據上下文是否為空創建不同類型的協程。

兩種情況都已經分析完瞭,那麼現在if (this is BaseContinuationImpl)會執行哪一個呢,首先這裡的this所指的就是demo中的block代碼段,Kotlin編譯器編譯後會自動生成一個類就是上面的static,它會繼承SuspendLambda類,而這個SuspendLambda類繼承自ContinuationImpl,ContinuationImpl繼承自BaseContinuationImpl,因此可以得到判斷結果為true,

createCoroutineUnintercepted的過程就是協程創建的過程。

然後就是intercepted函數,這個函數的具體實現也在IntrinsicsJvm.kt中,那麼intercepted又做瞭什麼呢

public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
//具體實現
//IntrinsicsJvm.kt#intercepted
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

首先有個強轉,通過上面的分析這個強轉是一定會成功的,到這裡intercepted就進入到瞭ContinuationImpl中瞭

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
	...
    @Transient
    private var intercepted: Continuation<Any?>? = null
	//如果沒有緩存,則從上下文獲取攔截器,調用interceptContinuation進行攔截
	//將獲取到的內容保存到全局變量
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}

這裡的ContinuationInterceptor指的就是Demo中傳輸的Dispatcher.IO,默認值時Dispatcher.Default

再回到startContinue中還剩最後一個resume

/**
 * 恢復執行相應的協程傳遞值作為最後一個掛起點的返回值。
 */
public inline fun <T> Continuation<T>.resume(value: T): Unit =
	resumeWith(Result.success(value))
public interface Continuation<in T> {
	/**
     * 與此延續相對應的協程的上下文。
     */
	public val context: CoroutineContext
	/**
     * 恢復執行相應的協程傳遞值作為最後一個掛起點的返回值。
     */
	public fun resumeWith(result: Result<T>)
}

這裡的resume(Unit)作用就相當與啟動瞭一個協程。

上面的啟動流程中為瞭方便分析的是CoroutineStart.ATOMIC,而默認的是CoroutineStart.DEFAULT,下面分析一下DEFAULT的流程

//Cancellable.kt#startCoroutineCancellable
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

startCoroutineCancellable對於協程的創建和攔截與ATOMIC是一樣的,區別就在於resumeCancellableWith

//DispatchedContinuation#resumeCancellableWith
public fun <T> Continuation<T>.resumeCancellableWith(
	result: Result<T>,
	onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
	is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
	else -> resumeWith(result)
}
// 我們內聯它來保存堆棧上的一個條目,在它顯示的情況下(無限制調度程序)
// 它隻在Continuation<T>.resumeCancellableWith中使用
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
	result: Result<T>,
	noinline onCancellation: ((cause: Throwable) -> Unit)?
		) {
	val state = result.toState(onCancellation)
	//是否需要分發
	if (dispatcher.isDispatchNeeded(context)) {
		_state = state
		resumeMode = MODE_CANCELLABLE
		//將可運行塊的執行分派給給定上下文中的另一個線程
		dispatcher.dispatch(context, this)
	} else {
		executeUnconfined(state, MODE_CANCELLABLE) {
			//協程未被取消
			if (!resumeCancelled(state)) {
				// 恢復執行
				resumeUndispatchedWith(result)
			}
		}
	}
}
//恢復執行前判斷協程是否已經取消執行
inline fun resumeCancelled(state: Any?): Boolean {
	//獲取當前協程任務
	val job = context[Job]
	//如果不為空且不活躍
	if (job != null && !job.isActive) {
		val cause = job.getCancellationException()
		cancelCompletedResult(state, cause)
		//拋出異常
		resumeWithException(cause)
		return true
	}
	return false
}
//我們需要內聯它來在堆棧中保存一個條目
inline fun resumeUndispatchedWith(result: Result<T>) {
	withContinuationContext(continuation, countOrElement) {
		continuation.resumeWith(result)
	}
}

以上就是Kotlin協程launch啟動流程原理詳解的詳細內容,更多關於Kotlin協程launch啟動流程的資料請關註WalkonNet其它相關文章!

推薦閱讀: