C#多線程系列之多階段並行線程

前言

這一篇,我們將學習用於實現並行任務、使得多個線程有序同步完成多個階段的任務。

應用場景主要是控制 N 個線程(可隨時增加或減少執行的線程),使得多線程在能夠在 M 個階段中保持同步。

線程工作情況如下:

我們接下來 將學習C# 中的 Barrier ,用於實現並行協同工作。

Barrier 類

使多個任務能夠采用並行方式依據某種算法在多個階段中協同工作,使多個線程(稱為“參與者” )分階段同時處理算法。

可以使多個線程(稱為“參與者” )分階段同時處理算法。(註意算法這個詞)

每個參與者完成階段任務後後將被阻止繼續執行,直至所有參與者都已達到同一階段。

Barrier 的構造函數如下:

構造函數 說明
Barrier(Int32) 初始化 Barrier 類的新實例。
Barrier(Int32, Action) 初始化 Barrier 類的新實例。

其中一個構造函數定義如下:

public Barrier (int participantCount, Action<Barrier> postPhaseAction);

participantCount :處於的線程數量,大於0並且小於32767。

postPhaseAction :在每個階段後執行 Action(委托)。

屬性和方法

在還沒有清楚這個類有什麼作用前,我們來看一下這個類的常用屬性和方法。

大概瞭解 Barrier 有哪些常用屬性和方法後,我們開始編寫示例代碼。

屬性:

屬性 說明
CurrentPhaseNumber 獲取屏障的當前階段的編號。
ParticipantCount 獲取屏障中參與者的總數。
ParticipantsRemaining 獲取屏障中尚未在當前階段發出信號的參與者的數量。

方法:

方法 說明
AddParticipant() 通知 Barrier,告知其將會有另一個參與者。
AddParticipants(Int32) 通知 Barrier,告知其將會有多個其他參與者。
RemoveParticipant() 通知 Barrier,告知其將會減少一個參與者。
RemoveParticipants(Int32) 通知 Barrier,告知其將會減少一些參與者。
SignalAndWait() 發出參與者已達到屏障並等待所有其他參與者也達到屏障。
SignalAndWait(CancellationToken) 發出參與者已達到屏障的信號,並等待所有其他參與者達到屏障,同時觀察取消標記。
SignalAndWait(Int32) 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,同時使用 32 位帶符號整數測量超時。
SignalAndWait(Int32, CancellationToken) 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,使用 32 位帶符號整數測量超時,同時觀察取消標記。
SignalAndWait(TimeSpan) 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,同時使用 TimeSpan 對象測量時間間隔。
SignalAndWait(TimeSpan, CancellationToken) 發出參與者已達到屏障的信號,並等待所有其他參與者也達到屏障,使用 TimeSpan 對象測量時間間隔,同時觀察取消標記。

Barrier 翻譯屏障,前面所說的 “階段”,在文檔中稱為屏障,官方有一些例子和實踐場景:

https://docs.microsoft.com/zh-cn/dotnet/standard/threading/barrier?view=netcore-3.1

https://docs.microsoft.com/zh-cn/dotnet/standard/threading/how-to-synchronize-concurrent-operations-with-a-barrier?view=netcore-3.1

本文的教程比較簡單,你可以先看本教程,再去看看官方示例。

示例

假設有個比賽,一個有三個環節,有三個小組參加比賽。

比賽有三個環節,小組完成一個環節後,可以去等待區休息,等待其他小組也完成比賽後,開始進行下一個環節的比賽。

示例如下:

new Barrier(int,Action) 設置有多少線程參與,Action 委托設置每個階段完成後執行哪些動作。

.SignalAndWait() 阻止當前線程繼續往下執行;直到其他完成也執行到此為止。

    class Program
    {
        // Barrier(Int32, Action)
        private static Barrier barrier = new Barrier(3, b =>
                            Console.WriteLine($"\n第 {b.CurrentPhaseNumber + 1} 環節的比賽結束,請評分!"));

        static void Main(string[] args)
        {
            // Random 模擬每個小組完成一個環節比賽需要的時間
            Thread thread1 = new Thread(() => DoWork("第一小組", new Random().Next(2, 10)));
            Thread thread2 = new Thread(() => DoWork("第二小組", new Random().Next(2, 10)));
            Thread thread3 = new Thread(() => DoWork("第三小組", new Random().Next(2, 10)));

            // 三個小組開始比賽
            thread1.Start();
            thread2.Start();
            thread3.Start();


            Console.ReadKey();
        }
        static void DoWork(string name, int seconds)
        {
            // 第一環節
            Console.WriteLine($"\n{name}:開始進入第一環節比賽");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));    // 模擬小組完成環節比賽需要的時間
            Console.WriteLine($"\n    {name}:完成第一環節比賽,等待其它小組");
            // 小組完成階段任務,去休息等待其它小組也完成比賽
            barrier.SignalAndWait();

            // 第二環節
            Console.WriteLine($"\n        {name}:開始進入第二環節比賽");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"\n        {name}:完成第二環節比賽,等待其它小組\n");
            barrier.SignalAndWait();


            // 第三環節
            Console.WriteLine($"\n        {name}:開始進入第三環節比賽");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"\n        {name}:完成第三環節比賽,等待其它小組\n");
            barrier.SignalAndWait();
        }
    }

上面的示例中,每個線程都使用瞭 DoWork() 這個方法去中相同的事情,當然也可以設置多個線程執行不同的任務,但是必須保證每個線程都具有相同數量的 .SignalAndWait(); 方法。

當然 SignalAndWait() 可以設置等待時間,如果其他線程遲遲沒有到這一步,那就繼續運行。可以避免死鎖等問題。

到目前,隻使用瞭 SignalAndWait() ,我們繼續學習一下 Barrier 類的其他方法。

新的示例

Barrier.AddParticipant():添加參與者;

Barrier.RemoveParticipant():移除參與者;

這裡繼續使用第二節的示例。

因為這是比賽,老是等待其他小組,會使得比賽進行比較慢。

新的規則:不必等待最後一名,當環節隻剩下最後一名時為完成時,其它小組可以立即進行下一個環節的比賽。

​ 當然,最後一名小組,有權利繼續完成比賽。

修改第二小節的代碼,在 Main 內第一行加上 barrier.RemoveParticipant();

        static void Main(string[] args)
        {
            barrier.RemoveParticipant();
            ... ...

試著再運行一下。

說明

SignalAndWait() 的 重載比較多,例如 SignalAndWait(CancellationToken),這裡筆者先不講解此方法如何使用。等到寫到後面的異步(Task),讀者學到相關的知識點,我們再過一次復習,這樣由易到難,自然水到渠成。

Barrier 適合用於同時執行相同流程的工作,因為工作內容是相同的,便於協同。工作流有可能用得上吧。

但是 Barrier 更加適合用於算法領域,可以參考:https://devblogs.microsoft.com/pfxteam/parallel-merge-sort-using-barrier/

到此這篇關於C#多線程系列之多階段並行線程的文章就介紹到這瞭。希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。

推薦閱讀: