Java源碼解析之ConcurrentHashMap

早期 ConcurrentHashMap,其實現是基於:

  • 分離鎖,也就是將內部進行分段(Segment),裡面則是 HashEntry 的數組,和 HashMap 類似,哈希相同的條目也是以鏈表形式存放。
  • HashEntry 內部使用 volatile 的 value 字段來保證可見性,也利用瞭不可變對象的機制以改進利用 Unsafe 提供的底層能力,比如 volatile access,去直接完成部分操作,以最優化性能,畢竟 Unsafe 中的很多操作都是 JVM intrinsic 優化過的。

在進行並發操作的時候,隻需要鎖定相應段,這樣就有效避免瞭類似 Hashtable 整體同步的問題,大大提高瞭性能。

Put操作

通過二次哈希避免哈希沖突,然後以 Unsafe 調用方式,直接獲取相應的 Segment,然後進行線程安全的 put 操作

public V put(K key, V value) {
 
        Segment<K,V> s;
 
        if (value == null)
 
            throw new NullPointerException();
 
        // 二次哈希,以保證數據的分散性,避免哈希沖突
 
        int hash = hash(key.hashCode());
 
        int j = (hash >>> segmentShift) & segmentMask;
 
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
 
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
 
            s = ensureSegment(j);
 
        return s.put(key, hash, value, false);
 
    }

其核心邏輯實現在下面的內部方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 
            // scanAndLockForPut 會去查找是否有 key 相同 Node
 
            // 無論如何,確保獲取鎖
 
            HashEntry<K,V> node = tryLock() ? null :
 
                scanAndLockForPut(key, hash, value);
 
            V oldValue;
 
            try {
 
                HashEntry<K,V>[] tab = table;
 
                int index = (tab.length - 1) & hash;
 
                HashEntry<K,V> first = entryAt(tab, index);
 
                for (HashEntry<K,V> e = first;;) {
 
                    if (e != null) {
 
                        K k;
 
                        // 更新已有 value...
 
                    }
 
                    else {
 
                        // 放置 HashEntry 到特定位置,如果超過閾值,進行 rehash
 
                        // ...
 
                    }
 
                }
 
            } finally {
 
                unlock();
 
            }
 
            return oldValue;
 
        }

在寫的時候:

  • ConcurrentHashMap 會獲取再入鎖,以保證數據一致性,Segment 本身就是基於 ReentrantLock 的擴展實現,所以,在並發修改期間,相應 Segment 是被鎖定的。
  • 在最初階段,進行重復性的掃描,以確定相應 key 值是否已經在數組裡面,進而決定是更新還是放置操作。
  • 在 ConcurrentHashMap 中解決擴容的問題,不是整體的擴容,而是單獨對 Segment 進行擴容。
  • 為瞭減少鎖定segment的開銷,ConcurrentHashMap 的實現是通過重試機制(RETRIES_BEFORE_LOCK,指定重試次數 2),來試圖獲得可靠值。如果沒有監控到發生變化(通過對比 Segment.modCount),就直接返回,否則獲取鎖進行操作。

機制在Java 8 上的變化:

  • 總體結構上,它的內部存儲與HashMap 結構非常相似,同樣是大的桶(bucket)數組,然後內部也是一個個所謂的鏈表結構(bin),同步的粒度要更細致一些。
  • 其內部仍然有 Segment 定義,但僅僅是為瞭保證序列化時的兼容性而已,不再有任何結構上的用處。
  • 因為不再使用 Segment,初始化操作大大簡化,修改為 lazy-load 形式,這樣可以有效避免初始開銷。
  • 數據存儲利用 volatile 來保證可見性。
  • 使用 CAS (Compare And Swap)等操作,在特定場景進行無鎖並發操作。
  • 使用 Unsafe、LongAdder 之類底層手段,進行極端情況的優化。

看看在java8上的put操作

final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();
 
    int hash = spread(key.hashCode());
 
    int binCount = 0;
 
    for (Node<K,V>[] tab = table;;) {
 
        Node<K,V> f; int n, i, fh; K fk; V fv;
 
        if (tab == null || (n = tab.length) == 0)
 
            tab = initTable();
 
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
 
            // 利用 CAS 去進行無鎖線程安全操作,如果 bin 是空的
 
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
 
                break;
 
        }
 
        else if ((fh = f.hash) == MOVED)
 
            tab = helpTransfer(tab, f);
 
        else if (onlyIfAbsent // 不加鎖,進行檢查
 
                 && fh == hash
 
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
 
                 && (fv = f.val) != null)
 
            return fv;
 
        else {
 
            V oldVal = null;
 
            synchronized (f) {
 
                   // 細粒度的同步修改操作...
 
                }
 
            }
 
            // Bin 超過閾值,進行樹化
 
            if (binCount != 0) {
 
                if (binCount >= TREEIFY_THRESHOLD)
 
                    treeifyBin(tab, i);
 
                if (oldVal != null)
 
                    return oldVal;
 
                break;
 
            }
 
        }
 
    }
 
    addCount(1L, binCount);
 
    return null;
 
}

初始化操作實現在 initTable 裡面,這是一個典型的 CAS 使用場景,利用 volatile 的 sizeCtl 作為互斥手段:如果發現競爭性的初始化,就 spin 在那裡,等待條件恢復;否則利用 CAS 設置排他標志。如果成功則進行初始化;否則重試。

private final Node<K,V>[] initTable() {
 
    Node<K,V>[] tab; int sc;
 
    while ((tab = table) == null || tab.length == 0) {
 
        // 如果發現沖突,進行 spin 等待
 
        if ((sc = sizeCtl) < 0)
 
            Thread.yield();
 
        // CAS 成功返回 true,則進入真正的初始化邏輯
 
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
 
            try {
 
                if ((tab = table) == null || tab.length == 0) {
 
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
 
                    @SuppressWarnings("unchecked")
 
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
 
                    table = tab = nt;
 
                    sc = n - (n >>> 2);
 
                }
 
            } finally {
 
                sizeCtl = sc;
 
            }
 
            break;
 
        }                                                          
 
    }
 
    return tab;
 
}

當 bin 為空時,同樣是沒有必要鎖定,也是以 CAS 操作去放置。

到此這篇關於Java源碼解析之ConcurrentHashMap的文章就介紹到這瞭,更多相關Java ConcurrentHashMap內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: