C# TaskScheduler任務調度器的實現

什麼是TaskScheduler?

SynchronizationContext是對“調度程序(scheduler)”的通用抽象。個別框架會有自己的抽象調度程序,比如System.Threading.Tasks。當Tasks通過委托的形式進行排隊和執行時,會用到System.Threading.Tasks.TaskScheduler。和SynchronizationContext提供瞭一個virtual Post方法用於將委托排隊調用一樣(稍後,我們會通過典型的委托調用機制來調用委托),TaskScheduler也提供瞭一個abstract QueueTask方法(稍後,我們會通過ExecuteTask方法來調用該Task)。

通過TaskScheduler.Default我們可以獲取到Task默認的調度程序ThreadPoolTaskScheduler——線程池(譯註:這下知道為什麼Task默認使用的是線程池線程瞭吧)。並且可以通過繼承TaskScheduler來重寫相關方法來實現在任意時間任意地點進行Task調用。例如,核心庫中有個類,名為System.Threading.Tasks.ConcurrentExclusiveSchedulerPair,其實例公開瞭兩個TaskScheduler屬性,一個叫ExclusiveScheduler,另一個叫ConcurrentScheduler。調度給ConcurrentScheduler的任務可以並發,但是要在構造ConcurrentExclusiveSchedulerPair時就要指定最大並發數(類似於前面演示的MaxConcurrencySynchronizationContext);相反,在ExclusiveScheduler執行任務時,那麼將隻允許運行一個排他任務,這個行為很像讀寫鎖。

和SynchronizationContext一樣,TaskScheduler也有一個Current屬性,會返回當前調度程序。不過,和SynchronizationContext不同的是,它沒有設置當前調度程序的方法,而是在啟動Task時就要提供,因為當前調度程序是與當前運行的Task相關聯的。所以,下方的示例程序會輸出“True”,這是因為和StartNew一起使用的lambda表達式是在ConcurrentExclusiveSchedulerPair的ExclusiveScheduler上執行的(我們手動指定cesp.ExclusiveScheduler),並且TaskScheduler.Current也

using System;
using System.Threading.Tasks;
class Program
{
    static void Main()
    {
        var cesp = new ConcurrentExclusiveSchedulerPair();
        Task.Factory.StartNew(() =>
        {
            Console.WriteLine(TaskScheduler.Current == cesp.ExclusiveScheduler);
        }, default, TaskCreationOptions.None, cesp.ExclusiveScheduler)
        .Wait();
    }
}

TaskScheduler  任務調度器的原理

public abstract class TaskScheduler
{
    // 任務入口,待調度執行的 Task 會通過該方法傳入,調度器會將任務安排task到指定的隊列(線程池任務隊列(全局任務隊列、本地隊列)、獨立線程、ui線程) 隻能被.NET Framework調用,不能配派生類調用
   //
    protected internal abstract void QueueTask(Task task);
    // 這個是在執行 Task 回調的時候才會被執行到的方法,放到後面再講
    protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
protected abstract bool TryExecuteTask(Task task, bool taskWasPreviouslyQueued);
// 獲取所有調度到該 TaskScheduler 的 Task
 protected abstract IEnumerable<Task>? GetScheduledTasks();
 }

 .net中的任務調度器有哪些

線程池任務調度器:ThreadPoolTaskScheduler、
核心庫任務調度器:ConcurrentExclusiveSchedulerPair 
UI任務調度器:SynchronizationContextTaskScheduler,並發度為1

平時我們在用多線程開發的時候少不瞭Task,確實task給我們帶來瞭巨大的編程效率,在Task底層有一個TaskScheduler,它決定瞭task該如何被調度,而在.net framework中有兩種系統定義Scheduler,第一個是Task默認的ThreadPoolTaskScheduler,還是一種就是SynchronizationContextTaskScheduler(wpf),默認的調度器無法控制任務優先級,那麼需要自定義調度器實現優先級控制。以及這兩種類型之外的如何自定義,這篇剛好和大傢分享一下。

一: ThreadPoolTaskScheduler

這種scheduler機制是task的默認機制,而且從名字上也可以看到它是一種委托到ThreadPool的機制,剛好也從側面說明task是基於ThreadPool基礎上的封裝,源代碼

ThreadPoolTaskScheduler的原理:將指定的長任務開辟一個獨立的線程去執行,未指定的長時間運行的任務就用線程池的線程執行

internal sealed class ThreadPoolTaskScheduler : TaskScheduler
    {
//其他代碼
   protected internal override void QueueTask(Task task)
        {
            TaskCreationOptions options = task.Options;
            if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
            {
                // Run LongRunning tasks on their own dedicated thread.
                new Thread(s_longRunningThreadWork)
                {
                    IsBackground = true,
                    Name = ".NET Long Running Task"
                }.UnsafeStart(task);
            }
            else
            {
                // Normal handling for non-LongRunning tasks.
                ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
            }
        }
//其他代碼
 }

二:SynchronizationContextTaskScheduler

使用條件:隻有當前程的同步上下文不為null時,該方法才能正常使用。例如在UI線程(wpf、 winform、 asp.net)中,UI線程的同步上下文不為Null。控制臺默認的當前線程同步上下文為null,如果給當前線程設置默認的同步上下文SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());就可以正常使用該方法。如果控制臺程序的線程未設置同步上下將引發【當前的 SynchronizationContext 不能用作 TaskScheduler】異常。

默認的同步上下文將方法委托給線程池執行。

使用方式:通過TaskScheduler.FromCurrentSynchronizationContext() 調用SynchronizationContextTaskScheduler。

原理:初始化時候捕獲當前的線程的同步上下文。 將同步上下文封裝入任務調度器形成新的任務調度器SynchronizationContextTaskScheduler。重寫該任務調度器中的QueueTask方法,利用同步上下文的post方法將任務送到不同的處理程序,如果是winform的UI線程同步上下文 的post方法(已重寫post方法),就將任務送到UI線程。如果是控制臺線程(默認為null 設置默認同步上下文後可以正常使用。默認同步上下文采用線程池線程)就將任務送入線程池處理。

在winform中的同步上下文:WindowsFormsSynchronizationContext
在wpf中的同步上下文:DispatcherSynchronizationContext
在控制臺\線程池\new thread 同步上下文:都默認為Null。可以給他們設置默認的同步上下文SynchronizationContext。SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

SynchronizationContext 綜述 | Microsoft Docs

以下是SynchronizationContextTaskScheduler部分源代碼

internal sealed class SynchronizationContextTaskScheduler : TaskScheduler
    {
//初始化時候 ,捕獲當前線程的同步上下文  
 internal SynchronizationContextTaskScheduler()
        {
            m_synchronizationContext = SynchronizationContext.Current ??
                // make sure we have a synccontext to work with
                throw new InvalidOperationException(SR.TaskScheduler_FromCurrentSynchronizationContext_NoCurrent);
        }
//其他代碼
private readonly SynchronizationContext m_synchronizationContext;
protected internal override void QueueTask(Task task)
        {
            m_synchronizationContext.Post(s_postCallback, (object)task);
        }
//其他代碼
///改變post的調度方法、 調用者線程執行各方面的任務操作
  private static readonly SendOrPostCallback s_postCallback = static s =>
        {
            Debug.Assert(s is Task);
            ((Task)s).ExecuteEntry(); //調用者線程執行各方面的任務操作
        };
 }

以下是SynchronizationContext部分源代碼

public partial class SynchronizationContext
    {
    //其他代碼
    public virtual void Post(SendOrPostCallback d, object? state) => ThreadPool.QueueUserWorkItem(static s => s.d(s.state), (d, state), preferLocal: false);
   //其他代碼
  }

有瞭這個基礎我們再來看一下代碼怎麼寫,可以看到,下面這段代碼是不阻塞UIThread的,完美~~~

private void button1_Click(object sender, EventArgs e)
         {
             Task task = Task.Factory.StartNew(() =>
             {
                 //復雜操作,等待10s
                 Thread.Sleep(10000);
             }).ContinueWith((t) =>
             {
                 button1.Text = "hello world";
             }, TaskScheduler.FromCurrentSynchronizationContext());
         }

三:自定義TaskScheduler

我們知道在現有的.net framework中隻有這麼兩種TaskScheduler,有些同學可能想問,這些Scheduler我用起來不爽,我想自定義一下,這個可以嗎?當然!!!如果你想自定義,隻要自定義一個類實現一下TaskScheduler就可以瞭,然後你可以將ThreadPoolTaskScheduler簡化一下,即我要求所有的Task都需要走Thread,杜絕使用TheadPool,這樣可以嗎,當然瞭,不信你看。

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("hello world!!!");
            }, new CancellationToken(), TaskCreationOptions.None, new PerThreadTaskScheduler());
            Console.Read();
        }
    }
    /// <summary>
    /// 每個Task一個Thread
    /// </summary>
    public class PerThreadTaskScheduler : TaskScheduler
    {
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return null;
        }
        protected override void QueueTask(Task task)
        {
            var thread = new Thread(() =>
            {
                TryExecuteTask(task);
            });
            thread.Start();
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            throw new NotImplementedException();
        }
    }
}

創建一個與當前SynchronizationContext關聯的TaskScheduler。源代碼如下:

假設有一個UI App,它有一個按鈕。當點擊按鈕後,會從網上下載一些文本並將其設置為按鈕的內容。我們應當隻在UI線程中訪問該按鈕,因此當我們成功下載新的文本後,我們需要從擁有按鈕控制權的的線程中將其設置為按鈕的內容。如果不這樣做的話,會得到一個這樣的異常:

System.InvalidOperationException: 'The calling thread cannot access this object because a different thread owns it.'

如果我們自己手動實現,那麼可以使用前面所述的SynchronizationContext將按鈕內容的設置傳回原始上下文,例如借助TaskScheduler

用法如下

private static readonly HttpClient s_httpClient = new HttpClient();
private void downloadBtn_Click(object sender, RoutedEventArgs e)
{
    s_httpClient.GetStringAsync("http://example.com/currenttime").ContinueWith(downloadTask =>
    {
        downloadBtn.Content = downloadTask.Result;
    }, TaskScheduler.FromCurrentSynchronizationContext());//捕獲當前UI線程的同步上下文
}

到此這篇關於C# TaskScheduler任務調度器的實現的文章就介紹到這瞭,更多相關C# TaskScheduler任務調度器內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: