C#多線程系列之多階段並行線程
前言
這一篇,我們將學習用於實現並行任務、使得多個線程有序同步完成多個階段的任務。
應用場景主要是控制 N 個線程(可隨時增加或減少執行的線程),使得多線程在能夠在 M 個階段中保持同步。
線程工作情況如下:
我們接下來 將學習C# 中的 Barrier ,用於實現並行協同工作。
Barrier 類
使多個任務能夠采用並行方式依據某種算法在多個階段中協同工作,使多個線程(稱為“參與者” )分階段同時處理算法。
可以使多個線程(稱為“參與者” )分階段同時處理算法。(註意算法這個詞)
每個參與者完成階段任務後後將被阻止繼續執行,直至所有參與者都已達到同一階段。
Barrier 的構造函數如下:
構造函數 | 說明 |
---|---|
Barrier(Int32) | 初始化 Barrier 類的新實例。 |
Barrier(Int32, Action) | 初始化 Barrier 類的新實例。 |
其中一個構造函數定義如下:
public Barrier (int participantCount, Action<Barrier> postPhaseAction);
participantCount :處於的線程數量,大於0並且小於32767。
postPhaseAction :在每個階段後執行 Action(委托)。
屬性和方法
在還沒有清楚這個類有什麼作用前,我們來看一下這個類的常用屬性和方法。
大概瞭解 Barrier 有哪些常用屬性和方法後,我們開始編寫示例代碼。
屬性:
屬性 | 說明 |
---|---|
CurrentPhaseNumber | 獲取屏障的當前階段的編號。 |
ParticipantCount | 獲取屏障中參與者的總數。 |
ParticipantsRemaining | 獲取屏障中尚未在當前階段發出信號的參與者的數量。 |
方法:
方法 | 說明 |
---|---|
AddParticipant() | 通知 Barrier,告知其將會有另一個參與者。 |
AddParticipants(Int32) | 通知 Barrier,告知其將會有多個其他參與者。 |
RemoveParticipant() | 通知 Barrier,告知其將會減少一個參與者。 |
RemoveParticipants(Int32) | 通知 Barrier,告知其將會減少一些參與者。 |
SignalAndWait() | 發出參與者已達到屏障並等待所有其他參與者也達到屏障。 |
SignalAndWait(CancellationToken) | 發出參與者已達到屏障的信號,並等待所有其他參與者達到屏障,同時觀察取消標記。 |
SignalAndWait(Int32) | 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,同時使用 32 位帶符號整數測量超時。 |
SignalAndWait(Int32, CancellationToken) | 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,使用 32 位帶符號整數測量超時,同時觀察取消標記。 |
SignalAndWait(TimeSpan) | 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,同時使用 TimeSpan 對象測量時間間隔。 |
SignalAndWait(TimeSpan, CancellationToken) | 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,使用 TimeSpan 對象測量時間間隔,同時觀察取消標記。 |
Barrier 翻譯屏障,前面所說的 “階段”,在文檔中稱為屏障,官方有一些例子和實踐場景:
https://docs.microsoft.com/zh-cn/dotnet/standard/threading/barrier?view=netcore-3.1
https://docs.microsoft.com/zh-cn/dotnet/standard/threading/how-to-synchronize-concurrent-operations-with-a-barrier?view=netcore-3.1
本文的教程比較簡單,你可以先看本教程,再去看看官方示例。
示例
假設有個比賽,一個有三個環節,有三個小組參加比賽。
比賽有三個環節,小組完成一個環節後,可以去等待區休息,等待其他小組也完成比賽後,開始進行下一個環節的比賽。
示例如下:
new Barrier(int,Action)
設置有多少線程參與,Action 委托設置每個階段完成後執行哪些動作。
.SignalAndWait()
阻止當前線程繼續往下執行;直到其他完成也執行到此為止。
class Program { // Barrier(Int32, Action) private static Barrier barrier = new Barrier(3, b => Console.WriteLine($"\n第 {b.CurrentPhaseNumber + 1} 環節的比賽結束,請評分!")); static void Main(string[] args) { // Random 模擬每個小組完成一個環節比賽需要的時間 Thread thread1 = new Thread(() => DoWork("第一小組", new Random().Next(2, 10))); Thread thread2 = new Thread(() => DoWork("第二小組", new Random().Next(2, 10))); Thread thread3 = new Thread(() => DoWork("第三小組", new Random().Next(2, 10))); // 三個小組開始比賽 thread1.Start(); thread2.Start(); thread3.Start(); Console.ReadKey(); } static void DoWork(string name, int seconds) { // 第一環節 Console.WriteLine($"\n{name}:開始進入第一環節比賽"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); // 模擬小組完成環節比賽需要的時間 Console.WriteLine($"\n {name}:完成第一環節比賽,等待其它小組"); // 小組完成階段任務,去休息等待其它小組也完成比賽 barrier.SignalAndWait(); // 第二環節 Console.WriteLine($"\n {name}:開始進入第二環節比賽"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"\n {name}:完成第二環節比賽,等待其它小組\n"); barrier.SignalAndWait(); // 第三環節 Console.WriteLine($"\n {name}:開始進入第三環節比賽"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"\n {name}:完成第三環節比賽,等待其它小組\n"); barrier.SignalAndWait(); } }
上面的示例中,每個線程都使用瞭 DoWork()
這個方法去中相同的事情,當然也可以設置多個線程執行不同的任務,但是必須保證每個線程都具有相同數量的 .SignalAndWait();
方法。
當然 SignalAndWait()
可以設置等待時間,如果其他線程遲遲沒有到這一步,那就繼續運行。可以避免死鎖等問題。
到目前,隻使用瞭 SignalAndWait()
,我們繼續學習一下 Barrier 類的其他方法。
新的示例
Barrier.AddParticipant()
:添加參與者;
Barrier.RemoveParticipant()
:移除參與者;
這裡繼續使用第二節的示例。
因為這是比賽,老是等待其他小組,會使得比賽進行比較慢。
新的規則:不必等待最後一名,當環節隻剩下最後一名時為完成時,其它小組可以立即進行下一個環節的比賽。
當然,最後一名小組,有權利繼續完成比賽。
修改第二小節的代碼,在 Main 內第一行加上 barrier.RemoveParticipant();
。
static void Main(string[] args) { barrier.RemoveParticipant(); ... ...
試著再運行一下。
說明
SignalAndWait()
的 重載比較多,例如 SignalAndWait(CancellationToken)
,這裡筆者先不講解此方法如何使用。等到寫到後面的異步(Task
),讀者學到相關的知識點,我們再過一次復習,這樣由易到難,自然水到渠成。
Barrier 適合用於同時執行相同流程的工作,因為工作內容是相同的,便於協同。工作流有可能用得上吧。
但是 Barrier 更加適合用於算法領域,可以參考:https://devblogs.microsoft.com/pfxteam/parallel-merge-sort-using-barrier/
到此這篇關於C#多線程系列之多階段並行線程的文章就介紹到這瞭。希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。