Kotlin線程的橋接與切換使用介紹

一.線程的橋接

1.runBlocking方法

runBlocking方法用於在線程中去執行suspend方法,代碼如下:

@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    // 通知編譯器,block隻執行一次
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // 獲取當前線程
    val currentThread = Thread.currentThread()
    // 獲取上下文中的攔截器
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    // 如果攔截器為空,代表無法進行調度
    if (contextInterceptor == null) {
        // 從線程中獲取EventLoop,獲取失敗則創建一個新的
        eventLoop = ThreadLocalEventLoop.eventLoop
        // 添加到上下文中
        // newContext = EmptyCoroutineContext + context + eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {// 如果有攔截器
        // 嘗試將當前攔截器轉換成EventLoop,
        // 如果轉換成功,則判斷是否允許可以在上下文中使用
        // 如果轉換失敗或不允許,則創建一個新的
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        // 計算新的上下文
        // 這裡沒有把EventLoop加到上下文,因為加入後會覆蓋攔截器
        newContext = GlobalScope.newCoroutineContext(context)
    }
    // 創建一個協程
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    // 啟動協程
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    // 分發任務
    return coroutine.joinBlocking()
}

2.BlockingCoroutine類

在runBlocking方法中,最終創建瞭一個類型為BlockingCoroutine的對象。BlockingCoroutine類繼承自AbstractCoroutine類,代碼如下:

// 繼承瞭AbstractCoroutine
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val blockedThread: Thread,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
    // 該協程是一個作用域協程
    override val isScopedCoroutine: Boolean get() = true

    override fun afterCompletion(state: Any?) {
        // 如果當前線程不是阻塞線程
        if (Thread.currentThread() != blockedThread)
            // 喚醒阻塞線程
            LockSupport.unpark(blockedThread)
    }

    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            // 註冊使用EventLoop
            eventLoop?.incrementUseCount()
            try {
                // 死循環
                while (true) {
                    @Suppress("DEPRECATION")
                    // 如果線程當前中斷,則拋出異常,同時取消當前協程
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    // 分發執行任務,同時獲取等待時間
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    // 如果任務執行結束,則退出循環
                    if (isCompleted) break
                    // 休眠指定的等待時間
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                // 註冊不使用EventLoop
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        // 獲取執行的結果
        val state = this.state.unboxState()
        // 如果執行過程中取消,則拋出異常
        (state as? CompletedExceptionally)?.let { throw it.cause }
        // 返回結果
        return state as T
    }
}

BlockingCoroutine類重寫瞭變量isScopedCoroutine為true。

isScopedCoroutine表示當前協程是否為作用域協程,該變量用在cancelParent方法中。對於一個作用域協程,當它的子協程在運行過程中拋出異常時,子協程調用cancelParent方法不會導致作用域協程取消,而是直接返回true。當子協程執行完畢,作用域協程獲取結果時,如果發現子協程返回的結果為異常,則會再次拋出。

相比於一般協程,作用域協程不相信子協程在執行過程中取消通知,而是在執行完畢後親自檢查結果是否為異常,達到一種“耳聽為虛,眼見為實”的效果。

joinBlocking方法通過循環在當前線程上對EventLoop進行任務分發來實現線程的阻塞。當任務發生異常或執行完畢後,會回調重寫的afterCompletion方法,喚起線程繼續循環,當在循環中檢測到isCompleted標志位為true時,會跳出循環,恢復線程執行。

二.線程的切換

1.withContext方法

withContext方法用於在協程中切換線程去執行其他任務,該方法被suspend關鍵字修飾,因此會引起協程的掛起,代碼如下:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    // 通知編譯器,block隻執行一次
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // 直接掛起,獲取續體
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 從續體中獲取上下文
        val oldContext = uCont.context
        // 計算新的上下文
        val newContext = oldContext + context
        // 檢查任務是否執行完畢或取消
        newContext.checkCompletion()
        // 如果前後兩次的上下文完全相同,說明不需要切換,隻需要執行即可
        if (newContext === oldContext) {
            // 創建續體的協程
            val coroutine = ScopeCoroutine(newContext, uCont)
            // 執行block
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // 攔截器相同,但是上下文中增加瞭其他的元素
        // 這裡也是在同一個線程上執行,但是其中增加的元素隻在執行當前的block中使用
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            // 創建續體的協程
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 將當前線程ThreadLocal中的對象更新成newContext上下文對應的對象
            withCoroutineContext(newContext, null) {
                // 執行block
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 走到這裡,說明要切換線程執行block任務
        val coroutine = DispatchedCoroutine(newContext, uCont)
        // 啟動父協程
        coroutine.initParentJob()
        // 啟動協程
        block.startCoroutineCancellable(coroutine, coroutine)
        // 獲取結果
        coroutine.getResult()
    }
}

通過對上面代碼的分析,可以發現withContext根據上下文的不同進行瞭三種分類,創建不同的協程並通過不同的方式去執行block。如下表所示:

協程上下文變化 協程類型 啟動方式
完全相同 ScopeCoroutine startUndispatchedOrReturn
攔截器相同 UndispatchedCoroutine startUndispatchedOrReturn
攔截器不同 DispatchedCoroutine startCoroutineCancellable

接下來,將對不同情況下協程的啟動與執行進行分析。

2.startUndispatchedOrReturn方法

startUndispatchedOrReturn方法用於在相同的上下文環境中啟動協程,代碼如下:

internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
    // 初始化並綁定父協程
    initParentJob()
    // 獲取並處理執行結果
    return undispatchedResult({ true }) {
        // 啟動協程
        block.startCoroutineUninterceptedOrReturn(receiver, this)
    }
}
private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
    shouldThrow: (Throwable) -> Boolean,
    startBlock: () -> Any?
): Any? {
    // 啟動協程,獲取結果,
    val result = try {
        startBlock()
    } catch (e: Throwable) {
        // 產生異常,則按照取消處理
        CompletedExceptionally(e)
    }
    // 如果結果為掛起,則通知外部掛起    
    if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
    // 結束任務執行,獲取最終狀態
    val state = makeCompletingOnce(result)
    // 如果需要等待子協程的結束,則通知外部掛起
    if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED
    // 如果執最終為異常狀態
    return if (state is CompletedExceptionally) {
        when {
            // 通過參數判斷是否拋出
            shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
            // 執行結果為異常
            result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont)
            // 結果不為異常,則返回
            else -> result
        }
    } else {
        // 對最終狀態進行拆箱,返回最終結果
        state.unboxState()
    }
}
// JobSupport中提供瞭下面的類和方法,當協程進入完成狀態時,會對狀態進行裝箱。
// 包裝類
private class IncompleteStateBox(@JvmField val state: Incomplete)
// 裝箱
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
// 拆箱
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this

在startUndispatchedOrReturn方法中,通過調用block的startCoroutineUninterceptedOrReturn方法啟動協程,獲取最終結果,並對結果進行異常處理。

接下來,將分析startCoroutineUninterceptedOrReturn方法如何啟動協程,代碼如下:

@SinceKotlin("1.3")
@InlineOnly
public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedOrReturn(
    receiver: R,
    completion: Continuation<T>
): Any? = (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)

這裡,直接找到最終的actual方法,可以發現該方法沒有創建狀態機,而是直接執行瞭block。這個方法被設計用在suspendCoroutineUninterceptedOrReturn方法中,來恢復掛起協程的執行。

至此,可以知道startUndispatchedOrReturn方法實際上就是在同一個協程中執行瞭block。

3.ScopeCoroutine類

在withContext方法中,當上下文相同時,會創建一個類型為ScopeCoroutine的對象。ScopeCoroutine類代表一個標準的作用域協程,代碼如下:

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T>
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
    final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
    final override fun getStackTraceElement(): StackTraceElement? = null
    // 作用域協程
    final override val isScopedCoroutine: Boolean get() = true
    internal val parent: Job? get() = parentContext[Job]
    // 該方法會在協程異常或取消時調用
    override fun afterCompletion(state: Any?) {
        // 進行攔截,切換線程,恢復執行
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    // 該方法會在將其掛起的方法執行完畢後回調
    override fun afterResume(state: Any?) {
        // 直接恢復續體的執行
        uCont.resumeWith(recoverResult(state, uCont))
    }
}

ScopeCoroutine類重寫瞭afterCompletion和afterResume兩個方法,afterCompletion方法用於在協程取消時被回調。afterResume方法用於在掛起恢復時被回調。

根據上面的分析,當發生異常時,afterCompletion方法可能在其他的協程上下文中被調用,因此會調用攔截器切換回原本的線程中。而afterResume方法由於已經在正確的上下文環境中,因此可以直接恢復執行。

4.UndispatchedCoroutine類

在withContext方法中,當上下文不同,但調度器相同時,會創建一個類型為UndispatchedCoroutine的對象。UndispatchedCoroutine類繼承自ScopeCoroutine類,重寫瞭afterResume方法,代碼如下:

private class UndispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override fun afterResume(state: Any?) {
        val result = recoverResult(state, uCont)
        // 將當前線程ThreadLocal中的對象更新成uCont.context上下文對應的對象
        withCoroutineContext(uCont.context, null) {
            // 恢復執行
            uCont.resumeWith(result)
        }
    }
}

與父類ScopeCoroutine的afterResume方法相比,UndispatchedCoroutine類在afterResume方法中對協程上下文進行瞭更新,然後再恢復執行。

  • withCoroutineContext

withCoroutineContext方法用於當一個線程中執行多個協程時,保存和恢復ThreadLocal類中的對象。

通過withContext方法的代碼可以知道,當上下文不同但調度器相同時,在執行之前會通過withCoroutineContext方法將ThreadLocal中的對象更新成newContext對應的對象。在執行結束後,又將ThradLocal中的對象更新成原本續體的上下文context對應的對象。代碼如下:

internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
    // 將線程上下文更新新的上下文,並返回老的上下文
    val oldValue = updateThreadContext(context, countOrElement)
    try {
        // 在新的上下文環境中執行
        return block()
    } finally {
        // 執行結束恢復老的上下文
        restoreThreadContext(context, oldValue)
    }
}

協程中有一類上下文元素是ThreadContextElement,ThreadContextElement是一個接口,具體的實現類有CoroutineId類和ThreadLocalElement類。其中,CoroutineId類用來修改線程的名字。ThreadLocalElement類用來保存和恢復ThreadLocal類中的對象,withCoroutineContext方法內部的updateThreadContext方法與restoreThreadContext方法正是通過ThreadLocalElement類實現的。ThreadContextElement接口的代碼如下:

public interface ThreadContextElement<S> : CoroutineContext.Element {
    // 用於更新新的上下文,並且返回老的上下文
    public fun updateThreadContext(context: CoroutineContext): S
    // 重新恢復當前線程的上下文,
    // 其中oldStart來自updateThreadContext方法的返回值
    public fun restoreThreadContext(context: CoroutineContext, oldState: S)
}

當調用updateThreadContext方法時,會返回一個代表當前狀態的對象。當調用restoreThreadContext方法時,又需要傳入一個代表狀態的對象作為參數,來恢復之前的狀態。因此,這就需要對updateThreadContext方法的返回值進行保存。

當協程上下文中隻有一個ThreadContextElement接口指向的對象時,保存在變量中即可。而如果協程上下文中有多個ThreadContextElement接口指向的對象,這時就需要一個專門的類來對這些對象進行管理,這個類就是ThreadState類,他們之間的對應關系如下圖所示:

withCoroutineContext方法執行圖:

5.DispatchedCoroutine類

在withContext方法中,當需要切換線程時,會創建一個類型為DispatchedCoroutine的對象。DispatchedCoroutine類繼承自ScopeCoroutine類,代碼如下:

// 狀態機狀態
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2
private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // 初始狀態
    private val _decision = atomic(UNDECIDED)
    // 嘗試掛起
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }
    // 嘗試恢復
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }
    override fun afterCompletion(state: Any?) {
        // 通過afterResume方法實現
        afterResume(state)
    }
    override fun afterResume(state: Any?) {
        // 如果沒有掛起,則返回
        if (tryResume()) return
        // 進行攔截,切換線程,恢復執行
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    // 獲取最終結果
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }
}

DispatchedCoroutine類中使用瞭一個狀態機模型,這個狀態機與在Kotlin協程:生命周期原理中分析CancellableContinuationImpl類中的狀態機相同,獲取結果的邏輯也與CancellableContinuationImpl類相同。

這裡最重要的是DispatchedCoroutine類重寫瞭afterCompletion和afterResume方法,並且回調這兩個方法都會進行線程的切換。

6.總結

  ScopeCoroutine類 UndispatchedCoroutine類 DispatchedCoroutine類
afterCompletion方法 切線程 切線程 切線程
afterResume方法 不切線程 不切線程。更新ThreadLocal 切線程

以上就是Kotlin線程的橋接與切換使用介紹的詳細內容,更多關於Kotlin線程的資料請關註WalkonNet其它相關文章!

推薦閱讀: