Java高並發BlockingQueue重要的實現類詳解

ArrayBlockingQueue

有界的阻塞隊列,內部是一個數組,有邊界的意思是:容量是有限的,必須進行初始化,指定它的容量大小,以先進先出的方式存儲數據,最新插入的在對尾,最先移除的對象在頭部。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 /** 隊列元素 */
 final Object[] items;

 /** 下一次讀取操作的位置, poll, peek or remove */
 int takeIndex;

 /** 下一次寫入操作的位置, offer, or add */
 int putIndex;

 /** 元素數量 */
 int count;
 
 /*
  * Concurrency control uses the classic two-condition algorithm
  * found in any textbook.
  * 它采用一個 ReentrantLock 和相應的兩個 Condition 來實現。
  */

 /** Main lock guarding all access */
 final ReentrantLock lock;

 /** Condition for waiting takes */
 private final Condition notEmpty;

 /** Condition for waiting puts */
 private final Condition notFull;
 
 /** 指定大小 */
 public ArrayBlockingQueue(int capacity) {
  this(capacity, false);
 }
 
 /** 
  * 指定容量大小與指定訪問策略 
  * @param fair 指定獨占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖;
  */
 public ArrayBlockingQueue(int capacity, boolean fair) {}
 
 /** 
  * 指定容量大小、指定訪問策略與最初包含給定集合中的元素 
  * @param c 將此集合中的元素在構造方法期間就先添加到隊列中 
  */
 public ArrayBlockingQueue(int capacity, boolean fair,
        Collection<? extends E> c) {}
}

  • ArrayBlockingQueue 在生產者放入數據和消費者獲取數據,都是共用一個鎖對象,由此也意味著兩者無法真正並行運行。按照實現原理來分析, ArrayBlockingQueue 完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。然而事實上並沒有如此,因為 ArrayBlockingQueue 的數據寫入已經足夠輕巧,以至於引入獨立的鎖機制,除瞭給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。
  • 通過構造函數得知,參數 fair 控制對象內部是否采用公平鎖,默認采用非公平鎖。
  • items、takeIndex、putIndex、count 等屬性並沒有使用 volatile 修飾,這是因為訪問這些變量(通過方法獲取)使用都在鎖內,並不存在可見性問題,如 size() 。
  • 另外有個獨占鎖 lock 用來對出入對操作加鎖,這導致同時隻有一個線程可以訪問入隊出隊。

Put 源碼分析

/** 進行入隊操作 */
public void put(E e) throws InterruptedException {
  //e為null,則拋出NullPointerException異常
  checkNotNull(e);
  //獲取獨占鎖
  final ReentrantLock lock = this.lock;
  /**
   * lockInterruptibly()
   * 獲取鎖定,除非當前線程為interrupted
   * 如果鎖沒有被另一個線程占用並且立即返回,則將鎖定計數設置為1。
   * 如果當前線程已經保存此鎖,則保持計數將遞增1,該方法立即返回。
   * 如果鎖被另一個線程保持,則當前線程將被禁用以進行線程調度,並且處於休眠狀態
   * 
   */
  lock.lockInterruptibly();
  try {
   //空隊列
   while (count == items.length)
    //進行條件等待處理
    notFull.await();
   //入隊操作
   enqueue(e);
  } finally {
   //釋放鎖
   lock.unlock();
  }
 }
 
 /** 真正的入隊 */
 private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  //獲取當前元素
  final Object[] items = this.items;
  //按下一個插入索引進行元素添加
  items[putIndex] = x;
  // 計算下一個元素應該存放的下標,可以理解為循環隊列
  if (++putIndex == items.length)
   putIndex = 0;
  count++;
  //喚起消費者
  notEmpty.signal();
}

這裡由於在操作共享變量前加瞭鎖,所以不存在內存不可見問題,加鎖後獲取的共享變量都是從主內存中獲取的,而不是在CPU緩存或者寄存器裡面的值,釋放鎖後修改的共享變量值會刷新到主內存。

另外這個隊列使用循環數組實現,所以在計算下一個元素存放下標時候有些特殊。另外 insert 後調用 notEmpty.signal() ;是為瞭激活調用 notEmpty.await(); 阻塞後放入 notEmpty 條件隊列的線程。

Take 源碼分析

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
   while (count == 0)
    notEmpty.await();
   return dequeue();
  } finally {
   lock.unlock();
  }
 }
 private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
   takeIndex = 0;
  count--;
  //這裡有些特殊
  if (itrs != null)
   //保持隊列中的元素和迭代器的元素一致
   itrs.elementDequeued();
  notFull.signal();
  return x;
}

Take 操作和 Put 操作很類似

//該類的迭代器,所有的迭代器共享數據,隊列改變會影響所有的迭代器

transient Itrs itrs = null; //其存放瞭目前所創建的所有迭代器。

/**
* 迭代器和它們的隊列之間的共享數據,允許隊列元素被刪除時更新迭代器的修改。
*/
class Itrs {
  void elementDequeued() {
   // assert lock.getHoldCount() == 1;
   if (count == 0)
    //隊列中數量為0的時候,隊列就是空的,會將所有迭代器進行清理並移除
    queueIsEmpty();
   //takeIndex的下標是0,意味著隊列從尾中取完瞭,又回到頭部獲取
   else if (takeIndex == 0)
    takeIndexWrapped();
  }
  
  /**
   * 當隊列為空的時候做的事情
   * 1. 通知所有迭代器隊列已經為空
   * 2. 清空所有的弱引用,並且將迭代器置空
   */
  void queueIsEmpty() {}
  
  /**
   * 將takeIndex包裝成0
   * 並且通知所有的迭代器,並且刪除已經過期的任何對象(個人理解是置空對象)
   * 也直接的說就是在Blocking隊列進行出隊的時候,進行迭代器中的數據同步,保持隊列中的元素和迭代器的元素是一致的。
   */
  void takeIndexWrapped() {}
}

Itrs迭代器創建的時機

//從這裡知道,在ArrayBlockingQueue對象中調用此方法,才會生成這個對象
//那麼就可以理解為,隻要並未調用此方法,則ArrayBlockingQueue對象中的Itrs對象則為空
public Iterator<E> iterator() {
  return new Itr();
 }
 
 private class Itr implements Iterator<E> {
  Itr() {
   //這裡就是生產它的地方
   //count等於0的時候,創建的這個迭代器是個無用的迭代器,可以直接移除,進入detach模式。
   //否則就把當前隊列的讀取位置給迭代器當做下一個元素,cursor存儲下個元素的位置。
   if (count == 0) {
    // assert itrs == null;
    cursor = NONE;
    nextIndex = NONE;
    prevTakeIndex = DETACHED;
   } else {
    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    prevTakeIndex = takeIndex;
    nextItem = itemAt(nextIndex = takeIndex);
    cursor = incCursor(takeIndex);
    if (itrs == null) {
     itrs = new Itrs(this);
    } else {
     itrs.register(this); // in this order
     itrs.doSomeSweeping(false);
    }
    prevCycles = itrs.cycles;
    // assert takeIndex >= 0;
    // assert prevTakeIndex == takeIndex;
    // assert nextIndex >= 0;
    // assert nextItem != null;
    }
  }
}

代碼演示

package com.rumenz.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @className: BlockingQuqueExample
 * @description: TODO 類描述
 * @author: mac
 * @date: 2021/1/20
 **/
public class BlockingQueueExample {

 private static volatile Boolean flag=false;

 public static void main(String[] args) {

 

  BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
  ExecutorService executorService = Executors.newFixedThreadPool(2);

  executorService.execute(()->{
    try{
     blockingQueue.put(1);
     Thread.sleep(2000);
     blockingQueue.put(3);
     flag=true;
    }catch (Exception e){
     e.printStackTrace();
    }
  });

  executorService.execute(()->{
   try {

    while (!flag){
     Integer i = (Integer) blockingQueue.take();
     System.out.println(i);
    }

   }catch (Exception e){
    e.printStackTrace();
   }

  });

  executorService.shutdown();
 }
}

LinkedBlockingQueue

基於鏈表的阻塞隊列,通 ArrayBlockingQueue 類似,其內部也維護這一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列放入一個數據時,隊列會從生產者手上獲取數據,並緩存在隊列的內部,而生產者立即返回,隻有當隊列緩沖區到達最大值容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞隊列,直到消費者從隊列中消費掉一份數據,生產者會被喚醒,反之對於消費者這端的處理也基於同樣的原理。

LinkedBlockingQueue 之所以能夠高效的處理並發數據,還因為其對於生產者和消費者端分別采用瞭獨立的鎖來控制數據同步,這也意味著在高並發的情況下生產者和消費者可以並行的操作隊列中的數據,以調高整個隊列的並發能力。

如果構造一個 LinkedBlockingQueue 對象,而沒有指定容量大小, LinkedBlockingQueue 會默認一個類似無限大小的容量 Integer.MAX_VALUE ,這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡瞭。

LinkedBlockingQueue 是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 //隊列的容量,指定大小或為默認值Integer.MAX_VALUE
 private final int capacity;
 
 //元素的數量
 private final AtomicInteger count = new AtomicInteger();
 
 //隊列頭節點,始終滿足head.item==null
 transient Node<E> head;
 
 //隊列的尾節點,始終滿足last.next==null
 private transient Node<E> last;
 
 /** Lock held by take, poll, etc */
 //出隊的鎖:take, poll, peek 等讀操作的方法需要獲取到這個鎖
 private final ReentrantLock takeLock = new ReentrantLock();

 /** Wait queue for waiting takes */
 //當隊列為空時,保存執行出隊的線程:如果讀操作的時候隊列是空的,那麼等待 notEmpty 條件
 private final Condition notEmpty = takeLock.newCondition();

 /** Lock held by put, offer, etc */
 //入隊的鎖:put, offer 等寫操作的方法需要獲取到這個鎖
 private final ReentrantLock putLock = new ReentrantLock();

 /** Wait queue for waiting puts */
 //當隊列滿時,保存執行入隊的線程:如果寫操作的時候隊列是滿的,那麼等待 notFull 條件
 private final Condition notFull = putLock.newCondition();
 
 //傳說中的無界隊列
 public LinkedBlockingQueue() {}
 //傳說中的有界隊列
 public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
 }
 //傳說中的無界隊列
 public LinkedBlockingQueue(Collection<? extends E> c){}
 
 /**
  * 鏈表節點類
  */
 static class Node<E> {
  E item;

  /**
   * One of:
   * - 真正的繼任者節點
   * - 這個節點,意味著繼任者是head.next
   * - 空,意味著沒有後繼者(這是最後一個節點)
   */
  Node<E> next;

  Node(E x) { item = x; }
 }
}

通過其構造函數,得知其可以當做無界隊列也可以當做有界隊列來使用。
這裡用瞭兩把鎖分別是 takeLock 和 putLock ,而 Condition 分別是 notEmpty 和 notFull ,它們是這樣搭配的。

takeLock
putLock

從上面的構造函數中可以看到,這裡會初始化一個空的頭結點,那麼第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也是獲取頭結點後面的一個元素。count的計數值不包含這個頭結點。

Put源碼分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E>, java.io.Serializable { 
 /**
  * 將指定元素插入到此隊列的尾部,如有必要,則等待空間變得可用。
  */
 public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  // 如果你糾結這裡為什麼是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標志而已。
  int c = -1;
  //包裝成node節點
  Node<E> node = new Node<E>(e);
  final ReentrantLock putLock = this.putLock;
  final AtomicInteger count = this.count;
  //獲取鎖定
  putLock.lockInterruptibly();
  try {
   /** 如果隊列滿,等待 notFull 的條件滿足。 */
   while (count.get() == capacity) {
    notFull.await();
   }
   //入隊
   enqueue(node);
   //原子性自增
   c = count.getAndIncrement();
   // 如果這個元素入隊後,還有至少一個槽可以使用,調用 notFull.signal() 喚醒等待線程。
   // 哪些線程會等待在 notFull 這個 Condition 上呢?
   if (c + 1 < capacity)
    notFull.signal();
  } finally {
  //解鎖
   putLock.unlock();
  }
  // 如果 c == 0,那麼代表隊列在這個元素入隊前是空的(不包括head空節點),
  // 那麼所有的讀線程都在等待 notEmpty 這個條件,等待喚醒,這裡做一次喚醒操作
  if (c == 0)
   signalNotEmpty();
 }
 
 /** 鏈接節點在隊列末尾 */
 private void enqueue(Node<E> node) {
  // assert putLock.isHeldByCurrentThread();
  // assert last.next == null;
  // 入隊的代碼非常簡單,就是將 last 屬性指向這個新元素,並且讓原隊尾的 next 指向這個元素
  //last.next = node;
  //last = node;
  // 這裡入隊沒有並發問題,因為隻有獲取到 putLock 獨占鎖以後,才可以進行此操作
  last = last.next = node;
 }
 
 /**
  * 等待PUT信號
  * 僅在 take/poll 中調用
  * 也就是說:元素入隊後,如果需要,則會調用這個方法喚醒讀線程來讀
  */
 private void signalNotFull() {
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
   notFull.signal();//喚醒
  } finally {
   putLock.unlock();
  }
 }
}

Take源碼分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E>, java.io.Serializable { 
 public E take() throws InterruptedException {
  E x;
  int c = -1;
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  //首先,需要獲取到 takeLock 才能進行出隊操作
  takeLock.lockInterruptibly();
  try {
   // 如果隊列為空,等待 notEmpty 這個條件滿足再繼續執行
   while (count.get() == 0) {
    notEmpty.await();
   }
   //// 出隊
   x = dequeue();
   //count 進行原子減 1
   c = count.getAndDecrement();
   // 如果這次出隊後,隊列中至少還有一個元素,那麼調用 notEmpty.signal() 喚醒其他的讀線程
   if (c > 1)
    notEmpty.signal();
  } finally {
   takeLock.unlock();
  }
  if (c == capacity)
   signalNotFull();
  return x;
 }
 
 /**
  * 出隊
  */
 private E dequeue() {
  // assert takeLock.isHeldByCurrentThread();
  // assert head.item == null;
  Node<E> h = head;
  Node<E> first = h.next;
  h.next = h; // help GC
  head = first;
  E x = first.item;
  first.item = null;
  return x;
 }
 
 /**
  * Signals a waiting put. Called only from take/poll.
  */
 private void signalNotFull() {
  final ReentrantLock putLock = this.putLock;
  putLock.lock();
  try {
   notFull.signal();
  } finally {
   putLock.unlock();
  }
 }
}

與 ArrayBlockingQueue 對比

ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而後者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。

LinkedBlockingQueue 實現一個線程添加文件對象,四個線程讀取文件對象

package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {
 static long randomTime() {
 return (long) (Math.random() * 1000);
 }

 public static void main(String[] args) {
 // 能容納100個文件
 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
 // 線程池
 final ExecutorService exec = Executors.newFixedThreadPool(5);
 final File root = new File("F:\\JavaLib");
 // 完成標志
 final File exitFile = new File("");
 // 讀個數
 final AtomicInteger rc = new AtomicInteger();
 // 寫個數
 final AtomicInteger wc = new AtomicInteger();
 // 讀線程
 Runnable read = new Runnable() {
  public void run() {
  scanFile(root);
  scanFile(exitFile);
  }

  public void scanFile(File file) {
  if (file.isDirectory()) {
   File[] files = file.listFiles(new FileFilter() {
   public boolean accept(File pathname) {
    return pathname.isDirectory()
     || pathname.getPath().endsWith(".java");
   }
   });
   for (File one : files)
   scanFile(one);
  } else {
   try {
   int index = rc.incrementAndGet();
   System.out.println("Read0: " + index + " "
    + file.getPath());
   queue.put(file);
   } catch (InterruptedException e) {
   }
  }
  }
 };
 exec.submit(read);
 // 四個寫線程
 for (int index = 0; index < 4; index++) {
  // write thread
  final int NO = index;
  Runnable write = new Runnable() {
  String threadName = "Write" + NO;
  public void run() {
   while (true) {
   try {
    Thread.sleep(randomTime());
    int index = wc.incrementAndGet();
    File file = queue.take();
    // 隊列已經無對象
    if (file == exitFile) {
    // 再次添加"標志",以讓其他線程正常退出
    queue.put(exitFile);
    break;
    }
    System.out.println(threadName + ": " + index + " "
     + file.getPath());
   } catch (InterruptedException e) {
   }
   }
  }
  };
  exec.submit(write);
 }
 exec.shutdown();
 }
}

總結

到此這篇關於Java高並發BlockingQueue重要實現類的文章就介紹到這瞭,更多相關Java高並發BlockingQueue實現類內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: