C#的並發機制優秀在哪你知道麼

筆者上次用C#寫.Net代碼差不多還是10多年以前,由於當時Java已經頗具王者風范,Net幾乎被打得潰不成軍。因此當時筆者對於這個.Net的項目態度比較敷衍瞭事,沒有對其中一些優秀機制有很深的瞭解,在去年寫《C和Java沒那麼香瞭,高並發時代誰能稱王》時都沒給.Net以一席之地,不過最近恰好機緣巧合,我又接手瞭一個Windows方面的項目,這也讓我有機會重新審視一下自己關於.Net框架的相關知識。

項目原型要實現的功能並不復雜,主要就是記錄移動存儲設備中文件拷出的記錄,而且需要盡可能少的占用系統資源,而在開發過程中我無意中加瞭一行看似沒有任何效果的代碼,使用Invoke方法記錄文件拷出情況,這樣的操作卻讓程序執行效率明顯會更高,這背後的原因特別值得總結。

一行沒用的代碼卻提高瞭效率?

由於我需要記錄的文件拷出信息並沒有回顯在UI的需要,因此也就沒考慮並發沖突的問題,在最初版本的實現中,我對於filesystemwatcher的回調事件,都是直接處理的,如下:

private void DeleteFileHandler(object sender, FileSystemEventArgs e)
        {
            if(files.Contains(e.FullPath))
            {
                files.Remove(e.FullPath);
               //一些其它操作
            }
        }

這個程序的處理效率在普通的辦公PC上如果同時拷出20個文件,那麼在拷貝過程中,U盤監測程序的CPU使用率大約是0.7%。

但是一個非常偶然的機會,我使用瞭Event/Delegate的Invoke機制,結果發現這樣一個看似的廢操作,卻讓程序的CPU占用率下降到0.2%左右

 private void UdiskWather_Deleted(object sender, FileSystemEventArgs e)
        {
            if(this.InvokeRequired)
            {
                this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] { sender,e });               }
            else
            {
                DeleteFileHandler(sender, e);
            }
        }

在我最初的認識中.net中的Delegate機制在調用過程中是要進行拆、裝箱操作的,因此這不拖慢操作就不錯瞭,但實際的驗證結果卻相反。​

看似沒用的Invoke到底有什麼用

這裡先給出結論,Invoke能提升程序執行效率,其關鍵還是在於線程在多核之間切換的消耗要遠遠高於拆、裝箱的資源消耗,我們知道我們程序的核心就是操作files這個共享變量,每次在被檢測的U盤目錄中如果發生文件變動,其回調通知函數可能都運行在不同的線程,如下:

Invoke機制的背後其實就是保證所有對於files這個共享變量的操作,全部都是由一個線程執行完成的。

目前.Net的代碼都開源的,下面我們大致講解一下Invoke的調用過程,不管是BeginInvoke還是Invoke背後其實都是調用的MarshaledInvoke方法來完成的,如下:

​
public IAsyncResult BeginInvoke(Delegate method, params Object[] args) {
            using (new MultithreadSafeCallScope()) {
                Control marshaler = FindMarshalingControl();
                return(IAsyncResult)marshaler.MarshaledInvoke(this, method, args, false);
            }
        }
​

MarshaledInvoke的主要工作是創建ThreadMethodEntry對象,並把它放在一個鏈表裡進行管理,然後調用PostMessage將相關信息發給要通信的線程,如下:

​
private Object MarshaledInvoke(Control caller, Delegate method, Object[] args, bool synchronous) {
            if (!IsHandleCreated) {
                throw new InvalidOperationException(SR.GetString(SR.ErrorNoMarshalingThread));
            }
            ActiveXImpl activeXImpl = (ActiveXImpl)Properties.GetObject(PropActiveXImpl);
            if (activeXImpl != null) {
                IntSecurity.UnmanagedCode.Demand();
            }
            // We don't want to wait if we're on the same thread, or else we'll deadlock.
            // It is important that syncSameThread always be false for asynchronous calls.
            //
            bool syncSameThread = false;
            int pid; // ignored
            if (SafeNativeMethods.GetWindowThreadProcessId(new HandleRef(this, Handle), out pid) == SafeNativeMethods.GetCurrentThreadId()) {
                if (synchronous)
                    syncSameThread = true;
            }
            // Store the compressed stack information from the thread that is calling the Invoke()
            // so we can assign the same security context to the thread that will actually execute
            // the delegate being passed.
            //
            ExecutionContext executionContext = null;
            if (!syncSameThread) {
                executionContext = ExecutionContext.Capture();
            }
            ThreadMethodEntry tme = new ThreadMethodEntry(caller, this, method, args, synchronous, executionContext);
            lock (this) {
                if (threadCallbackList == null) {
                    threadCallbackList = new Queue();
                }
            }
            lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }
            if (syncSameThread) {
                InvokeMarshaledCallbacks();
            }  else {
                //
                UnsafeNativeMethods.PostMessage(new HandleRef(this, Handle), threadCallbackMessage, IntPtr.Zero, IntPtr.Zero);
            }
            if (synchronous) {
                if (!tme.IsCompleted) {
                    WaitForWaitHandle(tme.AsyncWaitHandle);
                }
                if (tme.exception != null) {
                    throw tme.exception;
                }
                return tme.retVal;
            }
            else {
                return(IAsyncResult)tme;
            }
        }
​

Invoke的機制就保證瞭一個共享變量隻能由一個線程維護,這和GO語言使用通信來替代共享內存的設計是暗合的,他們的理念都是 "讓同一塊內存在同一時間內隻被一個線程操作" 。這和現代計算體系結構的多核CPU(SMP)有著密不可分的聯系,

這裡我們先來科普一下CPU之間的通信MESI協議的內容。我們知道現代的CPU都配備瞭高速緩存,按照多核高速緩存同步的MESI協議約定,每個緩存行都有四個狀態,分別是E(exclusive)、M(modified)、S(shared)、I(invalid),其中:

M:代表該緩存行中的內容被修改,並且該緩存行隻被緩存在該CPU中。這個狀態代表緩存行的數據和內存中的數據不同。

E:代表該緩存行對應內存中的內容隻被該CPU緩存,其他CPU沒有緩存該緩存對應內存行中的內容。這個狀態的緩存行中的數據與內存的數據一致。

I:代表該緩存行中的內容無效。

S:該狀態意味著數據不止存在本地CPU緩存中,還存在其它CPU的緩存中。這個狀態的數據和內存中的數據也是一致的。不過隻要有CPU修改該緩存行都會使該行狀態變成 I 。

四種狀態的狀態轉移圖如下:

​我們上文也提到瞭,不同的線程是有大概率是運行在不同CPU核上的,在不同CPU操作同一塊內存時,站在CPU0的角度上看,就是CPU1會不斷發起remote write的操作,這會使該高速緩存的狀態總是會在S和I之間進行狀態遷移,而一旦狀態變為I將耗費比較多的時間進行狀態同步。

因此我們可以基本得出 this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] { sender,e });   ;這行看似無關緊要的代碼之後,無意中使files共享變量的維護操作,由多核多線程共同操作,變成瞭眾多子線程向主線程通信,所有維護操作均由主線程進行,這也使最終的執行效率有所提高。

​深度解讀,為何要加兩把鎖

在當前使用通信替代共享內存的大潮之下,鎖其實是最重要的設計。

我們看到在.Net的Invoke實現中,使用瞭兩把鎖lock (thislock (threadCallbackList)

lock (this) {
                if (threadCallbackList == null) {
                    threadCallbackList = new Queue();
                }
            }
            lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }

在.NET當中lock關鍵字的基本可以理解為提供瞭一個近似於CAS的鎖(Compare And Swap)。CAS的原理不斷地把"期望值"和"實際值"進行比較,當它們相等時,說明持有鎖的CPU已經釋放瞭該鎖,那麼試圖獲取這把鎖的CPU就會嘗試將"new"的值(0)寫入"p"(交換),以表明自己成為spinlock新的owner。偽代碼演示如下:

void CAS(int p, int old,int new)
{
    if *p != old
        do nothing
    else 
     *p ← new
}

基於CAS的鎖效率沒問題,尤其是在沒有多核競爭的情況CAS表現得尤其優秀,但CAS最大的問題就是不公平,因為如果有多個CPU同時在申請一把鎖,那麼剛剛釋放鎖的CPU極可能在下一輪的競爭中獲取優勢,再次獲得這把鎖,這樣的結果就是一個CPU忙死,而其它CPU卻很閑,我們很多時候詬病多核SOC“一核有難,八核圍觀”其實很多時候都是由這種不公平造成的。

為瞭解決CAS的不公平問題,業界大神們又引入瞭TAS(Test And Set Lock)機制,個人感覺還是把TAS中的T理解為Ticket更好記一些,TAS方案中維護瞭一個請求該鎖的頭尾索引值,由"head"和"tail"兩個索引組成。

struct lockStruct{
    int32 head;
    int32 tail;
} ;

"head"代表請求隊列的頭部,"tail"代表請求隊列的尾部,其初始值都為0。

最一開始時,第一個申請的CPU發現該隊列的tail值是0,那麼這個CPU會直接獲取這把鎖,並會把tail值更新為1,並在釋放該鎖時將head值更新為1。

在一般情況下當鎖被持有的CPU釋放時,該隊列的head值會被加1,當其他CPU在試圖獲取這個鎖時,鎖的tail值獲取到,然後把這個tail值加1,並存儲在自己專屬的寄存器當中,然後再把更新後的tail值更新到隊列的tail當中。接下來就是不斷地循環比較,判斷該鎖當前的"head"值,是否和自己存儲在寄存器中的"tail"值相等,相等時則代表成功獲得該鎖。

TAS這類似於用戶到政務大廳去辦事時,首先要在叫號機取號,當工作人員廣播叫到的號碼與你手中的號碼一致時,你就獲取瞭辦事櫃臺的所有權。

但是TAS卻存在一定的效率問題,根據我們上文介紹的MESI協議,這個lock的頭尾索引其實是在各個CPU之間共享的,因此tail和head頻繁更新,還是會引發調整緩存不停的invalidate,這會極大的影響效率。

因此我們看到在.Net的實現中幹脆就直接引入瞭threadCallbackList的隊列,並不斷將tme(ThreadMethodEntry)加入隊尾,而接收消息的進程,則不斷從隊首獲取消息.

lock (threadCallbackList) {
                if (threadCallbackMessage == 0) {
                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");
                }
                threadCallbackList.Enqueue(tme);
            }

當隊首指向這個tme時,消息才被發送,其實是一種類似於MAS的實現,當然MAS實際是為每個CPU都建立瞭一個專屬的隊列,和Invoke的設計略有不同,不過基本的思想是一致的。

總結

本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!   

推薦閱讀: