Kotlin Job啟動流程源碼層深入分析

Job啟動流程

job啟動流程,我們先從一段最簡單使用協程的代碼開始,進行代碼跟跟蹤,順便引出幾個關鍵的概念,在後面章節裡面去單獨分析。代碼如下:

private fun testParentChildJob() {
    val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
    val myScope = CoroutineScope(coroutineContext)
    val job = myScope.launch {
        println("myScope.launch :")
    }
}

首先創建一個有四種元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }組成,上一章coroutineContext篇已經講過plus操作的過程瞭,不贅述。

接著用這個作用域myScope開啟一個協程,協程內打印println("myScope.launch :")

我自己從launch函數一步一步跟蹤後,得到瞭如下圖所示的流程:

launch流程分析

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

分為三步:

首先使用入參context: CoroutineContext = EmptyCoroutineContext,創建一個新的上下文集合newCoroutineContext(context),newCoroutineContext函數操作:就是根據所在的scope域的上下文集合和入參進行組合操作,得到一個新的上下文集合,代碼如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

可以看到各種+操作,就是coroutineContext的各種plus操作,可以得到一個繼承自所在scope域的上下文集合(這個域由coroutineContext變量決定,這個變量屬於CoroutineScope成員),並且包含瞭入參的context元素,這樣上下文集合就具有繼承性,並且自己還可以對已有元素進行覆蓋。上一篇coroutineContext篇已經講過,就不贅述瞭。

由於我們使用的默認方式launch的,使用上面創建的newContext元素集合,就會創建一個StandaloneCoroutine(newContext, active = true)協程對象。這個對象繼承關系比較復雜,繼承關系如下:

這個類裡面包含瞭很多成員變量,源碼如下:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
	//省略。。。
}
public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    /**
     * The context of this coroutine that includes this coroutine as a [Job].
     */
    @Suppress("LeakingThis")
    public final override val context: CoroutineContext = parentContext + this
    /**
     * The context of this scope which is the same as the [context] of this coroutine.
     */
    public override val coroutineContext: CoroutineContext get() = context
    override val isActive: Boolean get() = super.isActive
}

context成員變量是外部傳進來的newContext上下文集合 + this得到的,那麼newContext的Job元素會被this替換掉;

coroutineContext成員變量是CoroutineScope接口的成員,覆寫為context對象; isActive標志這個Job是否是存活狀態; 調用剛剛創建的coroutine協程的start方法,coroutine.start(start, coroutine, block),跟進去看看

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }

initParentJob()方法主要是用於關聯父子Job的,這裡先不講,對啟動流程沒啥影響。

start(block, receiver, this)是正真啟動協程的地方,CoroutineStart的值是DEFAULT

public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(completion)
        ATOMIC -> block.startCoroutine(completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        LAZY -> Unit // will start lazily
    }

那麼調用的就是DEFAULT -> block.startCoroutineCancellable(completion)這個分支,

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
        //省略。。。
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
		//省略。。。
    else
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0
            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                	//省略。。。
                }
        }
}

第一步:createCoroutineUnintercepted(completion)就是以completion作為參數創建一個ContinuationImpl對象,這個completion就是上面創建的StandaloneCoroutine對象。這個新的ContinuationImpl對象是繼承自Continuation,那麼他就有fun resumeWith(result: Result<T>)方法,該方法是用於恢復掛起點,val context: CoroutineContext參數,這個參數就是Continuation的所關聯的上下文集合。

我們再自己看看這個createCoroutineFromSuspendFunction這個方法,發現將我們launch{}的lambda參數進行包裝後(this as Function1<Continuation<T>, Any?>).invoke(it)然後作為入參block,這個block作為ContinuationImpl對象覆寫的invokeSuspend函數的回調函數。那麼可以從這個看出一個關系:

ContinuationImpl.invokeSuspend  -> launch入參的lambda函數體

第二步:就是調用ContinuationImpl .intercepted(),內部處理是獲取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然後將ContinuationImpl作為參數,包裝成DispatchedContinuation(this, continuation),其中this代表ContinuationInterceptor也就是dispatcher,continuation代表剛剛傳遞進來的ContinuationImpl。

第三步:resumeCancellableWith(Result.success(Unit)),調用DispatchedContinuation的resumeCancellableWith函數,代碼如下:

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}
//DispatchedContinuation extends DIspatchedTask
inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        省略。。。
    }
}
//DIspatchedTask
public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withCoroutineContext(context, delegate.countOrElement) {
            	//省略。。。
                if (exception == null && job != null && !job.isActive) {
                    //省略。。。
                } else {
                    if (exception != null) continuation.resumeWithException(exception)
                    else continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            //省略。。。
        }
    }

由於DispatchedContinuation是繼承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已經實現的瞭,所以dispatcher.dispatch(context, this),dispatcher調用的是DIspatchedTask.run方法,(dispatcher是一個線程池和java線程池類似,但是有一點區別,後面章節再講),run方法中,首先獲取delegate,然後取出continuation變量,這個delegate其實是被DispatchedContinuation覆寫的,而且實現的Continuation接口被構造函數的continuation代理,這個入參continuation其實就是ContinuationImpl,上一步分析過瞭。

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
	//省略。。。
    override val delegate: Continuation<T>
        get() = this

	//省略。。。
}
//Continuation
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

那麼其實就是調用的ContinuationImpl.resumeWith(Result.success(value))方法,ContinuationImpl繼承自BaseContinuationImpl,繼續進去看看

public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var param = result
        while (true) {
            //省略。。。
            with(current) {
           		try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
                //省略。。。
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    //省略。。。
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

第一步,調用val outcome = invokeSuspend(param),上面已經分析瞭,invokeSuspend被ContinuationImpl覆寫瞭,內部回調瞭launch的lambda表達式;

第二步,調用completion.resumeWith(outcome),這個completion上面分析瞭,是StandAloneCoroutine協程,調用瞭StandAloneCoroutine對象的resumeWith方法,這個方法裡面用於更新協程狀態,比如協程成功,失敗之類的。

綜上,通過上面的invokeSuspend函數調用,最終調用到瞭launch的lambda表達式,也就是我們業務代碼,我們的業務代碼是被封裝到瞭ContinuationImpl類中。

通過上面的分析,一共發現瞭三種不同類型的continuation,它們分別是:

DispatchedContinuation用於分發continuation到指定的線程池中; ContinuationImpl用於包裝launch的lambda代碼塊作為業務代碼代理類; StandAloneCoroutine協程管理類管理Job生命周期以及協程的狀態父子Job關系維護等等。

它們的調用鏈如下:

父子Job關聯分析

父子Job關聯操作是在上面launch流程中的,在調用start方法的時候進行關聯的:

initParentJob方法裡面,先調用parent.start方法,確保parent的Job已經啟動瞭,接著調用parent.attachChild(this)方法,用於關聯父子Job。 代碼如下:

//AbstractCoroutine
internal fun initParentJob() {
	//取出上下文集合中的Job元素,調用initParentJobInternal方法
	initParentJobInternal(parentContext[Job])
}
//AbstractCoroutine
internal fun initParentJobInternal(parent: Job?) {
	//省略。。。
	parent.start() // make sure the parent is started
	//省略。。。
	val handle = parent.attachChild(this)
	parentHandle = handle
	//省略。。。
	if (isCompleted) {
	    handle.dispose()
	}
}

首先取出parentContext[Job]的Job元素,這個parentContext就是launch的時候根據scope的上下文集合創建出來的上下文集合,取出的Job元素就是父Job,作為initParentJobInternal的參數,接著調用parent.attachChild(this):

//JobSupport
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion on the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait all children before completion and provide linearizable hierarchy view:
     * If child is attached when the job is already being cancelled, such child will receive immediate notification on
     * cancellation, but parent *will* wait for that child before completion and will handle its exception.
     */
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
//JobSupport
internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
    override fun toString(): String = "ChildHandle[$childJob]"
}

首先創建瞭一個handler = ChildHandleNode(this, child).asHandler對象,這個對象ChildHandleNode作為參數傳遞給invokeOnCompletion,然後返回一個ChildHandle類型的對象,賦值給子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle,parentHandle 這個是子Job持有的變量,ChildHandle接口擁有childCancelled方法,用於子Job通知父Job,子Job已經取消瞭,父Job需要根據子Job狀態繼續進行處理。

//JobSupport
public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        var nodeCache: JobNode<*>? = null
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (_state.compareAndSet(state, node)) return node
                    }
                    //省略。。。
                }
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        promoteSingleToNodeList(state as JobNode<*>)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                      	val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                        if (rootCause == null) return node
                  	 	 //省略。。。
                        if (rootCause != null) {
                            //省略。。。
                        } else {
                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }
                else -> { // is complete
                   //省略。。。
                }
            }
        }
    }

invokeOnCompletion方法就是,將傳遞進來的handler: CompletionHandler,分情況存儲起來,

當state狀態是Empty狀態,創建一個代理節點node ,之後存入到state中; 當state是Incomplete狀態,如果state.list結構是空的,那麼創建一個鏈表,將node 節點作為第一個節點存進去,當前state.list不為空,那麼將node節點插入到鏈表的末尾。 這樣經過上面這兩步: 子Job持有的parentHandle對象可以通知父Job自己已經取消瞭:

override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)

父Job持有的state對象保存著包裝著子Job的ChildHandleNode對象,父Job通過遍歷調用列表中的node元素的invoke方法,即可取消所有的子Job:

override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)

會發現, 調用launch生成一個Job,這個Job就會initParentJob() ,進而子Job會持有父Job,父Job也會將子Job加入到state的數據結構中,進而形成瞭樹的結構,類似於下圖:

父子Job都可以互相通知對方自己已經取消,需要做出對應的處理。

結論

launch啟動一個協程,會生成三個continuation,分別是

DispatchedContinuation用於分發continuation到指定的線程池中; ContinuationImpl用於包裝launch的lambda代碼塊作為業務代碼代理類; StandAloneCoroutine協程管理類管理Job生命周期以及協程的狀態父子Job關系維護等等。 調用鏈:DispatchedContinuation -> ContinuationImpl(在這裡調用launch的lambda業務代碼塊) -> StandAloneCoroutine

launch啟動一個協程Job,這個Job所在域如果存在parentJob ,那麼parentJob和Job會形成樹結構上的父子節點,並且子Job繼承瞭父Job的CoroutineScope的上下文集合(根據參數會覆蓋一些重復Key的元素)。

到此這篇關於Kotlin Job啟動流程源碼層深入分析的文章就介紹到這瞭,更多相關Kotlin Job啟動流程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: