Kotlin Dispatchers協程調度器源碼深入分析
Dispatchers協程調度器
CoroutineDispatcher,具有用於調度任務的底層執行器。ExecutorCoroutineDispatcher的實例應由調度程序的所有者關閉。
此類通常用作基於協程的API和異步API之間的橋梁,異步API需要Executor的實例。
根據各種調度器的繼承關系,梳理如下繼承結構:
CoroutineDispatcher基類將由所有協程調度器實現擴展,kotlin官方實現瞭以下四種調度器:
Dispatchers.Default -如果上下文中未指定調度器或任何其他ContinuationInterceptor,則所有標準構建器都使用默認值。它使用共享後臺線程的公共池。對於消耗CPU資源的計算密集型協程來說,這是一個合適的選擇。
Dispatchers.IO -使用按需創建線程的共享池,用於卸載IO密集型阻塞操作(如文件I/O和阻塞套接字I/O)。
Dispatchers.Unconfined -在當前調用幀中啟動協程執行,直到第一次暫停,然後協程生成器函數返回。協程稍後將在相應的掛起函數使用的任何線程中恢復,而不將其限制在任何特定的線程或池中。無約束調度器通常不應在代碼中使用。
HandlerContext -在主線程中調度任務,android中主線程也就是ui線程,使用該調度器謹慎ANR異常,不應該使用該調度器調度阻塞或者耗時任務。
可以使用newSingleThreadContext和newFixedThreadPoolContext創建專用線程池。
可以使用asCoroutineDispatcher擴展函數將任意執行器轉換為調度器。
Dispatchers.Default
這個調度器的類型是DefaultScheduler,一般是做cpu密集計算型任務,內部包含的成員變量IO,也就是對應的Dispatchers.IO調度器。主要實現在ExecutorCoroutineDispatcher()
中,代碼如下:
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) //省略。。。 } public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { public constructor(//省略。。。) override val executor: Executor get() = coroutineScheduler // This is variable for test purposes, so that we can reinitialize from clean state private var coroutineScheduler = createScheduler() override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { DefaultExecutor.dispatch(context, block) } override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = try { coroutineScheduler.dispatch(block, tailDispatch = true) } catch (e: RejectedExecutionException) { DefaultExecutor.dispatchYield(context, block) } } //省略。。。 }
Default調度器其實沒做什麼特別的操作,隻是用coroutineScheduler代理實現瞭協程的調度。
Dispatchers.IO
這個是LimitingDispatcher類型的,是DefaultScheduler類型的成員變量,而LimitingDispatcher類型又是繼承自ExecutorCoroutineDispatcher的,LimitingDispatcher在它基礎上做瞭有調度個數限制的排隊機制,IO這個名字代表的IO操作,IO操作又是阻塞線程的操作,線程不能及時釋放,所以加入瞭隊列機制,防止IO線程爆炸式增長。如下:
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) //省略。。。 } private class LimitingDispatcher( private val dispatcher: ExperimentalCoroutineDispatcher, private val parallelism: Int, private val name: String?, override val taskMode: Int ) : ExecutorCoroutineDispatcher(), TaskContext, Executor { private val queue = ConcurrentLinkedQueue<Runnable>() private val inFlightTasks = atomic(0) private fun dispatch(block: Runnable, tailDispatch: Boolean) { var taskToSchedule = block while (true) { // Commit in-flight tasks slot val inFlight = inFlightTasks.incrementAndGet() // Fast path, if parallelism limit is not reached, dispatch task and return if (inFlight <= parallelism) { dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) return } queue.add(taskToSchedule) if (inFlightTasks.decrementAndGet() >= parallelism) { return } taskToSchedule = queue.poll() ?: return } } override fun dispatchYield(context: CoroutineContext, block: Runnable) { dispatch(block, tailDispatch = true) } }
構造函數 傳入瞭parallelism參數 ,這個是並發數。
dispatchYield方法 實現是直接調用的dispatch方法。
dispatch方法:一個while循環,循環內,
- 給inFlightTasks變量加一(這個變量代表正在調度中的個數),如果inFlightTasks <= parallelism,代表當前調度任務數小於最大並發數,說明可以繼續向調度器中調度任務
- 否則將任務加入到隊列中,接著嘗試將inFlightTasks減一,如果大於並發數,那麼直接結束;
- 如果小於並發數,說明剛剛已經有任務結束瞭,讓出瞭並發數,這個時候可以再次嘗試從隊列中取出任務,從1開始。
override fun afterTask() { var next = queue.poll() // If we have pending tasks in current blocking context, dispatch first if (next != null) { dispatcher.dispatchWithContext(next, this, true) return } inFlightTasks.decrementAndGet() next = queue.poll() ?: return dispatch(next, true) }
afterTask方法
這個方法是任務調度結束後的回調,這裡面首先從隊列中取出一個任務,
任務不為空,讓調度器調度這個任務,結束;
為空,給調度任務數加一,然後嘗試取出任務,為空返回,不為空,繼續調用dispatch方法,整個流程就串起來瞭。
整個流程如下圖所示:
綜上:IO調度器側重於調度任務數量的限制,防止IO操作阻塞線程,讓線程數量爆炸式增長。
Dispatchers.Main
具體的實現類是HandlerContext
,代碼如下:
HandlerContext(Looper.getMainLooper().asHandler(async = true)) internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { //省略。。。 }
主線程中調度任務,android中主線程也就是ui線程。實現原理是內部持有一個val handler : Handler = Looper.getMainLooper().asHandler(async = true)
,這個handler正是主線程的handler。
在調用dispatch調度方法的時候,是使用handler發送一個Runnable任務,
override fun dispatch(context: CoroutineContext, block: Runnable) { handler.post(block) }
在delay的時候,如果當前的dispatcher正是HandlerContext,那麼實現是handler發送一個延遲瞭timeMillis
毫秒時長的Runnable。invokeOnCancellation的擴展方法是在協程被取消的時候,移除掉該runnable消息。
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) continuation.invokeOnCancellation { handler.removeCallbacks(block) } }
下面這個方法也比較常看到,就是協程在調度continuation的時候,會去判斷是不是需要去調度,不需要的話,直接在當前線程執行,需要調度的,需要由dispatcher來重新調度任務,這樣可能執行的線程會被切換,如果不是主線程的話,、就需要調度瞭, 如果是主線程的話立刻執行。
override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper }
Dispatchers.Unconfined
具體的實現如下:
internal object Unconfined : CoroutineDispatcher() { //省略。。。 }
isDispatchNeeded直接返回false,代表不需要重新調度。
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
dispatchYield沒有被覆寫,直接調用dispatch方法,用的還是CoroutineDispatcher的實現。
dispatch的報錯信息顯示,Unconfined調度器隻能在存在YieldContext的時候調度,否則就會報異常。
//CoroutineDispatcher public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) //Unconfined override fun dispatch(context: CoroutineContext, block: Runnable) { // It can only be called by the "yield" function. See also code of "yield" function. val yieldContext = context[YieldContext] if (yieldContext != null) { // report to "yield" that it is an unconfined dispatcher and don't call "block.run()" yieldContext.dispatcherWasUnconfined = true return } throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " + "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " + "isDispatchNeeded and dispatch calls.") }
yied方法:是暫時讓出工作線程,等待下一次線程調取恢復協程。
yield代碼如下:
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val context = uCont.context context.checkCompletion() val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit if (cont.dispatcher.isDispatchNeeded(context)) { cont.dispatchYield(context, Unit) } else { val yieldContext = YieldContext() cont.dispatchYield(context + yieldContext, Unit) if (yieldContext.dispatcherWasUnconfined) { return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit } } COROUTINE_SUSPENDED }
如果isDispatchNeeded == true,那麼就需要重新將協程被調度器調度一次,線程有可能切換掉;
如果isDispatchNeeded == false,上下文集合需要添加val yieldContext = YieldContext()這個元素(在上面的Dispatchers.Unconfined
的dispatche方法中,如果有YieldContext元素,將dispatcherWasUnconfined設置為true,代表yield操作什麼都沒有做,需要協程調度器用其他方法調度一次)。
判斷dispatcherWasUnconfined,true:說明Dispatchers.Unconfined什麼都沒有做,需要在調度一次,調用瞭yieldUndispatched
方法,這個方法大概就是讓協程直接恢復一次,或者線程調度一次恢復;
false:說明正在被調度器調度,是個掛起點,返回COROUTINE_SUSPENDED值。
不太清楚Dispatchers.Unconfined
這個調度器有啥用,有知道的留言下,學習學習。
協程調度器的實現CoroutineScheduler
調度過程正真的實現是CoroutineScheduler
這個類,上面說的四種調度器是包裝類,調度邏輯在CoroutineScheduler中,代碼如下:
internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { //省略。。。 }
構造函數入參 corePoolSize: Int
定義核心線程數,maxPoolSize: Int
定義最大線程數量
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } }
dispatch函數的實現:
創建task,block如果是Task類型的話,設置submissionTime變量,submissionTime變量用於延遲執行的時間判斷,以及隊列排序的時間順序;設置taskContext,該變量是task執行的協程上下文。不是Task類型的話,會創建TaskImp類型的任務返回,關鍵是finally中的taskContext.afterTask()
,就是task執行完成後需要回調afterTask通知協程上下文執行完畢瞭,上面的Dispatchers.IO裡面的LimitingDispatcher調度器就是需要afterTask回調通知,才能將隊列中下一個任務拋給CoroutineScheduler去執行。
internal fun createTask(block: Runnable, taskContext: TaskContext): Task { val nanoTime = schedulerTimeSource.nanoTime() if (block is Task) { block.submissionTime = nanoTime block.taskContext = taskContext return block } return TaskImpl(block, nanoTime, taskContext) } internal class TaskImpl( @JvmField val block: Runnable, submissionTime: Long, taskContext: TaskContext ) : Task(submissionTime, taskContext) { override fun run() { try { block.run() } finally { taskContext.afterTask() } } }
獲取當前的工作線程,如果當前是工作線程直接返回,不是的話返回空
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> it.scheduler == this }
將任務提交到工作線程的本地隊列中
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? { if (this == null) return task if (state === WorkerState.TERMINATED) return task if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) { return task } mayHaveLocalTasks = true return localQueue.add(task, fair = tailDispatch) }
返回是空的,說明添加成功瞭,返回task說明沒有添加成功。
如果線程是中斷狀態,那麼直接返回task。 如果任務是非阻塞的也就是cpu密集型任務,而線程是阻塞的(正在執行任務中),那麼不添加任務,直接返回task。 其他情況,添加任務到隊列中,mayHaveLocalTasks標志位true,代表當前線程中有任務。
沒有添加的話,需要添加到全局隊列中,globalCpuQueue全局cpu密集型隊列,globalBlockingQueue全局IO隊列,根據任務類型添加到對應的隊列中。如果全局隊列都添加失敗的話,直接拋出異常。
if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val globalCpuQueue = GlobalQueue() val globalBlockingQueue = GlobalQueue() private fun addToGlobalQueue(task: Task): Boolean { return if (task.isBlocking) { globalBlockingQueue.addLast(task) } else { globalCpuQueue.addLast(task) } }
根據是否是尾部添加和當前線程是否是空,決定是否跳過喚醒工作線程的步驟。
val skipUnpark = tailDispatch && currentWorker != null
非阻塞任務:skipUnpark為true,跳過喚醒步驟,否則喚醒cpu密集型線程;阻塞任務:skipUnpark為true,跳過喚醒步驟,喚醒IO線程。
// Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) }
看下喚醒步驟的具體實現,大概都是先tryUnpark,喚醒線程,如果沒有喚醒成功,創建一個新的線程,再次嘗試喚醒。
private fun signalBlockingWork(skipUnpark: Boolean) { // Use state snapshot to avoid thread overprovision val stateSnapshot = incrementBlockingTasks() if (skipUnpark) return if (tryUnpark()) return if (tryCreateWorker(stateSnapshot)) return tryUnpark() // Try unpark again in case there was race between permit release and parking } internal fun signalCpuWork() { if (tryUnpark()) return if (tryCreateWorker()) return tryUnpark() }
看下工作線程的具體實現吧:
worker繼承自Thread,實現瞭run方法,具體是由runWorker()方法實現的,每個工作線程都有一個本地隊列用於存儲任務,這樣本地有任務就不用去全局隊列中去搶資源瞭,減少鎖競爭。
internal inner class Worker private constructor() : Thread() { //省略。。。 @JvmField val localQueue: WorkQueue = WorkQueue() @JvmField var mayHaveLocalTasks = false override fun run() = runWorker() //省略。。。 }
runWorker() 的實現:
private fun runWorker() { var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { val task = findTask(mayHaveLocalTasks) // Task found. Execute and repeat if (task != null) { rescanned = false minDelayUntilStealableTaskNs = 0L executeTask(task) continue } else { mayHaveLocalTasks = false } if (minDelayUntilStealableTaskNs != 0L) { if (!rescanned) { rescanned = true } else { rescanned = false tryReleaseCpu(WorkerState.PARKING) interrupted() LockSupport.parkNanos(minDelayUntilStealableTaskNs) minDelayUntilStealableTaskNs = 0L } continue } tryPark() } tryReleaseCpu(WorkerState.TERMINATED) }
工作線程是用while循環一直運行的,循環內:
val task = findTask(mayHaveLocalTasks)
,前面這個變量mayHaveLocalTasks出現過,在添加task到本地隊列的時候,會置為true,本地隊列有任務,從本地獲取,沒有就從全局隊列中獲取,如果還是沒有,從其他線程隊列中偷取任務到自己隊列中:
fun findTask(scanLocalQueue: Boolean): Task? { if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) // If we can't acquire a CPU permit -- attempt to find blocking task val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } return task ?: trySteal(blockingOnly = true) }
trySteal方法,循環workers隊列,遍歷線程本地隊列,去偷取任務,偷到的話返回任務,沒偷到的話,返回null:
private fun trySteal(blockingOnly: Boolean): Task? { //省略。。。 var currentIndex = nextInt(created) var minDelay = Long.MAX_VALUE repeat(created) { //省略。。。 val worker = workers[currentIndex] if (worker !== null && worker !== this) { val stealResult = if (blockingOnly) { localQueue.tryStealBlockingFrom(victim = worker.localQueue) } else { localQueue.tryStealFrom(victim = worker.localQueue) } if (stealResult == TASK_STOLEN) { return localQueue.poll() } else if (stealResult > 0) { minDelay = min(minDelay, stealResult) } } } minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0 return null }
在偷不到任務的時候會設置一個變量,stealResult等於-2,最後minDelayUntilStealableTaskNs 等於0;
internal const val TASK_STOLEN = -1L internal const val NOTHING_TO_STEAL = -2L
在偷取任務的時候,如果上個任務時間和這次時間間隔太短的話,返回下次執行的間隔時間差,minDelayUntilStealableTaskNs設置為這個時間值,大於0。
找到task瞭,直接執行任務executeTask(task)
,執行完成,continue循環,從1開始;
沒找到任務,設置mayHaveLocalTasks = false
如果minDelayUntilStealableTaskNs不等於0,就是上面的間隔時間太短的條件觸發,那麼讓線程釋放鎖(防止線程執行任務太過密集,等待下次循環再去調度任務),continue循環,從1開始;
上面條件不成立,調用tryPark(),這個是和unPark相反的操作,讓線程閑置,放入到線程隊列中:
private fun tryPark() { if (!inStack()) { parkedWorkersStackPush(this) return } assert { localQueue.size == 0 } workerCtl.value = PARKED // Update value once while (inStack()) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break tryReleaseCpu(WorkerState.PARKING) interrupted() // Cleanup interruptions park() } }
首先判斷是否在隊列中,不在的話,放入線程隊列中;在隊列中,將狀態設置為PARKED,不斷循環將釋放線程的cpu占用鎖,嘗試放到隊列中,park函數中有可能銷毀工作線程,看線程是否到達死亡時間點。
worker工作流程如下圖所示:
總結
1. Dispatchers的四種調度器是餓漢式單例對象,所以一個進程隻存在一個實例對象。
2. Dispatchers的四種調度器中,IO和default是共用的一個線程池,它的實現是CoroutineScheduler。
3. CoroutineScheduler線程池,有一個保存線程的隊列,有兩種全局任務隊列:一個是IO阻塞型隊列,一個是cpu密集型任務隊列;Worker線程擁有一個本地任務隊列。
4. Worker線程會根據任務類型,去對應的全局隊列或者從本地隊列找任務,找不到會從其他worker隊列中偷任務,然後執行;worker會根據自己的狀態回到線程隊列或者銷毀自己。
到此這篇關於Kotlin Dispatchers協程調度器原阿門深入分析的文章就介紹到這瞭,更多相關Kotlin Dispatchers內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Kotlin圖文並茂講解續體與續體攔截器和調度器
- Android Dispatchers.IO線程池深入刨析
- Kotlin協程到底是如何切換線程的
- Java簡單實現定時器
- Kotlin Job啟動流程源碼層深入分析