解析阿裡一面CyclicBarrier和CountDownLatch的區別
引言
前面一篇文章我們《Java線程並發工具類CountDownLatch原理及用法》它有一個缺點,就是它的計數器隻能夠使用一次,也就是說當計數器(state
)減到為 0
的時候,如果 再有線程調用去 await
() 方法,該線程會直接通過,不會再起到等待其他線程執行結果起到同步的作用。為瞭解決這個問題CyclicBarrier
就應運而生瞭。
什麼是CyclicBarrier
CyclicBarrier
是什麼?把它拆開來翻譯就是循環(Cycle
)和屏障(Barrier
)
它的主要作用其實和CountDownLanch
差不多,都是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障會被打開,所有被屏障阻塞的線程才會繼續執行,不過它是可以循環執行的,這是它與CountDownLanch
最大的不同。CountDownLanch
是隻有當最後一個線程把計數器置為0
的時候,其他阻塞的線程才會繼續執行。學習CyclicBarrier
之前建議先去看看這幾篇文章:
《Java高並發編程基礎之AQS》
《Java高並發編程基礎三大利器之Semaphore》
《Java高並發編程基礎三大利器之CountDownLatch》
如何使用
我們首先先來看下關於使用CyclicBarrier
的一個demo
:比如遊戲中有個關卡的時候,每次進入下一關的時候都需要進行加載一些地圖、特效背景音樂什麼的隻有全部加載完瞭才能夠進行遊戲:
public class CyclicBarrierExample { static class PreTaskThread implements Runnable { private String task; private CyclicBarrier cyclicBarrier; public PreTaskThread(String task, CyclicBarrier cyclicBarrier) { this.task = task; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { for (int i = 0; i < 4; i++) { Random random = new Random(); try { Thread.sleep(random.nextInt(1000)); System.out.println(String.format("關卡 %d 的任務 %s 完成", i, task)); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("本關卡所有的前置任務完成,開始遊戲... ..."); }); new Thread(new PreTaskThread("加載地圖數據", cyclicBarrier)).start(); new Thread(new PreTaskThread("加載人物模型", cyclicBarrier)).start(); new Thread(new PreTaskThread("加載背景音樂", cyclicBarrier)).start(); } } }
輸出結果如下:
我們可以看到每次遊戲開始都會等當前關卡把遊戲的人物模型,地圖數據、背景音樂加載完成後才會開始進行遊戲。並且還是可以循環控制的。
源碼分析
結構組成
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();
- lock:用於保護屏障入口的鎖
- trip :達到屏障並且不能放行的線程在trip條件變量上等待
- parties :柵欄開啟需要的到達線程總數barrierCommand:最後一個線程到達屏障後執行的回調任務
- generation:這是一個內部類,通過它實現
CyclicBarrier
重復利用,每當await
達到最大次數的時候,就會重新new
一個,表示進入瞭下一個輪回。裡面隻有一個boolean
型屬性,用來表示當前輪回是否有線程中斷。
主要方法
await
方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //獲取barrier當前的 “代”也就是當前循環 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 每來一個線程調用await方法都會進行減1 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // new CyclicBarrier 傳入 的barrierCommand, command.run()這個方法是同步的,如果耗時比較多的話,是否執行的時候需要考慮下是否異步來執行。 if (command != null) command.run(); ranAction = true; // 這個方法1. 喚醒所有阻塞的線程,2. 重置下count(count 每來一個線程都會進行減1)和generation,以便於下次循環。 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 進入if條件,說明是不帶超時的await if (!timed) // 當前線程會釋放掉lock,然後進入到trip條件隊列的尾部,然後掛起自己,等待被喚醒。 trip.await(); else if (nanos > 0L) //說明當前線程調用await方法時 是指定瞭 超時時間的! nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //Node節點在 條件隊列內 時 收到中斷信號時 會拋出中斷異常! //g == generation 成立,說明當前代並沒有變化。 //! g.broken 當前代如果沒有被打破,那麼當前線程就去打破,並且拋出異常.. if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. //執行到else有幾種情況? //1.代發生瞭變化,這個時候就不需要拋出中斷異常瞭,因為 代已經更新瞭,這裡喚醒後就走正常邏輯瞭..隻不過設置下 中斷標記。 //2.代沒有發生變化,但是代被打破瞭,此時也不用返回中斷異常,執行到下面的時候會拋出 brokenBarrier異常。也記錄下中斷標記位。 Thread.currentThread().interrupt(); } } //喚醒後,執行到這裡,有幾種情況? //1.正常情況,當前barrier開啟瞭新的一代(trip.signalAll()) //2.當前Generation被打破,此時也會喚醒所有在trip上掛起的線程 //3.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。 if (g.broken) throw new BrokenBarrierException(); //喚醒後,執行到這裡,有幾種情況? //1.正常情況,當前barrier開啟瞭新的一代(trip.signalAll()) //2.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。 if (g != generation) return index; //喚醒後,執行到這裡,有幾種情況? //.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
小結
到瞭這裡我們是不是可以知道為啥CyclicBarrier
可以進行循環計數?
CyclicBarrier
采用一個內部類Generation
來維護當前循環,每一個await
方法都會存儲當前的generation
,獲取到相同generation
對象的屬於同一組,每當count
的次數耗盡就會重新new
一個Generation
並且重新設置count
的值為parties
,表示進入下一次新的循環。
從這個await
方法我們是不是可以知道隻要有一個線程被中斷瞭,當代的 generation
的broken
就會被設置為true
,所以會導致其他的線程也會被拋出BrokenBarrierException
。相當於一個失敗其他也必須失敗,感覺有“強一致性“的味道。
總結
CountDownLanch
是為計數器是設置一個值,當多次執行countdown
後,計數器減為0
的時候所有線程被喚醒,然後CountDownLanch
失效,隻能夠使用一次。
CyclicBarrier
是當count
為0
時同樣喚醒全部線程,同時會重新設置count
為parties
,重新new
一個generation
來實現重復利用。
結束
- 由於自己才疏學淺,難免會有紕漏,假如你發現瞭錯誤的地方,還望留言給我指出來,我會對其加以修正。
- 如果你覺得文章還不錯,你的轉發、分享、贊賞、點贊、留言就是對我最大的鼓勵。感
- 謝您的閱讀,十分歡迎並感謝您的關註。
巨人的肩膀摘蘋果
https://javajr.cn/
http://www.360doc.com/content/20/0812/08/55930996_929792021.shtml
https://www.cnblogs.com/xxyyy/p/12958160.html
到此這篇關於阿裡一面CyclicBarrier和CountDownLatch的區別是啥的文章就介紹到這瞭,更多相關CyclicBarrier和CountDownLatch的區別內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Java多線程同步工具類CyclicBarrier的使用
- JDK源碼之線程並發協調神器CountDownLatch和CyclicBarrier詳解
- java多線程之並發工具類CountDownLatch,CyclicBarrier和Semaphore
- Java中CyclicBarrier 循環屏障
- Java並發編程之詳解CyclicBarrier線程同步