淺談Java中的Queue傢族

Queue接口

先看下Queue的繼承關系和其中定義的方法:

Queue繼承自Collection,Collection繼承自Iterable。

Queue有三類主要的方法,我們用個表格來看一下他們的區別:

方法類型 方法名稱 方法名稱 區別
Insert add offer 兩個方法都表示向Queue中添加某個元素,不同之處在於添加失敗的情況,add隻會返回true,如果添加失敗,會拋出異常。offer在添加失敗的時候會返回false。所以對那些有固定長度的Queue,優先使用offer方法。
Remove remove poll 如果Queue是空的情況下,remove會拋出異常,而poll會返回null。
Examine element peek 獲取Queue頭部的元素,但不從Queue中刪除。兩者的區別還是在於Queue為空的情況下,element會拋出異常,而peek返回null。

註意,因為對poll和peek來說null是有特殊含義的,所以一般來說Queue中禁止插入null,但是在實現中還是有一些類允許插入null比如LinkedList。

盡管如此,我們在使用中還是要避免插入null元素。

Queue的分類

一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。

BlockingQueue

BlockingQueue是Queue的一種實現,它提供瞭兩種額外的功能:

當當前Queue是空的時候,從BlockingQueue中獲取元素的操作會被阻塞。當當前Queue達到最大容量的時候,插入BlockingQueue的操作會被阻塞。

BlockingQueue的操作可以分為下面四類:

操作類型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable

第一類是會拋出異常的操作,當遇到插入失敗,隊列為空的時候拋出異常。

第二類是不會拋出異常的操作。

第三類是會Block的操作。當Queue為空或者達到最大容量的時候。

第四類是time out的操作,在給定的時間裡會Block,超時會直接返回。

BlockingQueue是線程安全的Queue,可以在生產者消費者模式的多線程中使用,如下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

最後,在一個線程中向BlockQueue中插入元素之前的操作happens-before另外一個線程中從BlockQueue中刪除或者獲取的操作。

Deque

Deque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和刪除元素。

同樣的,我們也可以將Deque的方法用下面的表格來表示,Deque的方法可以分為對頭部的操作和對尾部的操作:

方法類型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一致,這裡就不多講瞭。

當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就相當於一個Queue。

當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就相當於一個Stack。

TransferQueue

TransferQueue繼承自BlockingQueue,為什麼叫Transfer呢?因為TransferQueue提供瞭一個transfer的方法,生產者可以調用這個transfer方法,從而等待消費者調用take或者poll方法從Queue中拿取數據。

還提供瞭非阻塞和timeout版本的tryTransfer方法以供使用。

我們舉個TransferQueue實現的生產者消費者的問題。

先定義一個生產者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"個",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}

在生產者的run方法中,我們調用瞭tryTransfer方法,等待2秒鐘,如果沒成功則直接返回。

再定義一個消費者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}

在run方法中,調用瞭transferQueue.take方法來取消息。

下面先看一下一個生產者,零個消費者的情況:

@Test
public void testOneProduceZeroConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 5);

    exService.execute(producer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

輸出結果:

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第0個 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第1個 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第2個 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第3個 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第4個 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer – total transfered 0

可以看到,因為沒有消費者,所以消息並沒有發送成功。

再看下一個有消費者的情況:

@Test
public void testOneProduceOneConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 2);
    Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

    exService.execute(producer);
    exService.execute(consumer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

輸出結果:

[pool-1-thread-2] INFO com.flydean.Consumer – take 第0個

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第0個 是否成功: true

[pool-1-thread-2] INFO com.flydean.Consumer – take 第1個

[pool-1-thread-1] INFO com.flydean.Producer – transfered 第1個 是否成功: true

[pool-1-thread-1] INFO com.flydean.Producer – total transfered 2

[pool-1-thread-2] INFO com.flydean.Consumer – total consumed 2

可以看到Producer和Consumer是一個一個來生產和消費的。

以上就是淺談Java中的Queue傢族的詳細內容,更多關於Java中的Queue傢族的資料請關註WalkonNet其它相關文章!

推薦閱讀: