LinkedBlockingQueue鏈式阻塞隊列的使用和原理解析

概覽

1. 基於鏈表的可選有界阻塞隊列。根據FIFO的出入隊順序,從隊列頭部檢索和獲取元素,在隊列尾部插入新元素。

2. 當作為有界阻塞隊列,在隊列空間不足時,put方法將會一直阻塞直到有多餘空間才會執行插入元素操作,take方法則相反,隻到隊列內元素不為空時,才將隊列元素逐個取出。

3. 隊列容量不指定時,默認為Integer.MAX_VALUE,此時可以看作無界隊列。

4. 使用非公平鎖進行並發控制。所有方法都是線程安全的。

使用方法

下面的文章給出瞭阻塞隊列的四種基本用法:

瞭解BlockingQueue 大體框架和實現思路

LinkedBlockingQueue實現瞭BlockingQueue類。

在BlockingQueue中,方法被分為如下四類:

  • Throws exception:操作未實現時(正常流程下的執行)拋出異常
  • Special value:根據操作的實際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊列為空引起的)
  • Blocks:阻塞當前線程,直到當前線程可以成功執行
  • Times out:嘗試指定時間後,放棄執行
 

Throws exception

Special value

Blocks

Times out

新增

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

刪除

remove()

poll()

take()

poll(long timeout, TimeUnit unit)

查詢

element()

peek()

   

1. add | remove | element

這三個方法在BlockingQueue的定義中,都會在操作未實現時,拋出異常。

  • add(E e)在隊尾添加元素e,add內部調用offer方法實現。因此,元素e為空時,拋出NullPointerException異常;插入失敗時,拋出IllegalStateException異常。
  • remove刪除隊首元素,內部調用poll方法。隊首無數據時,拋出NoSuchElementException異常。
  • element檢索隊首元素。隊首無數據時,拋出NoSuchElementException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.add(1);
blockingQue.remove(1);
blockingQue.remove(); // NoSuchElementException
blockingQue.element(); // NoSuchElementException

2. offer | poll | peek

根據操作的實際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊列為空引起的)

  • offer(E e)在隊尾添加元素e,元素e為空時,拋出NullPointerException異常;插入失敗時返回false。
  • poll刪除隊首元素。刪除失敗時返回false。
  • peek檢索隊首元素。隊首無數據時,返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.offer(1);
blockingQue.poll();
Integer peek = blockingQue.peek(); // 返回null

3. put | take

阻塞當前線程,直到當前線程可以成功執行。

  • put(E e)在隊尾添加元素e,元素e為空時,拋出NullPointerException異常。當隊列滿時,阻塞put線程,等待隊列被消費後,隊列容量不滿時,該阻塞線程繼續嘗試在隊尾插入元素。該方法在阻塞時可以被中斷,並拋出InterruptedException異常。
  • take刪除並獲取隊首元素。隊首元素不為空時返回。隊首元素為空,阻塞take線程,等待隊列不為空時,再次嘗試消費隊首元素。該方法在阻塞時可以被中斷,並拋出InterruptedException異常。

註意:阻塞時,不會解除鎖占用。

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.put(1);
    blockingQue.take();
} catch (InterruptedException e) {
    // 線程被中斷
    e.printStackTrace();
}

4. offer | poll (timeout)

嘗試指定時間後,放棄執行

  • offer(E e, long timeout, TimeUnit unit)在隊尾添加元素e,元素e為空時,拋出NullPointerException異常;當隊列容量滿時,線程休眠一定時間後再次查看隊列容量,當該休眠時間大於等於timeout後,此時隊列還滿則返回false。不滿時,嘗試入隊。需要註意的是,由於偽喚醒機制的存在,線程可能在timeout這個時間段內的任意一點被喚醒,如果隊列容易不滿,則會直接執行入隊操作。阻塞時,當前線程被中斷拋出InterruptedException異常。
  • poll(long timeout, TimeUnit unit)刪除隊首元素。poll與offer對應的,當隊列為空的時候,線程休眠一定時間。休眠時,當前線程被中斷拋出InterruptedException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.offer(1, 100, TimeUnit.MILLISECONDS);
    blockingQue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

當然除瞭上述的阻塞隊列的基本操作外,LinkedBlockingQueue還具有集合Collection的性質。因此集合中的通用方法也可以使用。

源碼解析

說明

本次源碼分析主要按照下面幾個步驟進行:

1. 保存隊列數據的容器以及出入隊方法

2. 主要成員變量以及作用

3. 主要方法分析

隊列容器

結構圖

僅有數據item和後繼next的單向節點,結構簡單。

static class Node<E> {
    E item;
 
    Node<E> next;
 
    Node(E x) { item = x; }
}

next三種情況

A 普通節點的真實後繼

B 真正的隊首節點,item=null(隊首節點恒為head.next)

C 隊尾節點,next=null

  • item: null -> first -> …… -> last
  • next: first -> second -> ……-> null

入隊操作

    // 入隊
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node; // last.next = node; last = node;
    }

步驟1

 步驟2

出隊操作

 
    // 出隊
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        // 形成引用鏈閉環,JVM根據可達性分析時,GC root的引用鏈與該對象之間不可達,進行GC
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

步驟1

 步驟2

關鍵成員變量

隊列入隊和出隊鎖分離,都使用瞭非公平鎖。

這裡的count屬性需要註意下,這裡使用瞭原子類保證操作的原子性。後面的入隊和出隊,將會頻繁使用它。

/** 容量, 初始化不設置時默認為Integer.MAX_VALUE*/
private final int capacity;
 
/** 當前隊列內的元素數量 */
private final AtomicInteger count = new AtomicInteger();
 
/**
 * 隊首
 * 不變量: head.item == null
 */
transient Node<E> head;
 
/**
 * 隊尾
 * 不變量: last.next == null
 */
private transient Node<E> last;
 
/** 出隊操作公用鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
 
/** 用於出隊操作的阻塞和喚醒 出隊的話,隻需要考慮隊列是否為空 */
private final Condition notEmpty = takeLock.newCondition(); 
 
/** 入隊操作公用鎖 */
private final ReentrantLock putLock = new ReentrantLock();
 
/** 用於入隊操作的阻塞和喚醒 入隊隻需要考慮隊列空間是否足夠*/
private final Condition notFull = putLock.newCondition();

初始化

三個構造函數

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 
    // 自定義容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
 
    // 初始化時,批量添加集合中的元素
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put方法

put方法幾個關註點

  • 釋放鎖的時機
  • 執行入隊操作
  • 喚醒生產者的時機
  • 喚醒消費者的時機

這幾點是整個阻塞操作的核心,可以在下面的分析中仔細觀察。

註:由於阻塞隊列就是基於生產者-消費者模型的,因此,下文中都把調用put方法的線程稱為生產者,調用take方法的線程稱為消費者。

總體分析

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1; // -1代表操作異常
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 如果線程沒有被標記為中斷,則獲取鎖
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 這裡是線程在執行put操作時唯一一個執行過程中釋放鎖的地方
            notFull.await(); // 容量已滿,等待被消費後喚醒
        }
        // 添加元素,更新容量
        enqueue(node);
        c = count.getAndIncrement();
        // 隊列容量有餘時,在這裡再次喚醒一個其他的生產者線程(或者說消費者消費速度大於生產)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // 喚醒一個消費者
    if (c == 0)
        signalNotEmpty();
}

count屬性並發問題

這裡需要重點關註count,由於有兩把鎖,count可以同時被putLock、takeLock操作,那麼這裡是否會產生並發問題。

分析如下:

A. 隻有putLock或takeLock一把鎖操作:就是單線程操作,沒影響,不產生並發問題。

其他所有put操作都處於await的狀態或者競爭鎖狀態,其他線程也因為獲取不到鎖而無法執行,隻有等該節點添加完成釋放鎖,其他線程才有機會繼續執行。

        while (count.get() == capacity) {
            notFull.await(); // 容量已滿,等待被消費後喚醒
        }

B. putLock和takeLock同時操作:我們假設兩個線程一個獲取到putLock,一個獲取到瞭takeLock(同時最多也隻有兩個線程操作count)。

// put            
while (count.get() == capacity) {
     notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
 
 
 
// take
while (count.get() == 0) {
    notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
    notEmpty.signal();

由於count是原子類那麼count的所有讀寫操作必然是一個串聯的操作,而非並行操作,因此也不存在並發問題,如下圖(順序可能不同):

喚醒消費者

代碼的最後一段,會有喚醒一個消費者的操作。

// 喚醒一個等待中的消費者
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

剛開始看到的時候很疑惑,為什麼是c == 0才喚醒。如果生產者入隊成功,那麼c應該為如下值:

c = count.getAndIncrement();

後面看瞭一下count.getAndIncrement()方法定義才發現自己記混瞭,count.getAndIncrement()是一個原子操作,且返回值的是操作前的值

ok,現在沒問題瞭。

count >= 0,也就是說,隻有在生產者入隊前隊列為空,入隊成功之後才會喚醒一個消費者消費。

take方法

take方法與put方法大致相似,隻是與put做相反操作。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 隊列元素為空,停止消費,讓出鎖並等待被喚醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 移除隊首元素,並更新容量
        x = dequeue();
        c = count.getAndDecrement();
        // 生產速度大於消費速度,喚醒一個其他消費者進行消費
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 消費之前隊列已滿,消費後喚醒一個生產者
    if (c == capacity)
        signalNotFull();
    return x;
}
 
// 喚醒生產者
/**
 * Signals a waiting put. Called only from take/poll.
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

總結

總體上看LinkedBlockingQueue類不難,整個生產-消費的流程實現也比較簡單。源碼已經把該介紹的東西都講得很明白瞭,我這屬於依葫蘆畫瓢順著源碼註釋寫出來的。這麼一寫,自己這個類的印象就很深刻瞭。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: