Java並發編程之CountDownLatch源碼解析
一、前言
CountDownLatch維護瞭一個計數器(還是是state字段),調用countDown方法會將計數器減1,調用await方法會阻塞線程直到計數器變為0。可以用於實現一個線程等待所有子線程任務完成之後再繼續執行的邏輯,也可以實現類似簡易CyclicBarrier的功能,達到讓多個線程等待同時開始執行某一段邏輯目的。
二、使用
- 一個線程等待其它線程執行完再繼續執行
...... CountDownLatch cdl = new CountDownLatch(10); ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { es.execute(() -> { //do something cdl.countDown(); }); } cdl.await(); ......
- 實現類似CyclicBarrier的功能,先await,再countDown
...... CountDownLatch cdl = new CountDownLatch(1); ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { es.execute(() -> { cdl.await(); //do something }); } Thread.sleep(10000L); cdl.countDown(); ......
三、源碼分析
CountDownLatch的結構和ReentrantLock、Semaphore的結構類似,也是使用的內部類Sync繼承AQS的方式,並且重寫瞭tryAcquireShared和tryReleaseShared方法。
還是首先來看構造函數:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
需要傳入一個大於0的count,代表CountDownLatch計數器的初始值,通過Sync的構造函數最終賦值給父類AQS的state字段。可一個看到這個state字段用法多多,在ReentrantLock中使用0和1來標識鎖的狀態,Semaphore中用來標識信號量,此處又用來表示計數器。
CountDownLatch要通過await方法完成阻塞,先來看看這個方法是如何實現的:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
調用的是sync的acquireSharedInterruptibly方法,該方法定義在AQS中,Semaphore也調用的這個方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
這個方法的邏輯前面在解析SemaPhore的時候細說過瞭,這裡不再贅述,主要就是兩個方法的調用,先通過tryAcquireShared方法嘗試獲取”許可”,返回值代表此次獲取後的剩餘量,如果大於等於0表示獲取成功,否則表示失敗。如果失敗,那麼就會進入doAcquireSharedInterruptibly方法執行入隊阻塞的邏輯。這裡我們主要到CountDownLatch中看看tryAcquireShared方法的實現:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
和Semaphore的實現中每次將state減去requires不同,這裡直接判斷state是否為0,如果為0那麼返回1,表示獲取”許可”成功;如果不為0,表示失敗,則需要入隊阻塞。從這個tryAcquireShared方法就能看出CountDownLatch的邏輯瞭:等到state變為瞭0,那麼所有線程都能獲取運行許可。
那麼我們接下來來到countDown方法:
public void countDown() { sync.releaseShared(1); }
調用的是sync的releaseShared方法,該方法定義在父類AQS中,Semaphore使用的也是這個方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //當state從非 doReleaseShared(); return true; } return false; }
前面提到瞭CountDownLatch也重寫瞭tryReleaseShared方法:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //如果state等於0瞭直接返回false //保證在並發情況下,最多隻會有一個線程返回true //也包括調用countDown的次數超過state的初始值 return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //如果返回true,表示state從非0變為瞭0 //那麼後續需要喚醒阻塞線程 return nextc == 0; } }
Semaphore在釋放信號量的時候,是將獲取的許可歸還到state中,但是CountDownLatch沒有獲取許可的邏輯(獲取許可的時候是判斷state是否等於0),所以在countDown的時候沒有釋放的邏輯,就是將state減1,然後根據state減1之後的值是否為0判斷release是否成功,如果state本來大於0,經過減1之後變為瞭0,那麼返回true。tryReleaseShared方法的返回值決定瞭後續需不需要調用doReleaseShared方法喚醒阻塞線程。
這裡有個邏輯:如果state已經為0,那麼返回false。這個主要應對兩種情況:
- 調用countDown的次數超過瞭state的初始值多
- 線程並發調用的時候保證隻有一個線程去完成阻塞線程的喚醒操作
可以看到CountDownLatch沒有鎖的概念,countDown方法可以被一個線程重復調用,隻需要對state做reduce操作,而不用關心是誰做的reduce。如果tryReleaseShared返回true,那麼表示需要在後面進入doReleaseShared方法,該方法和Semaphore中調用的方法是同一個,主要是喚醒阻塞線程或者設置PROPAGAGE狀態,這裡也不再贅述~
阻塞線程被喚醒之後,會在doAcquireSharedInterruptibly方法中繼續循環,雖然和Semaphore調用的是同樣的方法,但是這裡有不一樣的地方,所以還是提一句。我們首先回到doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //如果head.next被unpark喚醒,說明此時state==0 //那麼tryAcquireShared會返回1 int r = tryAcquireShared(arg); //r==1 if (r >= 0) { //node節點被喚醒後,還會繼續喚醒node.next //這樣依次傳遞,因為在這裡的r一定為1 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
當head.next線程被unpark喚醒後,會進入tryAcquireShared方法判斷,由於此時state已經為0(隻有當state變為0時,才會unpark喚醒線程),而前面提到瞭在CountDownLatch重寫的tryAcquireShared中,如果state==0,那麼會返回1,所以會進入setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
該方法在Semaphore中詳細介紹過,這裡我們就站在CountDownLatch的角度來看看。其實很簡單瞭,註意此時該方法的propagate參數值是1,那麼就會進入到下面的if邏輯裡,繼續喚醒下一個node。當下一個node對應的線程被喚醒後,同樣會進入setHeadAndPropagate方法,propagage同樣為1,那麼繼續喚醒下一個node,就這樣依次將整個CLH隊列的節點都喚醒。
四、總結
如果單獨把CountDownLatch拿出來看其實是很復雜的,隻是CountDownLatch(包括Semaphore和ReentrantLock)都高度共用瞭AQS提供的一些方法,而這些方法在前面介紹Semaphore和ReentrantLock的時候已經詳細分析過,所以到本文分析CountDownLatch的時候,隻需要關註它內部類Sync重寫的兩個方法:tryAcquireShared和tryReleaseShared,也就是”獲取許可”和”釋放許可”的邏輯。
CountDownLatch在await的邏輯裡,如果當前state的值大於0,那麼會進入CLH隊列進行阻塞等待unpark喚醒(或者中斷喚醒);在countDown的邏輯裡,就是簡單的將state-1,如果一個線程把state從1減為0,那麼該線程就會負責喚醒head.next節點,head.next節點被喚醒後,又會在setHeadAndPropagate方法中喚醒next.next節點,這樣依次喚醒所有CLH隊列中的阻塞節點。當然,如果線程被中斷喚醒,那麼也會進入cancelAcquire中進行無效節點的移除邏輯。
到此這篇關於Java並發編程之CountDownLatch源碼解析的文章就介紹到這瞭,更多相關Java中CountDownLatch源碼解析內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- None Found