詳解Java中的延時隊列 DelayQueue

當用戶超時未支付時,給用戶發提醒消息。另一種場景是,超時未付款,訂單自動取消。通常,訂單創建的時候可以向延遲隊列種插入一條消息,到時間自動執行。其實,也可以用臨時表,把這些未支付的訂單放到一個臨時表中,或者Redis,然後定時任務去掃描。這裡我們用延時隊列來做。RocketMQ有延時隊列,RibbitMQ也可以實現,Java自帶的也有延時隊列,接下來就回顧一下各種隊列。

Queue

隊列是一種集合。除瞭基本的集合操作以外,隊列還提供瞭額外的插入、提取和檢查操作。隊列的每個方法都以兩種形式存在:一種是當操作失敗時拋異常,另一種是返回一個特定的值(null或者false,取決於具體操作)。後一種形式的插入操作是專門設計用於有界隊列實現的,在大多情況下,插入操作不會失敗。

隊列通常(但不一定)以FIFO(先進先出)的方式對元素進行排序。例外情況包括優先級隊列(根據提供的比較器對元素進行排序或元素的自然排序)和LIFO隊列(或堆棧),對LIFO進行排序(後進先出)。無論使用哪種順序,隊列的開頭都是該元素,可以通過調用remove()或poll()將其刪除。在FIFO隊列中,所有新元素都插入隊列的尾部。其他種類的隊列可能使用不同的放置規則。 每個Queue實現必須指定其排序屬性。無論使用哪種順序,都可以通過調用remove()或poll()來刪除隊列開頭的元素。在FIFO隊列中,所有新元素都插入到隊列的尾部。其他類型的隊列可能使用不同的放置規則。每個隊列實現都必須指定其排序屬性。

offer方法在可以的情況下會向隊列種插入一個元素,否則返回false。這不同於Collection.add方法,後者隻能通過拋異常來添加元素。offer方法設計用於在正常情況下(而不是在例外情況下)發生故障時,例如在固定容量(或者“有界”)隊列種使用。

remove()和poll()方法刪除並返回隊頭元素。當隊列為空時,remove()拋出異常,而poll()返回null。

element()和peek()方法返回隊頭元素。

PriorityQueue

PriorityQueue是一個無界優先級隊列是基於優先級堆的。優先級隊列種的元素根據自然順序進行排序,或者通過在隊列構建時提供的Comparator進行排序,當然這取決於使用哪種構造函數。優先級隊列不允許空(null)元素。一個依賴自然順序的優先級隊列也不允許插入不可比較的對象。

優先級隊列的隊頭元素是最小的元素,如果有多個元素並列最小,那麼隊頭是它們其中之一。

優先級隊列是無界的,但是有一個內部容量來控制用於在隊列上存儲元素的數組的大小。它總是至少與隊列大小一樣大。將元素添加到優先級隊列時,其容量會自動增長

BlockingQueue

這種隊列還支持以下操作:在檢索元素時等待隊列變為非空,並在存儲元素時等待隊列中的空間變為可用。

BlockingQueue方法有四種形式,它們以不同的方式處理操作,這些操作無法立即滿足,但將來可能會滿足:一種拋出異常,第二種返回特殊值(null或false,取決於具體操作),第三種阻塞當前線程,直到操作成功為止;第四種阻塞當前線程,超時則放棄。 下表總結瞭這些方法:

阻塞隊列不接受空元素,如果你試圖add , put 或者 offer 一個null,將會拋NullPointerException。

阻塞隊列是線程安全的。所有排隊方法都使用內部鎖或者其他形式的並發控制來保證以原子方式實現它們的效果。

阻塞隊列被設計主要用於生產者-消費者隊列。

下面是一個典型的生產者-消費者方案:

package com.example;

import java.text.MessageFormat;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author ChengJianSheng
 * @date 2020/12/15
 */
public class Setup {
  public static void main(String[] args) {
    BlockingQueue<Bread> queue = new ArrayBlockingQueue<>(5);

    Producer p1 = new Producer(queue);
    Producer p2 = new Producer(queue);
    Consumer c1 = new Consumer(queue);
    Consumer c2 = new Consumer(queue);

    new Thread(p1, "p1").start();
    new Thread(p2, "p2").start();
    new Thread(c1, "c1").start();
    new Thread(c2, "c2").start();
  }
}

class Bread {

}

/**
 * 生產者
 */
class Producer implements Runnable {

  private final BlockingQueue<Bread> queue;

  public Producer(BlockingQueue<Bread> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (true) {
        queue.put(produce());
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public Bread produce() {
    try {
      Thread.sleep(Math.round(2000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return new Bread();
  }
}

/**
 * 消費者
 */
class Consumer implements Runnable {

  private final BlockingQueue<Bread> queue;

  public Consumer(BlockingQueue<Bread> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (true) {
        consume(queue.take());
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void consume(Bread bread) {
    try {
      Thread.sleep(Math.round(2000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

ArrayBlockingQueue

ArrayBlockingQueue是用數組實現的有界阻塞隊列。這種隊列中的元素按FIFO(先進先出)排序。隊頭是在隊列中停留最長時間的元素。隊尾是在隊列中停留時間最短的元素。新元素插入到隊列的尾部,並且隊列檢索操作在隊列的頭部獲取元素。

這是一個經典的“有界緩沖區”,其中固定大小的數組包含由生產者插入並由消費者提取的元素。 創建後,容量將無法更改。 試圖將一個元素放入一個已滿的隊列將導致操作阻塞; 試圖從空隊列中取出一個元素也會阻塞。

這個類支持一個可選的公平性策略,用於對等待的生產者和消費者線程進行排序。默認情況下,不保證這個順序。然而,將公平性設置為true的隊列將按FIFO順序授予線程訪問權。公平性通常會降低吞吐量,但會降低可變性並避免饑餓。

LinkedBlockingQueue

LinkedBlockingQueue是一個基於鏈表實現的可選邊界的阻塞隊列。

PriorityBlockingQueue

PriorityBlockingQueue是一個無界阻塞隊列,它使用與PriorityQueue相同的排序規則,並提供阻塞檢索操作。

DelayQueue

DelayQueue是一種由延遲元素組成的無界阻塞隊列,在該隊列中,僅當元素的延遲到期時才可以使用該元素。隊頭是已經過期的延遲元素,它已過期時間最長。如果沒有過期的延遲,則隊列沒有頭部,此時調用poll將返回null。當調用元素的getDelay(TimeUnit.NANOSECONDS)方法返回值小於或等於0時,就會發生過期。即使元素沒有過期,也不能用take或者poll將其刪除。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer提供瞭一個框架來實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關的同步器(信號燈,事件等)。該類旨在為大多數依賴單個原子int值表示狀態的同步器提供有用的基礎。子類必須定義更改此狀態的受保護方法,並定義該狀態對於獲取或釋放此對象而言意味著什麼。 鑒於這些,此類中的其他方法將執行所有排隊和阻塞機制。 子類可以維護其他狀態字段,但是僅跟蹤關於同步的使用方法getState(),setState(int)和compareAndSetState(int,int)操作的原子更新的int值。

小結

1、Queue是一個集合,隊列的每個方法都有兩種形式,一種是拋異常,另一種是返回一個特定的值。

2、PriorityQueue是一個無界優先級隊列,默認情況下,隊列種的元素按自然順序排序,或者根據提供的Comparator進行排序。也就是說,優先級隊列種的元素都是經過排序的,排序規則可以自己指定,同時隊列種的元素都必須是可排序的。

3、BlockingQueue是一個阻塞隊列,向已滿的隊列種插入元素時會阻塞,向空隊列中取元素時也會阻塞;阻塞隊列被設計主要用於生產者-消費者隊列。

4、ArrayBlockingQueue是用數組實現的有界阻塞隊列,隊列種的元素按FIFO(先進先出)排序。

5、LinkedBlockingQueue是用鏈表實現的可選邊界的阻塞隊列。

6、PriorityBlockingQueue相當於是阻塞隊列和優先級隊列的合體,排序規則與優先級隊列相同。

7、DelayQueue延時隊列中的元素都有一個有效期,隻有當過瞭有效期才可以使用該元素。

以上就是詳解Java中的延時隊列 DelayQueue的詳細內容,更多關於Java 延時隊列 DelayQueue的資料請關註WalkonNet其它相關文章!

推薦閱讀: