詳解從ObjectPool到CAS指令

相信最近看過我的文章的朋友對於Microsoft.Extensions.ObjectPool不陌生;復用、池化是在很多高性能場景的優化技巧,它能減少內存占用率、降低GC頻率、提升系統TPS和降低請求時延。

那麼池化和復用對象意味著同一時間會有多個線程訪問池,去獲取和歸還對象,那麼這肯定就有並發問題。那ObjectPool在涉及多線程訪問資源應該怎麼做到線程安全呢?

今天就帶大傢通過學習ObjectPool的源碼聊一聊它是如何實現線程安全的。

源碼解析

ObjectPool的關鍵就在於兩個方法,一個是Get用於獲取池中的對象,另外就是Return用於歸還已經使用完的對象。我們先來簡單的看看ObjectPool的默認實現DefaultObjectPool.cs類的內容。

私有字段

先從它的私有變量開始,下面代碼中給出,並且註釋瞭其作用:

// 用於存放池化對象的包裝數組 長度為構造函數傳入的max - 1
// 為什麼 -1 是因為性能考慮把第一個元素放到 _firstItem中
private protected readonly ObjectWrapper[] _items;

// 池化策略 創建對象 和 回收對象的防范
private protected readonly IPooledObjectPolicy<T> _policy;

// 是否默認的策略 是一個IL優化 使編譯器生成call 而不是 callvirt
private protected readonly bool _isDefaultPolicy;

// 因為池化大多數場景隻會獲取一個對象 為瞭性能考慮 單獨整一個對象不放在數組中 
// 避免數組遍歷
private protected T? _firstItem;

// 這個類是在2.1中引入的,以盡可能地避免接口調用 也就是去虛擬化 callvirt
private protected readonly PooledObjectPolicy<T>? _fastPolicy;

構造方法

另外就是它的構造方法,默認實現DefaultObjectPool有兩個構造函數,代碼如下所示:

/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy)
    : this(policy, Environment.ProcessorCount * 2)
{
    // 從這個構造方法可以看出,如果我們不指定ObjectPool的池大小
    // 那麼池大小會是當前可用的CPU核心數*2
}

/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
/// <param name="maximumRetained">The maximum number of objects to retain in the pool.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy, int maximumRetained)
{
    _policy = policy ?? throw new ArgumentNullException(nameof(policy));

    // 是否為可以消除callvirt的策略
    _fastPolicy = policy as PooleObjectPolicy<T>;
    // 如上面備註所說 是否為默認策略 可以消除callvirt
    _isDefaultPolicy = IsDefaultPolicy();

    // 初始化_items數組 容量還剩一個在 _firstItem中
    _items = new ObjectWrapper[maximumRetained - 1];

    bool IsDefaultPolicy()
    {
        var type = policy.GetType();

        return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(DefaultPooledObjectPolicy<>);
    }
}

Get 方法

如上文所說,Get()方法是ObjectPool中最重要的兩個方法之一,它的作用就是從池中獲取一個對象,它使用瞭CAS近似無鎖的指令來解決多線程資源爭用的問題,代碼如下所示:

public override T Get()
{
    // 先看_firstItem是否有值
    // 這裡使用瞭 Interlocked.CompareExchange這個方法
    // 原子性的判斷 _firstItem是否等於item
    // 如果等於那把null賦值給_firstItem
    // 然後返回_firstItem對象原始的值  反之就是什麼也不做
    var item = _firstItem;
    if (item == null || Interlocked.CompareExchange(ref _firstItem, null, item) != item)
    {

        var items = _items;
        // 遍歷整個數組
        for (var i = 0; i < items.Length; i++)
        {
            item = items[i].Element;
            // 通過原子性的Interlocked.CompareExchange嘗試讀取一個元素
            // 讀取成功則返回
            if (item != null && Interlocked.CompareExchange(ref items[i].Element, null, item) == item)
            {
                return item;
            }
        }

        // 如果遍歷整個沒有獲取到元素
        // 那麼走創建方法,創建一個
        item = Create();
    }

    return item;
}

上面代碼中,有一個點解釋一下Interlocked.CompareExchange(ref _firstItem, null, item) != item,其中!=item,如果其等於item就說明交換成功瞭,當前線程獲取到_firstItem元素的期間沒有其它線程修改_firstItem的值。

Return 方法

Retrun(T obj)方法是ObjectPool另外一個重要的方法,它的作用就是當程序代碼把從池中獲取的對象使用完以後,將其歸還到池中。同樣,它也使用CAS指令來解決多線程資源爭用的問題,代碼如下所示:

public override void Return(T obj)
{
    // 使用策略的Return方法對元素進行處理
    // 比如 List<T> 需要調用Claer方法清除集合內元素
    // StringBuilder之類的也需要調用Claer方法清除緩存的字符
    if (_isDefaultPolicy || (_fastPolicy?.Return(obj) ?? _policy.Return(obj)))
    {
        // 先嘗試將歸還的元素賦值到 _firstItem中
        if (_firstItem != null || Interlocked.CompareExchange(ref _firstItem, obj, null) != null)
        {
            var items = _items;
            // 如果 _firstItem已經存在元素
            // 那麼遍歷整個數組空間 找一個存儲為null的空位將對象存儲起來
            for (var i = 0; i < items.Length && Interlocked.CompareExchange(ref items[i].Element, obj, null) != null; ++i)
            {
            }
        }
    }
}

從核心的Get()Set()方法來看,其實整個代碼是比較簡單的,除瞭有一個_firstItem有一個簡單的優化,其餘沒有什麼特別的復雜的邏輯。

主要的關鍵就在Interlocked.CompareExchange方法上,我們在下文來仔細研究一下這個方法。

關於 Interlocked.CompareExchange

Interlocked.CompareExchange它實際上是一個CAS的實現,也就是Compare And Swap,從名字就可以看出來,它就是比較然後交換的意思。

從下面的代碼段我們也可以看出來,它總共需要三個參數。其特性就是隻有當localtion1 == comparand的時候才會將value賦值給localtion1,另外吧localtion1的原始值返回出來,這些操作都是原子性的。

// localtion1 需要比較的引用A
// value 計劃給引用A 賦的值
// comparand 和引用A比較的引用
public static T CompareExchange<T> (ref T location1, T value, T comparand) 
where T : class;

一個簡單的流程如下所示:

簡單的使用代碼如下所示:

var a = 1;
// a == 1的話就將其置為0
// 判斷是否成功就看返回的值是否為a的原始值
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
	Console.WriteLine("1.成功");
	
// 現在a已經變為0 這個交換不會成功
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
	Console.WriteLine("2.成功");

結果如下所示,隻有當a的原始值為1的時候,才會交換成功:

那麼Interlocked.CompareExchange是如何做到原子性的?在多核CPU中,數據可能在內存或者L1、L2、L3中(如下圖所示),我們如何保證能原子性的對某個數據進行操作?

實際上這是CPU提供的功能,如果查看過JIT編譯的結果,可以看到CompareExchange是由一條叫lock cmpxchgl的匯編指令支撐的。

其中lock是一個指令前綴,匯編指令被lock修飾後會成為"原子的",lock指令有兩種實現方法:

  • 早期 – Pentium時代(鎖總線),在Pentium及之前的處理器中,帶有lock前綴的指令在執行期間會鎖住總線,使得其它處理器暫時無法通過總線訪問內存,很顯然,這個開銷很大。
  • 現在 – P6以後時代(鎖緩存),在新的處理器中,Intel使用緩存鎖定來保證指令執行的原子性,緩存鎖定將大大降低lock前綴指令的執行開銷。

現在這裡的鎖緩存(Cache Locking)就是用瞭Ringbus + MESI協議。

MESI協議是 Cacheline 四種狀態的首字母的縮寫,分別是修改(Modified)態、獨占(Exclusive)態、共享(Shared)態和失效(Invalid)態。 Cache 中緩存的每個 Cache Line 都必須是這四種狀態中的一種。

修改態(Modified),如果該 Cache Line 在多個 Cache 中都有備份,那麼隻有一個備份能處於這種狀態,並且“dirty”標志位被置上。擁有修改態 Cache Line 的 Cache 需要在某個合適的時候把該 Cache Line 寫回到內存中。但是在寫回之前,任何處理器對該 Cache Line在內存中相對應的內存塊都不能進行讀操作。 Cache Line 被寫回到內存中之後,其狀態就由修改態變為共享態。

獨占態(Exclusive),和修改狀態一樣,如果該 Cache Line 在多個 Cache 中都有備份,那麼隻有一個備份能處於這種狀態,但是“dirty”標志位沒有置上,因為它是和主內存內容保持一致的一份拷貝。如果產生一個讀請求,它就可以在任何時候變成共享態。相應地,如果產生瞭一個寫請求,它就可以在任何時候變成修改態。

共享態(Shared),意味著該 Cache Line 可能在多個 Cache 中都有備份,並且是相同的狀態,它是和內存內容保持一致的一份拷貝,而且可以在任何時候都變成其他三種狀態。

失效態(Invalid),該 Cache Line 要麼已經不在 Cache 中,要麼它的內容已經過時。一旦某個Cache Line 被標記為失效,那它就被當作從來沒被加載到 Cache 中。

總得來說,若幹個CPU核心通過Ringbus連到一起。每個核心都維護自己的Cache的狀態。如果對於同一份內存數據在多個核裡都有Cache,則狀態都為S(Shared)。

一旦有一核心改瞭這個數據(狀態變成瞭M),其他核心就能瞬間通過Ringbus感知到這個修改,從而把自己的Cache狀態變成I(Invalid),並且從標記為M的Cache中讀過來。同時,這個數據會被原子的寫回到主存。最終,Cache的狀態又會變為S。

關於MESI協議更詳細的信息就不在本文中介紹瞭,在計算機操作系統和體系結構相關書籍和資料中有更詳細的介紹。

然後compxchg這個指令就很簡單瞭,和我們之前提到的一樣,比較兩個地址中的值是否相等,如果相等的話那麼就修改。

Interlocked類中的其它方法也是同樣的原理,我們可以看看Add之類的方法,同樣是在對應的操作指令前加瞭lock指令。

總結

本文主要是帶大傢看瞭下ObjectPool的源碼,然後看瞭看ObjectPool能實現無鎖線程安全的最大功臣Interlocked.CompareExchange方法;然後通過匯編代碼瞭解瞭一下Interlocked類中的一些方法是如何做到原子性的。

到此這篇關於從ObjectPool到CAS指令的文章就介紹到這瞭,更多相關ObjectPool到CAS指令內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: