淺談Java源碼ConcurrentHashMap

一、記錄形式

打算直接把過程寫在源碼中,會按序進行註釋,查閱的時候可以按序號隻看註釋部分

二、ConcurrentHashMap

直接模擬該類的使用過程,從而一步步看其怎麼運作的吧,當然最好還是帶著問題一遍思考一遍總結會比較好,我閱讀源碼的時候帶著以下幾個問題

  • 並發體現在哪裡?怎麼保證線程安全的
  • 怎麼擴容的?擴容是怎麼保證線程安全的?
  • 怎麼put的?put是怎麼保證線程安全的?
  • 用瞭哪些鎖?這些鎖的作用是什麼?
  • 需要留意哪些關鍵點?

我們最簡單地使用方法是怎麼樣的?

  • new一個ConcurrentHashMap對象
  • 調用put方法放入k-v對
  • 調用get方法獲取k-v對

那麼很顯然,考慮隻有在進行修改與更新時需要考慮並發,所以我關註的重點在put方法與擴容上

首先new一個對象時,我們傳參入一個size,調用其隻有一個參數的構造器

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 1、判斷傳入的大小是否超過最大值的一半,若超過則取最大值
        // 2、若沒超過,則調用tableSizeFor
        // 3、tableSizeFor可以讓size為2的倍數,為什麼要是2的倍數呢?因為對hash取模的時候
        // 用的是位運算,隻有size為2的倍數才能這麼做
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
// 1、判斷傳入的大小是否超過最大值的一半,若超過則取最大值
// 2、若沒超過,則調用tableSizeFor
// 3、tableSizeFor讓size為2的倍數,為什麼要是2的倍數呢?因為對hash取模的時候
// 用的是位運算,隻有size為2的倍數才能這麼做

註意,此時並沒有創建map數據結構,所以ConcurrentHashMap是懶惰創建的

接著我們調用put方法放入數據,put方法調的putVal

final V putVal(K key, V value, boolean onlyIfAbsent) {
   		// 1、k-v都不能為空,不然拋異常
        if (key == null || value == null) throw new NullPointerException();
        // 2、獲取key的hashcode的hash值
        int hash = spread(key.hashCode());
        // 3、使用binCount來統計鏈表有多少個元素的,便於後面判斷是否需要變成紅黑樹
        int binCount = 0;
        // 4、創建tab臨時變量,並賦值為table,由於還沒初始化,值為null
        // 這裡註意瞭,table的類型是Node數組,這個Node其實就是Map.Entry<K,V>
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 5、判斷tab為空後才進行初始化,初始化完成後重新進入循環
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 6、此時已經初始化好瞭,可以開始插入數據。
            // 這裡用tabAt(tab,i)獲取tab的第i個下標上的鏈表指針
            // 註意瞭,(n-1)& hash其實就是hash%n,隻有n為2的次方才能這麼做
            // 位運算可以提升效率
            // 7、f就是獲取到的第i個位置的鏈表頭指針
            // 如果為null說明什麼都沒有,可以嘗試插入元素
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            	// 8、考慮插入那就要考慮並發瞭,casTab表示用cas方法進行插入
            	// 插入一個新的Node結點,這個是能夠保證線程安全的
            	// 我們知道保證線程安全除瞭用cas之外還不夠,還需要保證操作對象的可見性
            	// 在這裡是對tab進行操作,tab在前面用table賦值,而table是加瞭volatile的
            	// 所以沒毛病哈
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 9、如果f不為空,並且f.hash==MOVED(-1),說明當前這個位置正在被移動
            // 說明有線程在擴容,那麼當然不能這時候還去插入瞭,這裡調用helpTransfer去幫助擴容
            // 註意瞭,這意味著擴容時是一個一個位置來移動的,每移動一個就將f.hash改成MOVED
            // 也就意味著如果當前線程想要操作的位置還沒有被移動時是可以操作的,這使得並發度更高瞭
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 10、如果f.hash表示沒有被移動,且f不為null說明可以這個位置已經有東西瞭
            // 需要對其遍歷,並找到合適的位置插入
            else {
                V oldVal = null;
                // 11、由於要進行插入瞭,所以得加鎖,註意瞭哈,這裡用的synchronized
                // 並且鎖住的是當前位置對象(不一定是鏈表也可能是樹)
                // 意味著除瞭當前線程,其他線程都不準操作瞭哈
                // 如果這時候正在擴容的線程擴到這裡瞭,會被阻塞的哈
                synchronized (f) {
                	// 確定f為起點
                    if (tabAt(tab, i) == f) {
                    	// 12、判斷f.hash是大於0,大於0表示當前這個東西是鏈表
                    	// 下面執行鏈表的更新操作
                        if (fh >= 0) {
                            binCount = 1;
                            // 13、接著就是具體的遍歷鏈表,查找是否對應值是否存在
                            // 如果遍歷完都找不到,那麼就在尾部插入新的結點
                            // 註意瞭哈這就是尾插
                            // 14、同時,每遍歷一個結點還要累加binCount
                            // 即統計一下當前鏈表個數,便於後面轉紅黑樹判斷
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 13、如果f對應的是樹結構,那就執行樹的更新方法
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 14、前面說瞭哈,binCount就是鏈表元素個數,接著就判斷是否大於閾值
                // 大於則轉樹,可以看這個閾值TREEIFY_THRESHOLD=8
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 15、addCount是讓size+1的
        // 這裡要註意,加法也是分多步的,需要先get才能+,因此為瞭保證線程安全也是需要用cas的
        // 這裡加size的方式並不是直接往size上加,因為size是經常修改的,如果經常訪問的話效率很低
        // 那麼做法和LongAdder這一原子累加器類似,用一個CountCell數組,每個線程隻操作數組中的
        // 某一個Cell,最後匯總即可
        addCount(1L, binCount);
        return null;
    }

總結一下put的過程

1.先判斷map是否創建,沒創建則先創建,結構為 Node<K,V>[ ] extend Map.Entry

2.接著找key應該放在哪個位置 i

3.判斷該位置是否為空,為空則使用CAS插入一個新的Node

4.不為空則判斷當前位置狀態是否為MOVED,是則說明當前位置正在被其他線程改動,當前線程需要幫助完成移動

5.不為空且不為MOVED,則判斷是鏈表還是樹,分別執行對應的更新方法

6.如果為鏈表

  • 先對鏈表上鎖,用syn
  • 則遍歷並查找是否已存在
  • 找到最後都不存在則直接尾插
  • 同時統計鏈表上的元素

7.判斷鏈表元素是否超過變成樹的閾值 8 ,超過則直接變成樹,變成樹也是加syn

8.使用addCount更新size,更新方式類似LongAdder

三、關鍵點

  • 懶惰加載map
  • 對當前位置操作之前需要判斷當前位置的存儲的內容是否被其他線程移動瞭,如果被移動則先去幫助完成移動

執行擴容移動的線程是挨個移動每個位置的鏈表,移動前會先將當前位置的狀態用CAS改成MOVED

註意瞭這個是提升並發度的關鍵所在

因為插入和移動(擴容)的粒度是細化到每個位置的鏈表上

意味著擴容和插入可以同時進行

意味著插入時上鎖後,擴容線程執行到該位置需要阻塞

而不是直接整個map都鎖住

  • 當前位置沒內容時,通過CAS插入新Node,而操作鏈表時用的是syn

如果面試問到ConcurrentHashMap中用瞭什麼鎖就心中有數瞭

  • 更新size的時候用的是LongAdder類似的方法

有一個CountCell數組,每個線程更新後,對數組中的某個Cell+1

如果沒有競爭則隻有一個baseCell,對其+1

統計size時匯總即可

再細化一下前面的流程
思考初始化map的時候怎麼保證線程安全?防止多個線程同時初始化?
答案在initTable方法中

  • 可以看到,sizeCtl如果是負數說明正在擴容或者初始化
  • 因此當需要初始化時,就去CAS地改變sizeCtl,將其變為負數
  • 哪個線程CAS成功,則可以執行初始化,這就保證瞭線程安全
  • 可以再去看看,sizeCtl是volatile修飾的哈
  • 並且SIZECTL是sizeCtl的offset,這些都是原子類類似的東西瞭
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

get方法就不說瞭,因為不涉及並發,就是查找而已
感覺差不多瞭,把put方法搞清楚瞭,ConcurrentHashMap怎麼解決線程安全的也清楚瞭,並且並發關鍵點在哪也清楚瞭

到此這篇關於淺談Java源碼ConcurrentHashMap的文章就介紹到這瞭,更多相關Java ConcurrentHashMap內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: