Java常見的阻塞隊列總結

Java阻塞隊列

阻塞隊列和普通隊列主要區別在阻塞二字:

  • 阻塞添加:隊列已滿時,添加元素線程會阻塞,直到隊列不滿時才喚醒線程執行添加操作
  • 阻塞刪除:隊列元素為空時,刪除元素線程會阻塞,直到隊列不為空再執行刪除操作

常見的阻塞隊列有 LinkedBlockingQueue 和 ArrayBlockingQueue,其中它們都實現 BlockingQueue 接口,該接口定義瞭阻塞隊列需實現的核心方法:

public interface BlockingQueue<E> extends Queue<E> {
	// 添加元素到隊尾,成功返回true,隊列滿拋出異常 IllegalStateException
    boolean add(E e);
	// 添加元素到隊尾,成功返回 true,隊列滿返回 false
    boolean offer(E e);
	// 阻塞添加
    void put(E e) throws InterruptedException;
	// 阻塞添加,包含最大等待時長
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
	// 阻塞移除隊頂元素
    E take() throws InterruptedException;
	// 阻塞移除隊頂元素,包含最大等待時長
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
	// 返回可以添加到隊列不阻塞的最大數量
    int remainingCapacity();
	// 如果存在元素則刪除,成功返回 true,失敗返回 false
    boolean remove(Object o);
	// 是否包含某元素
    public boolean contains(Object o);
    // 批量移除元素並添加入指定集合
    int drainTo(Collection<? super E> c);
	// 批量移除包含最大數量
    int drainTo(Collection<? super E> c, int maxElements);
}

除瞭上面的方法,還有三個繼承自 Queue 接口的方法常常被用到:

// 獲取隊列頭元素,不刪除,沒有拋出異常 NoSuchElementException
E element();
// 獲取隊列頭元素,不刪除,沒有返回 null
E peek();
// 獲取並移除隊列頭元素,沒有返回 nul
E poll();

根據具體作用,方法可以被分為以下三類:

  • 添加元素類:add() 成功返回 true,失敗拋異常、offer() 成功返回 true,失敗返回 false,可以定義最大等待時長、put() 阻塞方法
  • 刪除元素類:remove() 成功返回 true,失敗返回 false、poll() 成功返回被移除元素,為空返回 null、take() 阻塞方法
  • 查詢元素類:element() 成功返回元素,否則拋出異常、peek() 返回對應元素或 null

根據方法類型又可以分為阻塞和非阻塞,其中 put()、take() 是阻塞方法,帶最大等待時長的 offer() 和 poll() 也是阻塞方法,其餘都是非阻塞方法,阻塞隊列基於上述方法實現

ArrayBlockingQueue 基於數組實現,滿足隊列先進先出特性,下面我們通過一段代碼初步認識:

public class ArrayBlockingQueueTest {

    ArrayBlockingQueue<TestProduct> queue = new ArrayBlockingQueue<TestProduct>(1);

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        new Thread(test.new Product()).start();
        new Thread(test.new Customer()).start();
    }

    class Product implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    queue.put(new TestProduct());
                    System.out.println("生產者創建產品等待消費者消費");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Customer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000);
                    queue.take();
                    System.out.println("消費者消費產品等待生產者創建");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class TestProduct {
    }

}

上述代碼比較簡單,在一個容量為1的阻塞隊列中,生產者和消費者由於容量限制依次阻塞運行。

ArrayBlockingQueue 基於 ReentrantLock 鎖和 Condition 等待隊列實現,因此存在公平和非公平的兩種模式。公平場景下所有被阻塞的線程按照阻塞順序執行,非公平場景下,隊列中的線程和恰好準備進入隊列的線程競爭,誰搶到就是誰的。默認使用非公平鎖,因為效率更高:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

從代碼可以看出,ArrayBlockingQueue 通過一個 ReentrantLock 鎖以及兩個 Condition 等待隊列實現,它的屬性如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	// 保存數據的數組
    final Object[] items;
	// 移除元素的索引
    int takeIndex;
	// 添加元素的索引
    int putIndex;
	// 元素數量
    int count;
	// 用於並發控制的鎖
    final ReentrantLock lock;
	// 不為空,用於take()操作
    private final Condition notEmpty;
	// 不滿,用於put()操作
    private final Condition notFull;
	// 迭代器
    transient Itrs itrs = null;
}

從代碼可以看出,ArrayBlockingQueue 使用同一個鎖、移除元素和添加元素通過數組下標的方式記錄,分表表示隊列頭和隊列尾。通過兩個等待隊列分別阻塞 take() 和 put() 方法,下面我們直接看源碼:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
	// 檢查是否為空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	// 判斷隊列是否已滿
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    // 賦值保存數據
    items[putIndex] = x;
    // 循環復用空間
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 喚醒take線程
    notEmpty.signal();
}

從代碼可以看出:add() 方法基於 offer() 方法實現,offer() 方法添加失敗返回 false 後,add() 方法拋出異常。offer() 方法會加鎖,保證線程安全,隊列沒滿時執行入隊操作,入隊操作通過操作數組實現,並且通過循環復用數組空間。元素添加成功後隊列不為空,調用 signal() 方法喚醒移除元素的阻塞線程,最後我們看 put() 方法:

public void put(E e) throws InterruptedException {
	// 判斷不為空
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 隊列滿就掛起在等待隊列
	    while (count == items.length)
	        notFull.await();
	    enqueue(e);
	} finally {
	    lock.unlock();
	}
}

從代碼可以看出,當隊列滿時,當前線程會被掛起到等待隊列中,直到隊列不滿時被喚醒執行添加操作。下面我們看刪除操作:

public boolean remove(Object o) {
	// 判斷是否為 NULL
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 從移除下標開始遍歷到添加新元素的下標
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                // 循環判斷,移除下標可能大於添加下標(添加下標二次遍歷時)
                if (++i == items.length)
                    i = 0; 
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
void removeAt(final int removeIndex) {
	final Object[] items = this.items;
	// 要刪除的元素正好是移除下標
	if (removeIndex == takeIndex) {
	    items[takeIndex] = null;
	    // 循環刪除
	    if (++takeIndex == items.length)
	        takeIndex = 0;
	    count--;
	    if (itrs != null)
	        itrs.elementDequeued();
	} else {
	    final int putIndex = this.putIndex;
	    // 如果不是移除下標,從該下標開始到添加下標,所有元素左移一位
	    for (int i = removeIndex;;) {
	        int next = i + 1;
	        if (next == items.length)
	            next = 0;
	        if (next != putIndex) {
	        	// 向左移除
	            items[i] = items[next];
	            i = next;
	        } else {
	        	// 最後put下標置為null
	            items[i] = null;
	            this.putIndex = i;
	            break;
	        }
	    }
	    count--;
	    // 更新迭代器
	    if (itrs != null)
	        itrs.removedAt(removeIndex);
	}
	notFull.signal();
}

remove() 和 poll()、take() 不同,它可以刪除指定的元素。這裡需要考慮刪除的元素不是移除索引指向的情況,從代碼可以看出,當要刪除的元素不是移除索引指向的元素時,將所有從被刪除元素下標開始到添加元素下標所有元素左移一位。

public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	    return (count == 0) ? null : dequeue();
	} finally {
	    lock.unlock();
	}
}
private E dequeue() {
	final Object[] items = this.items;
	E x = (E) items[takeIndex];
	items[takeIndex] = null;
	if (++takeIndex == items.length)
	    takeIndex = 0;
	count--;
	if (itrs != null)
	    itrs.elementDequeued();
	// 移除元素後喚醒put()添加線程
	notFull.signal();
	return x;
}

相比 remove() 方法,poll() 方法簡單瞭很多,這裡不做贅述,下面我們看 take():

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 隊列為空就掛起
	    while (count == 0)
	        notEmpty.await();
	    return dequeue();
	} finally {
	    lock.unlock();
	}
}

take() 方法和 put() 方法可以說基本一致,相對也比較簡單,最後我們來看看兩個查詢方法:

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	// 直接返回移除元素下標對應的元素,也就是隊列頭
        return itemAt(takeIndex); 
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return (E) items[i];
}

element() 基於 peek() 方法實現實現、當隊列為空時,peek() 方法返回 null,element() 拋出異常。關於 ArrayBlockingQueue 就介紹到這裡

LinkedBlockingQueue 基於鏈表實現,它的屬性如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	// 鏈表節點,存儲元素
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
	// 鏈表容量
    private final int capacity;
	// 當前元素數量
    private final AtomicInteger count = new AtomicInteger();
	// 頭節點
    transient Node<E> head;
    // 尾節點
    private transient Node<E> last;
	// 刪除鎖
    private final ReentrantLock takeLock = new ReentrantLock();
	// 不為空等待隊列
    private final Condition notEmpty = takeLock.newCondition();
	// 添加鎖
    private final ReentrantLock putLock = new ReentrantLock();
	// 不滿等待隊列
    private final Condition notFull = putLock.newCondition();
}

從代碼可以看出,元素被封裝為 Node 節點保存在單向鏈表中,其中鏈表默認長度為 Integer.MAX_VALUE,因此在使用時需註意內存溢出:當添加元素速度大於刪除元素速度時,隊列最終會記錄到大量不會用到並且無法回收的對象,導致內存溢出。

ArrayBlockingQueue 和 LinkedBlockingQueue 的主要區別在於 ReentrantLock 鎖的數量和等待隊列,LinkedBlockingQueue 用到兩個鎖和兩個等待隊列,也就是說添加和刪除操作可以並發執行,整體效率更高。下面我們直接看代碼:

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
	// 元素為空拋出異常
	if (e == null) throw new NullPointerException();
	// 獲取當前隊列容量
	final AtomicInteger count = this.count;
	// 隊列已滿時直接返回false
	if (count.get() == capacity)
	    return false;
	int c = -1;
	Node<E> node = new Node<E>(e);
	// 獲取添加鎖
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	try {
		// 二次判斷,因為上面判斷時未加鎖,數據可能已更新
	    if (count.get() < capacity) {
	    	// 入隊操作
	        enqueue(node);
	        // 獲取還未添加元素前,隊列的容量
	        c = count.getAndIncrement();
	        if (c + 1 < capacity)
	        	// 喚醒其它添加元素的線程
	            notFull.signal();
	    }
	} finally {
	    putLock.unlock();
	}
	// 如果添加前隊列沒有數據,也就是說現在有一條數據時
	if (c == 0)
		// 喚醒take線程 
	  	signalNotEmpty();
	return c >= 0;
}
private void enqueue(Node<E> node) {
     last = last.next = node;
}
private void signalNotEmpty() {
	// 喚醒take線程前必須獲取對應take鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

這裡有以下幾點需要我們註意:

1.LinkedBlockingQueue count 屬性必須通過並發類封裝,因為可能存在添加、刪除兩個線程並發執行,需考慮同步

2.這裡需要判斷兩次的主要原因在於方法開始時並沒有加鎖,數值可能改變,因此在獲取到鎖後需要二次判斷

3.和 ArrayBlockingQueue 不同,LinkedBlockingQueue 在隊列不滿時會喚醒添加線程,這樣做的原因是 LinkedBlockingQueue 中添加和刪除操作使用不同的鎖,各自隻需管好自己,還可以提高吞吐量。而 ArrayBlockingQueue 使用唯一鎖,這樣做會導致移除線程永遠不被喚醒或添加線程永遠不被喚醒,吞吐量較低

4.添加元素前隊列長度為0才喚醒移除線程,因為隊列長度為0時,移除線程肯定已經掛起,此時喚醒一個移除線程即可。因為移除線程和添加線程類似,都會自己喚醒自己。而 c>0 時隻會有兩種情況:存在移除線程在運行,如果有會遞歸喚醒,無須我們參與、不存在移除線程運行,此時也無須我們參與,等待調用 take()、poll() 方法即可

5.喚醒隻針對 put()、take() 方法阻塞的線程,offer() 方法直接返回(不包含最大等待時長),不參與喚醒場景

下面我們來看 put() 阻塞方法的實現:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
    	// 隊列滿時阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入隊
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

從代碼可以看出,put() 方法和 offer() 方法唯一區別在於自身通過 condition 阻塞掛起到等待隊列,其餘基本相同。至此關於添加操作介紹完畢,下面我們看移除方法:

public boolean remove(Object o) {
	if (o == null) return false;
	// 同時加兩個鎖
	fullyLock();
	try {
		// 循環查找
	    for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
	        if (o.equals(p.item)) {
	            unlink(p, trail);
	            return true;
	        }
	    }
	    return false;
	} finally {
	    fullyUnlock();
	}
}
void unlink(Node<E> p, Node<E> trail) {
	// p是要溢出的節點,trail是它的前驅節點
	// 方便gc
    p.item = null;
    // 引用取消
    trail.next = p.next;
    if (last == p)
        last = trail;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

從代碼可以看出,remove() 方法隻會在操作前容量不滿時喚醒創建線程,並不會喚醒移除線程。並且由於我們不確定要刪除元素的位置,因此此時需要加兩個鎖,確保數據安全。

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            // 獲取移除前隊列的元素數量
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 移除前如果隊列是滿的,喚醒添加線程
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
	Node<E> h = head;
	// 獲取要刪除的節點
	Node<E> first = h.next; 
	// 清除原來的頭結點(方便gc)
	h.next = h; 
	// 設置新的頭結點
	head = first;
	// 獲取返回值
	E x = first.item;
	// 新頭結點置為空
	first.item = null;
	return x;
}

需要註意的一點,每次出隊時更換 head 節點,head 節點本身不保存數據,head.next 記錄下次需要出隊的元素,每次出隊後 head.next 變為新的 head 節點返回並置為 null

poll() 方法和上面提到的 offer() 方法基本鏡像相同,這裡我再不做過多贅述

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    	// 隊列為空就掛起
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take() 方法和 poll() 方法類似,區別在於新增瞭阻塞邏輯。至此關於溢出元素方法介紹完畢,最後我們看看查詢方法源碼:

public LinkedBlockingQueue(int capacity) {
   if (capacity <= 0) throw new IllegalArgumentException();
   this.capacity = capacity;
   last = head = new Node<E>(null);
}
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

從代碼可以看出,默認 head 和 last 頭尾節點都為 null,入隊時直接從 next 開始操作,也就是說 head 節點不保存數據。

最後我們來看看有最大等待時長的 offer() 方法:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	// 將時間轉換成納秒
	long nanos = unit.toNanos(timeout);
	int c = -1;
	// 獲取鎖
	final ReentrantLock putLock = this.putLock;
	// 獲取當前隊列大小
	final AtomicInteger count = this.count;
	// 可中斷鎖
	putLock.lockInterruptibly();
	try {
	    while (count.get() == capacity) {
	    	// 小於0說明已到達最大等待時長
	        if (nanos <= 0)
	            return false;
	        // 如果隊列已滿,根據等待隊列阻塞等待
	        nanos = notFull.awaitNanos(nanos);
	    }
	    // 隊列沒滿直接入隊
	    enqueue(new Node<E>(e));
	    c = count.getAndIncrement();
	    if (c + 1 < capacity)
	        notFull.signal();
	} finally { 
	    putLock.unlock();
	}
	if (c == 0)
	    signalNotEmpty();
	return true;
}
 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
	// 將當前線程封裝為 AQS Node 類加入等待隊列
    Node node = addConditionWaiter();
    // 釋放鎖
    int savedState = fullyRelease(node);
    //計算過期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    // 當前線程沒有喚醒進入同步隊列時
    while (!isOnSyncQueue(node)) {
    	// 已經等待相應時間,刪除當前節點,將狀態設置為已關閉從隊列刪除
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        // 判斷是否超時
        if (nanosTimeout >= spinForTimeoutThreshold)
        	// 掛起線程
            LockSupport.parkNanos(this, nanosTimeout);
        // 判斷線程狀態是否被中斷
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 重新計算剩餘等待時間
        nanosTimeout = deadline - System.nanoTime();
    }
    // 被喚醒後執行自旋操作爭取獲得鎖,同時判斷線程是否被中斷
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
    	// 清理等待隊列中不為Condition狀態的線程
        unlinkCancelledWaiters();
    // 判斷是否被中斷
    if (interruptMode != 0)
    	// 拋出異常或中斷線程,獨占模式拋出異常,共享模式中斷線程
        reportInterruptAfterWait(interruptMode);
    // 返回時差,如果成功當前時間小於最大等待時長,返回值大於0,否則返回值小於0
    return deadline - System.nanoTime();
}

從代碼可以看出,包含最大等待時長的 offer()、poll() 方法通過循環判斷時間是否超時的方式掛起在等待隊列,達到最大等待時長還未被喚醒或沒被執行就返回

ArrayBlockingQueue 和 LinkedBlockingQueue 對比:

  • 大小不同,一個有界,一個無界。ArrayBlockingQueue 必須指定初始大小,LinkedBlockingQueue 無界時可能內存溢出
  • 一個采用數組,一個采用鏈表,數組保存無須創建新對象,鏈表需創建 Node 對象
  • 鎖機制不同,ArrayBlockingQueue 添加刪除操作使用同一個鎖,兩者操作不能並發執行。LinkedBlockingQueue 添加和刪除使用不同鎖,添加和刪除操作可並發執行,整體效率 LinkedBlockingQueue 更高

到此這篇關於Java常見的阻塞隊列總結的文章就介紹到這瞭,更多相關Java阻塞隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: