JDK源碼之線程並發協調神器CountDownLatch和CyclicBarrier詳解
引言
那麼在程序的世界中是如何對這種協調關系進行描述的呢?今天就和大傢聊聊Java大神Doug Lea在並發包中如何通過CountDownLatch和CyclicBarrier實現任務協調的代碼描述。
CountDownLatch
我相信大傢都知道好代碼的一個重要特性就是代碼中類、變量等的命名可以做到顧名思義,也就是說看到命名就可以大概知道這個類或者變量表達瞭怎樣的業務語義。就拿 CountDownLatch 來說,它的命名形象的表示瞭其能力屬性,Count代表著計數,Down代表著計數器的遞減操作,而Latch表示計數器遞減後的結果動作。CountDownLatch結合起來的字面意思就是計數器遞減後打開門栓,通過後面內容的描述,回過頭來看大傢肯定會覺得這個命名十分之形象。
好瞭通過它的類的名稱,我們猜測瞭它的功能是通過計數器的遞減操作來控制線程,那我們再看看官方描述是不是這個意思。
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* — the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
*…
*/
上面註釋的大致意思就是CountDownLatch是一個線程同步器,它允許一個或者多個線程阻塞等待直到其他線程中業務執行完成。CountDownLatch可以通過一個計數器進行初始化,他可以讓那個等待的線程被阻塞,直到對應的計數器被置為0。當計數器置為0後,阻塞的線程被釋放。另外它是一個一次性使用的同步器,計數器無法被重置。
通過JDK的官方描述我們可以明確CountDownLatch三個核心特征:
1、它是一種線程同步器,用以協調線程的執行觸發時機;
2、它本質是一個計數器,是控制線程的號令槍;
3、它是一次性使用的,用完即失效。
知道瞭CountDownLatch是一個什麼東東之後,我們再一起來看下它的使用場景是什麼,我們在什麼樣的情況下可以使用它幫我們解決一些代碼中的問題。
使用場景
就像上文描述的,CountDownLatch就像是田徑賽場上裁判員發射的發令槍,所有參賽的選手準備就緒後,發令槍一響,所有運動員聞聲而動。那麼在Java多線程場景中,CountDownLatch就是線程協調者,它的計數器在沒有減為0之前。假設有這樣一個業務場景,在一個監控告警平臺中,需要從告警服務中查詢告警信息以及從工單服務中查詢工單信息,然後再分析哪些告警沒有轉工單。按照老系統的做法,參見如下簡化後的偽代碼:
List<Alarm> alarmList = alarmService.getAlarm(); List<WorkOrder> workOrderList = workOrderService.getWorkOrder(); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
大傢能看出來這段偽代碼有什麼需要進行優化的地方嗎?我們來一起分析一下。這段代碼在數據量不大的時候可能沒什麼影響,但是一旦告警以及工單的數據量大的時候,獲取告警信息或者獲取工單信息都可能出現數據查詢慢的問題,那就會導致這個分析任務就會出現性能瓶頸的問題。那麼我們應該怎麼進行優化呢?從業務以及代碼我們可以看的出來,獲取告警信息以及獲取工單信息,實際上並沒有業務上面的耦合性,在上述代碼中他們是順序執行的,因此要進行性能優化,可以考慮將它們進行並行執行。
那麼修改優化後的偽代碼如下所示:
Executor executor = Executors.newFixedThreadPool(2); executor.execute(()-> { alarmList = alarmService.getAlarm(); }); executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); }); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
我們通過使用線程池的方式,在獲取告警信息以及工單信息的時候並發執行,不再像之前的執行完獲取告警信息再執行獲取工單信息,這樣效率更高。但是這樣的實現方式還是存在問題,由於在線的線程中執行操作,並不知道其實際的執行結果,這就不好判斷執行數據分析的具體時機。這個時候CountDownLatch就派上用場瞭,利用它可以實現線程揀的等待,條件滿足後再放開執行後續的邏輯。這就好比公司組織團建,約定好瞭早上8點半在公司大門集合,那麼司機師傅肯定要等到所有參加團建的同時都到齊後才會發車。
使用CountDownLatch之後的偽代碼如下所示:
Executor executor = Executors.newFixedThreadPool(2); CountDownLatch latch = new CountDownLatch(2); executor.execute(()-> { alarmList = alarmService.getAlarm(); latch.countDown(); }); executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); latch.countDown(); }); latch.await(); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
底層實現原理
初始化
在使用CountDownLatch之前我們得先進行初始化,在初始化的過程中實際做瞭兩件事情,一個是創建瞭一個AQS的同步隊列,另外一個是將AQS中的state設置成瞭count,這個state是AQS的核心變量(AQS是並發包的底層實現基礎,關於它的分析我們放到下一篇文章中進行)。
從代碼中我們可以看的出來實際創建瞭Sync內部類實例,而Sync繼承瞭AQS,同時重寫瞭AQS加鎖解鎖的方法,並通過Sync的對象,調用AQS的方法,阻塞線程的運行。Sync內部類的代碼如下所示,其中tryAcquireShared方法重寫瞭AQS的模板方法,主要用來獲取共享鎖,在CountDownLatch內部主要通過判斷獲取到的state的值是否為0來決定到底有沒有獲取到鎖。如果獲取到的state為0,則表示獲取鎖成功,此時線程不會阻塞,反之則獲取鎖失敗,線程會阻塞。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } //嘗試加共享鎖(通過state判斷) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //嘗試釋放共享鎖(通過state判斷) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
計數器遞減
如上文場景中介紹的代碼,每個線程在執行完成自身業務後執行countDown操作,表示該線程已經準備完成。同時檢查count值是否為0。如果為0則需要喚醒所有等待的線程。如下代碼所示,實際上它調用的是父類AQS的releaseShared方法。
public void countDown() { sync.releaseShared(1); }
tryReleaseShared這個方法實際是進行嘗試釋放鎖的操作,如果此次count遞減為0,然後釋放所有的線程。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
大致的代碼執行邏輯可參見下圖:
阻塞線程
await的作用就是將當前線程阻塞住,直到count值減為0才會放開執行。它實際調用瞭內部類的tryAcquireSharedNanos方法,這個方法實際是Sync類的父類AQS中的方法。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
AQS提供瞭可以響應中斷的獲取公平鎖的實現的方式。tryAcquireShared在上文已經進行瞭介紹,該方法的作用是嘗試獲取共享鎖,如果獲取失敗,則線程將會被加入到AQS的同步隊列中進行等待,也就是所謂的線程阻塞。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
CyclicBarrier
我們還是從CyclicBarrier的字面意思來先進行理解,Cyclic是循環的意思而Barrier則表示柵欄、障礙的意思,字面的意思就是可循環的柵欄。還是老套路,在進行CyclicBarrier之前,我們先來看下JDK是怎麼描述的。
/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* <em>cyclic</em> because it can be re-used after the waiting threads
* are released.
*
* <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This <em>barrier action</em> is useful
* for updating shared-state before any of the parties continue.
*…
**/
通過JDK的描述,我們可以看得出來,CyclicBarrier也是一個線程同步協調器,用以協調一組進程的執行。當指定個數的線程到達柵欄後,可以放開柵欄,結束線程阻塞狀態。這麼看上去它和CountDownLatch作用差不多瞭,實際上還是有區別的,CyclicBarrier是可循環使用的,而CountDownLatch卻是一次性的。我們來看下CyclicBarrier的核心屬性。
//柵欄入口的鎖 private final ReentrantLock lock = new ReentrantLock(); //線程等待條件 private final Condition trip = lock.newCondition(); //攔截的線程數量 private final int parties; //在下一個柵欄代數到來前執行的任務 private final Runnable barrierCommand; //當前的柵欄代數 private Generation generation = new Generation();
CyclicBarrier 的源碼實現和 CountDownLatch 大同小異,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現的。
CyclicBarrier內部維護瞭parties和count變量,parties表示每次參與到一個Generation中需要被攔截的線程數量,而count是內部計數器,在初始化的時候count與parties相等,當每次調用await方法的時候計數器count就會減1,這和上文中的countDown類似。
使用場景
還是以上文中的業務場景為例我們再分析一下,上文中我們通過CountDownLatch實現瞭查詢告警信息與查詢工單信息的線程協調問題,但是新的問題又出現瞭。因為告警信息和工單信息都是實時在產生的,而使用CountDownLatch的實現方式隻能完成一次的線程協調,後續產生的告警信息以及工單信息如果還有需要查詢到之後再進行數據分析的話,它就愛莫能助瞭。也就是說,如果需要進行持續的線程之間的互相等待完成之後再執行後續的業務操作的話,這個時候就需要使用CyclicBarrier 來實現我們的需求瞭。
底層實現原理
初始化
CyclicBarrier 存在兩種的構造函數,一種是構建CyclicBarrier 的時候指定每次需要進行協調的線程個數以及解除阻塞之後需要進行後續任務的執行,另一種隻是設置需要協調的線程個數不設置後續執行的任務。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
阻塞等待
對於CyclicBarrier 來說,其最核心的等待方法實現就是dowait方法,具體代碼如下所示:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //如果count計算為0,則需要喚醒所有線程並進入到下一階段的線程協調期 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //計數器不為0,繼續進行循環 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
我們可以看到在dowait方法中進行瞭count的遞減操作,檢查count的值是否為0,如果在初始化的時候定義好瞭要執行的任務,那麼在count為0的時候就進行任務執行,任務執行完成之後調用nextGeneration進行下一次的線程協調周期,同時喚醒所有線程並重置計數器。
總結
本文分別從使用場景以及底層實現的角度分別介紹瞭線程同步協調神器CountDownLatch和CyclicBarrier,雖然它們都可以起到協調線程的作用但是實際上它們還是有區別的。CountDownLatch比較適合一個線程與其他多個線程之間的同步協調場景,而CyclicBarrier則適合一組線程之間的互相等待。另外CountDownLatch是一次性產品,而CyclicBarrier的計數器是可以重復使用的,可以進行自動重置計數器。
到此這篇關於JDK源碼之線程並發協調神器CountDownLatch和CyclicBarrier詳解的文章就介紹到這瞭,更多相關Java 線程並發協調內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 解析阿裡一面CyclicBarrier和CountDownLatch的區別
- Java多線程同步工具類CyclicBarrier的使用
- Java中CyclicBarrier和CountDownLatch的用法與區別
- java多線程之並發工具類CountDownLatch,CyclicBarrier和Semaphore
- Java CountDownLatch的源碼硬核解析