解析阿裡一面CyclicBarrier和CountDownLatch的區別

引言

前面一篇文章我們《Java線程並發工具類CountDownLatch原理及用法》它有一個缺點,就是它的計數器隻能夠使用一次,也就是說當計數器(state)減到為 0的時候,如果 再有線程調用去 await() 方法,該線程會直接通過,不會再起到等待其他線程執行結果起到同步的作用。為瞭解決這個問題CyclicBarrier就應運而生瞭。

什麼是CyclicBarrier

CyclicBarrier是什麼?把它拆開來翻譯就是循環(Cycle)和屏障(Barrier

它的主要作用其實和CountDownLanch差不多,都是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障會被打開,所有被屏障阻塞的線程才會繼續執行,不過它是可以循環執行的,這是它與CountDownLanch最大的不同。CountDownLanch是隻有當最後一個線程把計數器置為0的時候,其他阻塞的線程才會繼續執行。學習CyclicBarrier之前建議先去看看這幾篇文章:

《Java高並發編程基礎之AQS》

《Java高並發編程基礎三大利器之Semaphore》

《Java高並發編程基礎三大利器之CountDownLatch》

如何使用

我們首先先來看下關於使用CyclicBarrier的一個demo:比如遊戲中有個關卡的時候,每次進入下一關的時候都需要進行加載一些地圖、特效背景音樂什麼的隻有全部加載完瞭才能夠進行遊戲:

public class CyclicBarrierExample {
 static class PreTaskThread implements Runnable {
 private String task;
 private CyclicBarrier cyclicBarrier;

 public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
  this.task = task;
  this.cyclicBarrier = cyclicBarrier;
 }

 @Override
 public void run() {
  for (int i = 0; i < 4; i++) {
  Random random = new Random();
  try {
   Thread.sleep(random.nextInt(1000));
   System.out.println(String.format("關卡 %d 的任務 %s 完成", i, task));
   cyclicBarrier.await();
  } catch (InterruptedException | BrokenBarrierException e) {
   e.printStackTrace();
  }
  }
 }

 public static void main(String[] args) {
  CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
  System.out.println("本關卡所有的前置任務完成,開始遊戲... ...");
  });
  new Thread(new PreTaskThread("加載地圖數據", cyclicBarrier)).start();
  new Thread(new PreTaskThread("加載人物模型", cyclicBarrier)).start();
  new Thread(new PreTaskThread("加載背景音樂", cyclicBarrier)).start();
 }
 }
}

輸出結果如下:

我們可以看到每次遊戲開始都會等當前關卡把遊戲的人物模型,地圖數據、背景音樂加載完成後才會開始進行遊戲。並且還是可以循環控制的。

源碼分析

結構組成

 /** The lock for guarding barrier entry */
 private final ReentrantLock lock = new ReentrantLock();
 /** Condition to wait on until tripped */
 private final Condition trip = lock.newCondition();
 /** The number of parties */
 private final int parties;
 /* The command to run when tripped */
 private final Runnable barrierCommand;
 /** The current generation */
 private Generation generation = new Generation();
  • lock:用於保護屏障入口的鎖
  • trip :達到屏障並且不能放行的線程在trip條件變量上等待
  • parties :柵欄開啟需要的到達線程總數barrierCommand:最後一個線程到達屏障後執行的回調任務
  • generation:這是一個內部類,通過它實現CyclicBarrier重復利用,每當await達到最大次數的時候,就會重新new 一個,表示進入瞭下一個輪回。裡面隻有一個boolean型屬性,用來表示當前輪回是否有線程中斷。

主要方法

await方法

 public int await() throws InterruptedException, BrokenBarrierException {
 try {
  return dowait(false, 0L);
 } catch (TimeoutException toe) {
  throw new Error(toe); // cannot happen
 }
 }
 /**
 * Main barrier code, covering the various policies.
 */
 private int dowait(boolean timed, long nanos)
 throws InterruptedException, BrokenBarrierException,
  TimeoutException {
 final ReentrantLock lock = this.lock;
 lock.lock();
  try {
  //獲取barrier當前的 “代”也就是當前循環
  final Generation g = generation;
  if (g.broken)
  throw new BrokenBarrierException();

  if (Thread.interrupted()) {
  breakBarrier();
  throw new InterruptedException();
  }
  // 每來一個線程調用await方法都會進行減1
  int index = --count;
  if (index == 0) { // tripped
  boolean ranAction = false;
  try {
   final Runnable command = barrierCommand;
   // new CyclicBarrier 傳入 的barrierCommand, command.run()這個方法是同步的,如果耗時比較多的話,是否執行的時候需要考慮下是否異步來執行。
   if (command != null)
   command.run();
   ranAction = true;
   // 這個方法1. 喚醒所有阻塞的線程,2. 重置下count(count 每來一個線程都會進行減1)和generation,以便於下次循環。
   nextGeneration();
   return 0;
  } finally {
   if (!ranAction)
   breakBarrier();
  }
  }

  // loop until tripped, broken, interrupted, or timed out
  for (;;) {
  try {
   // 進入if條件,說明是不帶超時的await
   if (!timed)
    // 當前線程會釋放掉lock,然後進入到trip條件隊列的尾部,然後掛起自己,等待被喚醒。
   trip.await();
   else if (nanos > 0L)
    //說明當前線程調用await方法時 是指定瞭 超時時間的!
   nanos = trip.awaitNanos(nanos);
  } catch (InterruptedException ie) {
   //Node節點在 條件隊列內 時 收到中斷信號時 會拋出中斷異常!
   //g == generation 成立,說明當前代並沒有變化。
   //! g.broken 當前代如果沒有被打破,那麼當前線程就去打破,並且拋出異常..
   if (g == generation && ! g.broken) {
   breakBarrier();
   throw ie;
   } else {
   // We're about to finish waiting even if we had not
   // been interrupted, so this interrupt is deemed to
   // "belong" to subsequent execution.
   //執行到else有幾種情況?
   //1.代發生瞭變化,這個時候就不需要拋出中斷異常瞭,因為 代已經更新瞭,這裡喚醒後就走正常邏輯瞭..隻不過設置下 中斷標記。
   //2.代沒有發生變化,但是代被打破瞭,此時也不用返回中斷異常,執行到下面的時候會拋出 brokenBarrier異常。也記錄下中斷標記位。
   Thread.currentThread().interrupt();
   }
  }
  //喚醒後,執行到這裡,有幾種情況?
  //1.正常情況,當前barrier開啟瞭新的一代(trip.signalAll())
  //2.當前Generation被打破,此時也會喚醒所有在trip上掛起的線程
  //3.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
  if (g.broken)
   throw new BrokenBarrierException();
  //喚醒後,執行到這裡,有幾種情況?
  //1.正常情況,當前barrier開啟瞭新的一代(trip.signalAll())
  //2.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
  if (g != generation)
   return index;
  //喚醒後,執行到這裡,有幾種情況?
  //.當前線程trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
  if (timed && nanos <= 0L) {
   breakBarrier();
   throw new TimeoutException();
  }
  }
 } finally {
  lock.unlock();
 }
 }

小結

到瞭這裡我們是不是可以知道為啥CyclicBarrier可以進行循環計數?
CyclicBarrier采用一個內部類Generation來維護當前循環,每一個await方法都會存儲當前的generation,獲取到相同generation對象的屬於同一組,每當count的次數耗盡就會重新new一個Generation並且重新設置count的值為parties,表示進入下一次新的循環。
從這個await方法我們是不是可以知道隻要有一個線程被中斷瞭,當代的 generationbroken 就會被設置為true,所以會導致其他的線程也會被拋出BrokenBarrierException。相當於一個失敗其他也必須失敗,感覺有“強一致性“的味道。

總結

CountDownLanch是為計數器是設置一個值,當多次執行countdown後,計數器減為0的時候所有線程被喚醒,然後CountDownLanch失效,隻能夠使用一次。

CyclicBarrier是當count0時同樣喚醒全部線程,同時會重新設置countparties,重新new一個generation來實現重復利用。

結束

  • 由於自己才疏學淺,難免會有紕漏,假如你發現瞭錯誤的地方,還望留言給我指出來,我會對其加以修正。
  • 如果你覺得文章還不錯,你的轉發、分享、贊賞、點贊、留言就是對我最大的鼓勵。感
  • 謝您的閱讀,十分歡迎並感謝您的關註。

巨人的肩膀摘蘋果

https://javajr.cn/
http://www.360doc.com/content/20/0812/08/55930996_929792021.shtml
https://www.cnblogs.com/xxyyy/p/12958160.html

到此這篇關於阿裡一面CyclicBarrier和CountDownLatch的區別是啥的文章就介紹到這瞭,更多相關CyclicBarrier和CountDownLatch的區別內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: