LinkedBlockingQueue鏈式阻塞隊列的使用和原理解析
概覽
1. 基於鏈表的可選有界阻塞隊列。根據FIFO的出入隊順序,從隊列頭部檢索和獲取元素,在隊列尾部插入新元素。
2. 當作為有界阻塞隊列,在隊列空間不足時,put方法將會一直阻塞直到有多餘空間才會執行插入元素操作,take方法則相反,隻到隊列內元素不為空時,才將隊列元素逐個取出。
3. 隊列容量不指定時,默認為Integer.MAX_VALUE,此時可以看作無界隊列。
4. 使用非公平鎖進行並發控制。所有方法都是線程安全的。
使用方法
下面的文章給出瞭阻塞隊列的四種基本用法:
瞭解BlockingQueue 大體框架和實現思路
LinkedBlockingQueue實現瞭BlockingQueue類。
在BlockingQueue中,方法被分為如下四類:
Throws exception
:操作未實現時(正常流程下的執行)拋出異常Special value
:根據操作的實際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊列為空引起的)Blocks
:阻塞當前線程,直到當前線程可以成功執行Times out
:嘗試指定時間後,放棄執行
Throws exception |
Special value |
Blocks |
Times out |
|
新增 |
add(E e) |
offer(E e) |
put(E e) |
offer(E e, long timeout, TimeUnit unit) |
刪除 |
remove() |
poll() |
take() |
poll(long timeout, TimeUnit unit) |
查詢 |
element() |
peek() |
1. add | remove | element
這三個方法在BlockingQueue的定義中,都會在操作未實現時,拋出異常。
add(E e)
:在隊尾添加元素e,add內部調用offer方法實現。因此,元素e為空時,拋出NullPointerException異常;插入失敗時,拋出IllegalStateException異常。remove
:刪除隊首元素,內部調用poll方法。隊首無數據時,拋出NoSuchElementException異常。element
:檢索隊首元素。隊首無數據時,拋出NoSuchElementException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.add(1); blockingQue.remove(1); blockingQue.remove(); // NoSuchElementException blockingQue.element(); // NoSuchElementException
2. offer | poll | peek
根據操作的實際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊列為空引起的)
offer(E e)
:在隊尾添加元素e,元素e為空時,拋出NullPointerException異常;插入失敗時返回false。poll
:刪除隊首元素。刪除失敗時返回false。peek
:檢索隊首元素。隊首無數據時,返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.offer(1); blockingQue.poll(); Integer peek = blockingQue.peek(); // 返回null
3. put | take
阻塞當前線程,直到當前線程可以成功執行。
put(E e)
:在隊尾添加元素e,元素e為空時,拋出NullPointerException異常。當隊列滿時,阻塞put線程,等待隊列被消費後,隊列容量不滿時,該阻塞線程繼續嘗試在隊尾插入元素。該方法在阻塞時可以被中斷,並拋出InterruptedException異常。take
:刪除並獲取隊首元素。隊首元素不為空時返回。隊首元素為空,阻塞take線程,等待隊列不為空時,再次嘗試消費隊首元素。該方法在阻塞時可以被中斷,並拋出InterruptedException異常。
註意:阻塞時,不會解除鎖占用。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.put(1); blockingQue.take(); } catch (InterruptedException e) { // 線程被中斷 e.printStackTrace(); }
4. offer | poll (timeout)
嘗試指定時間後,放棄執行
offer(E e, long timeout, TimeUnit unit)
:在隊尾添加元素e,元素e為空時,拋出NullPointerException異常;當隊列容量滿時,線程休眠一定時間後再次查看隊列容量,當該休眠時間大於等於timeout後,此時隊列還滿則返回false。不滿時,嘗試入隊。需要註意的是,由於偽喚醒機制的存在,線程可能在timeout這個時間段內的任意一點被喚醒,如果隊列容易不滿,則會直接執行入隊操作。阻塞時,當前線程被中斷拋出InterruptedException異常。poll(long timeout, TimeUnit unit)
:刪除隊首元素。poll與offer對應的,當隊列為空的時候,線程休眠一定時間。休眠時,當前線程被中斷拋出InterruptedException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.offer(1, 100, TimeUnit.MILLISECONDS); blockingQue.poll(100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
當然除瞭上述的阻塞隊列的基本操作外,LinkedBlockingQueue還具有集合Collection的性質。因此集合中的通用方法也可以使用。
源碼解析
說明
本次源碼分析主要按照下面幾個步驟進行:
1. 保存隊列數據的容器以及出入隊方法
2. 主要成員變量以及作用
3. 主要方法分析
隊列容器
結構圖
僅有數據item和後繼next的單向節點,結構簡單。
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
next三種情況
A 普通節點的真實後繼
B 真正的隊首節點,item=null(隊首節點恒為head.next)
C 隊尾節點,next=null
- item: null -> first -> …… -> last
- next: first -> second -> ……-> null
入隊操作
// 入隊 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // last.next = node; last = node; }
步驟1
步驟2
出隊操作
// 出隊 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; // 形成引用鏈閉環,JVM根據可達性分析時,GC root的引用鏈與該對象之間不可達,進行GC h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
步驟1
步驟2
關鍵成員變量
隊列入隊和出隊鎖分離,都使用瞭非公平鎖。
這裡的count屬性需要註意下,這裡使用瞭原子類保證操作的原子性。後面的入隊和出隊,將會頻繁使用它。
/** 容量, 初始化不設置時默認為Integer.MAX_VALUE*/ private final int capacity; /** 當前隊列內的元素數量 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊首 * 不變量: head.item == null */ transient Node<E> head; /** * 隊尾 * 不變量: last.next == null */ private transient Node<E> last; /** 出隊操作公用鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 用於出隊操作的阻塞和喚醒 出隊的話,隻需要考慮隊列是否為空 */ private final Condition notEmpty = takeLock.newCondition(); /** 入隊操作公用鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 用於入隊操作的阻塞和喚醒 入隊隻需要考慮隊列空間是否足夠*/ private final Condition notFull = putLock.newCondition();
初始化
三個構造函數
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 自定義容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 初始化時,批量添加集合中的元素 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put方法
put方法幾個關註點
- 釋放鎖的時機
- 執行入隊操作
- 喚醒生產者的時機
- 喚醒消費者的時機
這幾點是整個阻塞操作的核心,可以在下面的分析中仔細觀察。
註:由於阻塞隊列就是基於生產者-消費者模型的,因此,下文中都把調用put方法的線程稱為生產者,調用take方法的線程稱為消費者。
總體分析
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // -1代表操作異常 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 如果線程沒有被標記為中斷,則獲取鎖 putLock.lockInterruptibly(); try { while (count.get() == capacity) { // 這裡是線程在執行put操作時唯一一個執行過程中釋放鎖的地方 notFull.await(); // 容量已滿,等待被消費後喚醒 } // 添加元素,更新容量 enqueue(node); c = count.getAndIncrement(); // 隊列容量有餘時,在這裡再次喚醒一個其他的生產者線程(或者說消費者消費速度大於生產) if (c + 1 < capacity) notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } // 喚醒一個消費者 if (c == 0) signalNotEmpty(); }
count屬性並發問題
這裡需要重點關註count,由於有兩把鎖,count可以同時被putLock、takeLock操作,那麼這裡是否會產生並發問題。
分析如下:
A. 隻有putLock或takeLock一把鎖操作:就是單線程操作,沒影響,不產生並發問題。
其他所有put操作都處於await的狀態或者競爭鎖狀態,其他線程也因為獲取不到鎖而無法執行,隻有等該節點添加完成釋放鎖,其他線程才有機會繼續執行。
while (count.get() == capacity) { notFull.await(); // 容量已滿,等待被消費後喚醒 }
B. putLock和takeLock同時操作:我們假設兩個線程一個獲取到putLock,一個獲取到瞭takeLock(同時最多也隻有兩個線程操作count)。
// put while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); // take while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal();
由於count是原子類那麼count的所有讀寫操作必然是一個串聯的操作,而非並行操作,因此也不存在並發問題,如下圖(順序可能不同):
喚醒消費者
代碼的最後一段,會有喚醒一個消費者的操作。
// 喚醒一個等待中的消費者 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
剛開始看到的時候很疑惑,為什麼是c == 0才喚醒。如果生產者入隊成功,那麼c應該為如下值:
c = count.getAndIncrement();
後面看瞭一下count.getAndIncrement()方法定義才發現自己記混瞭,count.getAndIncrement()是一個原子操作,且返回值的是操作前的值。
ok,現在沒問題瞭。
count >= 0,也就是說,隻有在生產者入隊前隊列為空,入隊成功之後才會喚醒一個消費者消費。
take方法
take方法與put方法大致相似,隻是與put做相反操作。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 隊列元素為空,停止消費,讓出鎖並等待被喚醒 while (count.get() == 0) { notEmpty.await(); } // 移除隊首元素,並更新容量 x = dequeue(); c = count.getAndDecrement(); // 生產速度大於消費速度,喚醒一個其他消費者進行消費 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 消費之前隊列已滿,消費後喚醒一個生產者 if (c == capacity) signalNotFull(); return x; } // 喚醒生產者 /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
總結
總體上看LinkedBlockingQueue類不難,整個生產-消費的流程實現也比較簡單。源碼已經把該介紹的東西都講得很明白瞭,我這屬於依葫蘆畫瓢順著源碼註釋寫出來的。這麼一寫,自己這個類的印象就很深刻瞭。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- Java常見的阻塞隊列總結
- Java高並發BlockingQueue重要的實現類詳解
- java並發編程工具類JUC之LinkedBlockingQueue鏈表隊列
- Java中常用阻塞隊列的問題小結
- Tomcat使用線程池處理遠程並發請求的方法