Kotlin圖文並茂講解續體與續體攔截器和調度器
一.Continuation
Continuation接口是協程中最核心的接口,代表著掛起點之後的續體,代碼如下:
public interface Continuation<in T> { // 續體的上下文 public val context: CoroutineContext // 該方法用於恢復續體的執行 // result為掛起點執行完成的返回值,T為返回值的類型 public fun resumeWith(result: Result<T>) }
Continuation圖解
二.ContinuationInterceptor
ContinuationInterceptor接口繼承自Element接口,是協程中的續體攔截器,代碼如下:
public interface ContinuationInterceptor : CoroutineContext.Element { // 攔截器的Key companion object Key : CoroutineContext.Key<ContinuationInterceptor> // 攔截器對續體進行攔截時會調用該方法,並對continuation進行緩存 // 攔截判斷:根據傳入的continuation對象與返回的continuation對象是否相同 public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> // 當interceptContinuation方法攔截的協程執行完畢後,會調用該方法 public fun releaseInterceptedContinuation(continuation: Continuation<*>) { /* do nothing by default */ } // get方法多態實現 public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? { @OptIn(ExperimentalStdlibApi::class) if (key is AbstractCoroutineContextKey<*, *>) { @Suppress("UNCHECKED_CAST") return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null } @Suppress("UNCHECKED_CAST") return if (ContinuationInterceptor === key) this as E else null } // minusKey方法多態實現 public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext { @OptIn(ExperimentalStdlibApi::class) if (key is AbstractCoroutineContextKey<*, *>) { return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this } return if (ContinuationInterceptor === key) EmptyCoroutineContext else this } }
三.CoroutineDispatcher
CoroutineDispatcher類繼承自AbstractCoroutineContextElement類,實現瞭ContinuationInterceptor接口,是協程調度器的基類,代碼如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { // ContinuationInterceptor的多態實現,調度器本質上就是攔截器 @ExperimentalStdlibApi public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>( ContinuationInterceptor, { it as? CoroutineDispatcher }) // 用於判斷調度器是否要調用dispatch方法進行調度,默認為true public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true // 調度的核心方法,在這裡進行調度,執行block public abstract fun dispatch(context: CoroutineContext, block: Runnable) // 如果調度是由Yield方法觸發的,默認通過dispatch方法實現 @InternalCoroutinesApi public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) // ContinuationInterceptor接口的方法,將續體包裹成DispatchedContinuation,並傳入當前調度器 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) // 釋放父協程與子協程的關聯。 @InternalCoroutinesApi public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() } // 重載瞭"+"操作,直接返回others // 因為兩個調度器相加沒有意義,同一個上下文中隻能有一個調度器 // 如果需要加的是調度器對象,則直接替換成最新的,因此直接返回 public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other override fun toString(): String = "$classSimpleName@$hexAddress" }
四.EventLoop
EventLoop類繼承自CoroutineDispatcher類,用於協程中任務的分發執行,隻在runBlocking方法中和Dispatchers.Unconfined調度器中使用。與Handler中的Looper類似,在創建後會存儲在當前線程的ThreadLocal中。EventLoop本身不支持延時執行任務,如果需要可以自行繼承EventLoop並實現Delay接口,EventLoop中預留瞭一部分變量和方法用於延時需求的擴展。
為什麼協程需要EventLoop呢?協程的本質是續體傳遞,而續體傳遞的本質是回調,假設在Dispatchers.Unconfined調度下,要連續執行多個suspend方法,就會有多個續體傳遞,假設suspend方法達到一定數量後,就會造成StackOverflow,進而引起崩潰。同樣的,我們知道調用runBlocking會阻塞當前線程,而runBlocking阻塞的原理就是執行“死循環”,因此需要在循環中做任務的分發,去執行內部協程在Dispatchers.Unconfined調度器下加入的任務。
EventLoop代碼如下:
internal abstract class EventLoop : CoroutineDispatcher() { // 用於記錄使用當前EventLoop的runBlocking方法和Dispatchers.Unconfined調度器的數量 private var useCount = 0L // 表示當前的EventLoop是否被暴露給其他的線程 // runBlocking會將EventLoop暴露給其他線程 // 因此,當runBlocking使用時,shared必須為true private var shared = false // Dispatchers.Unconfined調度器的任務執行隊列 private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null // 處理任務隊列的下一個任務,該方法隻能在EventLoop所在的線程調用 // 返回值<=0,說明立刻執行下一個任務 // 返回值>0,說明等待這段時間後,執行下一個任務 // 返回值為Long.MAX_VALUE,說明隊列裡沒有任務瞭 public open fun processNextEvent(): Long { if (!processUnconfinedEvent()) return Long.MAX_VALUE return 0 } // 隊列是否為空 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty // 下一個任務多長時間後執行 protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE return if (queue.isEmpty) Long.MAX_VALUE else 0L } // 任務的核心處理方法 public fun processUnconfinedEvent(): Boolean { // 若隊列為空,則返回 val queue = unconfinedQueue ?: return false // 從隊首取出一個任務,如果為空,則返回 val task = queue.removeFirstOrNull() ?: return false // 執行 task.run() return true } // 表示當前EventLoop是否可以在協程上下文中被調用 // EventLoop本質上也是協程上下文 // 如果EventLoop在runBlocking方法中使用,必須返回true public open fun shouldBeProcessedFromContext(): Boolean = false // 向隊列中添加一個任務 public fun dispatchUnconfined(task: DispatchedTask<*>) { // 若隊列為空,則創建一個新的隊列 val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } queue.addLast(task) } // EventLoop當前是否還在被使用 public val isActive: Boolean get() = useCount > 0 // EventLoop當前是否還在被Unconfined調度器使用 public val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) // 判斷隊列是否為空 public val isUnconfinedQueueEmpty: Boolean get() = unconfinedQueue?.isEmpty ?: true // 下面三個方法用於計算使用當前的EventLoop的runBlocking方法和Unconfined調度器的數量 // useCount是一個64位的數, // 它的高32位用於記錄Unconfined調度器的數量,低32位用於記錄runBlocking方法的數量 private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { useCount += delta(unconfined) // runBlocking中使用,shared為true if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { useCount -= delta(unconfined) // 如果EventLoop還在被使用 if (useCount > 0) return assert { useCount == 0L } // 如果EventLoop不被使用瞭,並且在EventLoop中使用過 if (shared) { // 關閉相關資源,並在ThreadLocal中移除 shutdown() } } protected open fun shutdown() {} }
協程中提供瞭EventLoopImplBase類,間接繼承自EventLoop,實現瞭Delay接口,用來延時執行任務。同時,協程中還提供單例對象ThreadLocalEventLoop用於EventLoop在ThreadLocal中的存儲。
到此這篇關於Kotlin圖文並茂講解續體與續體攔截器和調度器的文章就介紹到這瞭,更多相關Kotlin續體內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Kotlin協程到底是如何切換線程的
- Kotlin協程的啟動方式介紹
- Kotlin coroutineContext源碼層深入分析
- Kotlin Dispatchers協程調度器源碼深入分析
- Kotlin線程的橋接與切換使用介紹