詳解從ObjectPool到CAS指令
相信最近看過我的文章的朋友對於Microsoft.Extensions.ObjectPool
不陌生;復用、池化是在很多高性能場景的優化技巧,它能減少內存占用率、降低GC頻率、提升系統TPS和降低請求時延。
那麼池化和復用對象意味著同一時間會有多個線程訪問池,去獲取和歸還對象,那麼這肯定就有並發問題。那ObjectPool
在涉及多線程訪問資源應該怎麼做到線程安全呢?
今天就帶大傢通過學習ObjectPool
的源碼聊一聊它是如何實現線程安全的。
源碼解析
ObjectPool
的關鍵就在於兩個方法,一個是Get
用於獲取池中的對象,另外就是Return
用於歸還已經使用完的對象。我們先來簡單的看看ObjectPool
的默認實現DefaultObjectPool.cs
類的內容。
私有字段
先從它的私有變量開始,下面代碼中給出,並且註釋瞭其作用:
// 用於存放池化對象的包裝數組 長度為構造函數傳入的max - 1 // 為什麼 -1 是因為性能考慮把第一個元素放到 _firstItem中 private protected readonly ObjectWrapper[] _items; // 池化策略 創建對象 和 回收對象的防范 private protected readonly IPooledObjectPolicy<T> _policy; // 是否默認的策略 是一個IL優化 使編譯器生成call 而不是 callvirt private protected readonly bool _isDefaultPolicy; // 因為池化大多數場景隻會獲取一個對象 為瞭性能考慮 單獨整一個對象不放在數組中 // 避免數組遍歷 private protected T? _firstItem; // 這個類是在2.1中引入的,以盡可能地避免接口調用 也就是去虛擬化 callvirt private protected readonly PooledObjectPolicy<T>? _fastPolicy;
構造方法
另外就是它的構造方法,默認實現DefaultObjectPool
有兩個構造函數,代碼如下所示:
/// <summary> /// Creates an instance of <see cref="DefaultObjectPool{T}"/>. /// </summary> /// <param name="policy">The pooling policy to use.</param> public DefaultObjectPool(IPooledObjectPolicy<T> policy) : this(policy, Environment.ProcessorCount * 2) { // 從這個構造方法可以看出,如果我們不指定ObjectPool的池大小 // 那麼池大小會是當前可用的CPU核心數*2 } /// <summary> /// Creates an instance of <see cref="DefaultObjectPool{T}"/>. /// </summary> /// <param name="policy">The pooling policy to use.</param> /// <param name="maximumRetained">The maximum number of objects to retain in the pool.</param> public DefaultObjectPool(IPooledObjectPolicy<T> policy, int maximumRetained) { _policy = policy ?? throw new ArgumentNullException(nameof(policy)); // 是否為可以消除callvirt的策略 _fastPolicy = policy as PooleObjectPolicy<T>; // 如上面備註所說 是否為默認策略 可以消除callvirt _isDefaultPolicy = IsDefaultPolicy(); // 初始化_items數組 容量還剩一個在 _firstItem中 _items = new ObjectWrapper[maximumRetained - 1]; bool IsDefaultPolicy() { var type = policy.GetType(); return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(DefaultPooledObjectPolicy<>); } }
Get 方法
如上文所說,Get()
方法是ObjectPool
中最重要的兩個方法之一,它的作用就是從池中獲取一個對象,它使用瞭CAS
近似無鎖的指令來解決多線程資源爭用的問題,代碼如下所示:
public override T Get() { // 先看_firstItem是否有值 // 這裡使用瞭 Interlocked.CompareExchange這個方法 // 原子性的判斷 _firstItem是否等於item // 如果等於那把null賦值給_firstItem // 然後返回_firstItem對象原始的值 反之就是什麼也不做 var item = _firstItem; if (item == null || Interlocked.CompareExchange(ref _firstItem, null, item) != item) { var items = _items; // 遍歷整個數組 for (var i = 0; i < items.Length; i++) { item = items[i].Element; // 通過原子性的Interlocked.CompareExchange嘗試讀取一個元素 // 讀取成功則返回 if (item != null && Interlocked.CompareExchange(ref items[i].Element, null, item) == item) { return item; } } // 如果遍歷整個沒有獲取到元素 // 那麼走創建方法,創建一個 item = Create(); } return item; }
上面代碼中,有一個點解釋一下Interlocked.CompareExchange(ref _firstItem, null, item) != item
,其中!=item
,如果其等於item
就說明交換成功瞭,當前線程獲取到_firstItem
元素的期間沒有其它線程修改_firstItem
的值。
Return 方法
Retrun(T obj)
方法是ObjectPool
另外一個重要的方法,它的作用就是當程序代碼把從池中獲取的對象使用完以後,將其歸還到池中。同樣,它也使用CAS
指令來解決多線程資源爭用的問題,代碼如下所示:
public override void Return(T obj) { // 使用策略的Return方法對元素進行處理 // 比如 List<T> 需要調用Claer方法清除集合內元素 // StringBuilder之類的也需要調用Claer方法清除緩存的字符 if (_isDefaultPolicy || (_fastPolicy?.Return(obj) ?? _policy.Return(obj))) { // 先嘗試將歸還的元素賦值到 _firstItem中 if (_firstItem != null || Interlocked.CompareExchange(ref _firstItem, obj, null) != null) { var items = _items; // 如果 _firstItem已經存在元素 // 那麼遍歷整個數組空間 找一個存儲為null的空位將對象存儲起來 for (var i = 0; i < items.Length && Interlocked.CompareExchange(ref items[i].Element, obj, null) != null; ++i) { } } } }
從核心的Get()
和Set()
方法來看,其實整個代碼是比較簡單的,除瞭有一個_firstItem
有一個簡單的優化,其餘沒有什麼特別的復雜的邏輯。
主要的關鍵就在Interlocked.CompareExchange
方法上,我們在下文來仔細研究一下這個方法。
關於 Interlocked.CompareExchange
Interlocked.CompareExchange
它實際上是一個CAS
的實現,也就是Compare And Swap,從名字就可以看出來,它就是比較然後交換的意思。
從下面的代碼段我們也可以看出來,它總共需要三個參數。其特性就是隻有當localtion1 == comparand
的時候才會將value
賦值給localtion1
,另外吧localtion1
的原始值返回出來,這些操作都是原子性的。
// localtion1 需要比較的引用A // value 計劃給引用A 賦的值 // comparand 和引用A比較的引用 public static T CompareExchange<T> (ref T location1, T value, T comparand) where T : class;
一個簡單的流程如下所示:
簡單的使用代碼如下所示:
var a = 1; // a == 1的話就將其置為0 // 判斷是否成功就看返回的值是否為a的原始值 if(Interlocked.CompareExchange(ref a, 0, 1) == 1) Console.WriteLine("1.成功"); // 現在a已經變為0 這個交換不會成功 if(Interlocked.CompareExchange(ref a, 0, 1) == 1) Console.WriteLine("2.成功");
結果如下所示,隻有當a
的原始值為1
的時候,才會交換成功:
那麼Interlocked.CompareExchange
是如何做到原子性的?在多核CPU中,數據可能在內存或者L1、L2、L3中(如下圖所示),我們如何保證能原子性的對某個數據進行操作?
實際上這是CPU提供的功能,如果查看過JIT編譯的結果,可以看到CompareExchange
是由一條叫lock cmpxchgl
的匯編指令支撐的。
其中lock
是一個指令前綴,匯編指令被lock
修飾後會成為"原子的",lock
指令有兩種實現方法:
- 早期 – Pentium時代(鎖總線),在Pentium及之前的處理器中,帶有
lock
前綴的指令在執行期間會鎖住總線,使得其它處理器暫時無法通過總線訪問內存,很顯然,這個開銷很大。 - 現在 – P6以後時代(鎖緩存),在新的處理器中,Intel使用緩存鎖定來保證指令執行的原子性,緩存鎖定將大大降低lock前綴指令的執行開銷。
現在這裡的鎖緩存(Cache Locking)就是用瞭Ringbus + MESI協議。
MESI
協議是 Cacheline 四種狀態的首字母的縮寫,分別是修改(Modified)態、獨占(Exclusive)態、共享(Shared)態和失效(Invalid)態。 Cache 中緩存的每個 Cache Line 都必須是這四種狀態中的一種。
修改態(Modified),如果該 Cache Line 在多個 Cache 中都有備份,那麼隻有一個備份能處於這種狀態,並且“dirty”標志位被置上。擁有修改態 Cache Line 的 Cache 需要在某個合適的時候把該 Cache Line 寫回到內存中。但是在寫回之前,任何處理器對該 Cache Line在內存中相對應的內存塊都不能進行讀操作。 Cache Line 被寫回到內存中之後,其狀態就由修改態變為共享態。
獨占態(Exclusive),和修改狀態一樣,如果該 Cache Line 在多個 Cache 中都有備份,那麼隻有一個備份能處於這種狀態,但是“dirty”標志位沒有置上,因為它是和主內存內容保持一致的一份拷貝。如果產生一個讀請求,它就可以在任何時候變成共享態。相應地,如果產生瞭一個寫請求,它就可以在任何時候變成修改態。
共享態(Shared),意味著該 Cache Line 可能在多個 Cache 中都有備份,並且是相同的狀態,它是和內存內容保持一致的一份拷貝,而且可以在任何時候都變成其他三種狀態。
失效態(Invalid),該 Cache Line 要麼已經不在 Cache 中,要麼它的內容已經過時。一旦某個Cache Line 被標記為失效,那它就被當作從來沒被加載到 Cache 中。
總得來說,若幹個CPU核心通過Ringbus連到一起。每個核心都維護自己的Cache的狀態。如果對於同一份內存數據在多個核裡都有Cache,則狀態都為S(Shared)。
一旦有一核心改瞭這個數據(狀態變成瞭M),其他核心就能瞬間通過Ringbus感知到這個修改,從而把自己的Cache狀態變成I(Invalid),並且從標記為M的Cache中讀過來。同時,這個數據會被原子的寫回到主存。最終,Cache的狀態又會變為S。
關於MESI
協議更詳細的信息就不在本文中介紹瞭,在計算機操作系統和體系結構相關書籍和資料中有更詳細的介紹。
然後compxchg
這個指令就很簡單瞭,和我們之前提到的一樣,比較兩個地址中的值是否相等,如果相等的話那麼就修改。
Interlocked
類中的其它方法也是同樣的原理,我們可以看看Add
之類的方法,同樣是在對應的操作指令前加瞭lock
指令。
總結
本文主要是帶大傢看瞭下ObjectPool
的源碼,然後看瞭看ObjectPool
能實現無鎖線程安全的最大功臣Interlocked.CompareExchange
方法;然後通過匯編代碼瞭解瞭一下Interlocked
類中的一些方法是如何做到原子性的。
到此這篇關於從ObjectPool到CAS指令的文章就介紹到這瞭,更多相關ObjectPool到CAS指令內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!