java並發中DelayQueue延遲隊列原理剖析

介紹

DelayQueue隊列是一個延遲隊列,DelayQueue中存放的元素必須實現Delayed接口的元素,實現接口後相當於是每個元素都有個過期時間,當隊列進行take獲取元素時,先要判斷元素有沒有過期,隻有過期的元素才能出隊操作,沒有過期的隊列需要等待剩餘過期時間才能進行出隊操作。

源碼分析

DelayQueue隊列內部使用瞭PriorityQueue優先隊列來進行存放數據,它采用的是二叉堆進行的優先隊列,使用ReentrantLock鎖來控制線程同步,由於內部元素是采用的PriorityQueue來進行存放數據,所以Delayed接口實現瞭Comparable接口,用於比較來控制優先級,如下代碼所示:

 public interface Delayed extends Comparable<Delayed> {
 
     /**
      * Returns the remaining delay associated with this object, in the
      * given time unit.
      *
      * @param unit the time unit
      * @return the remaining delay; zero or negative values indicate
      * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

DelayQueue的成員變量如下所示:

 // 鎖。
 private final transient ReentrantLock lock = new ReentrantLock();
 // 優先隊列。
 private final PriorityQueue<E> q = new PriorityQueue<E>();
 
 /**
  * Leader-Follower的變種。
  * Thread designated to wait for the element at the head of
  * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
// 條件,代表如果有數據則通知Follower線程,喚醒線程處理隊列內容。
private final Condition available = lock.newCondition();

Leader-Follower模式的變種,用於最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。

offer操作

 public boolean offer(E e) {
     // 獲取到鎖
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         // 將元素存儲到PriorityQueue優先隊列中
         q.offer(e);
         // 如果第一個元素是當前元素,說明之前隊列中為空,則先將Leader設置為空,通知等待線程可以爭搶Leader瞭。
         if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        // 返回成功
        return true;
    } finally {
        lock.unlock();
    }
}

offer操作前先進行獲取鎖的操作,也就是同一時間內隻能有一個線程可以入隊操作。

  • 獲取到ReentrantLock鎖對象。
  • 將元素添加到PriorityQueue優先隊列中
  • 如果隊列中最早過期的元素是自己,則說明隊列原先是空的,所以將Leader進行重置,通知Follower線程可以成為Leader線程。
  • 最後進行解鎖操作。

put操作

put操作其實就是調用的offer操作來進行添加數據的,以下是源碼信息:

public void put(E e) {
    offer(e);
}

take操作

 public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     // 獲取可中斷的鎖。
     lock.lockInterruptibly();
     try {
         // 循環獲取數據。
         for (;;) {
             // 獲取最早過期的元素,但是不彈出對象。
             E first = q.peek();
            // 如果最早過期的元素為空,說明隊列為空,則線程直接進入無限期等待,並且讓出鎖。
            if (first == null)
                // 當前線程無限期等待,直到被喚醒,並且讓出鎖對象。
                available.await();
            else {
                // 獲取最早過期的元素剩餘過期時間。
                long delay = first.getDelay(NANOSECONDS);
                // 如果剩餘過期時間小於0,則說明已經過期,反之還沒有過期。
                if (delay <= )
                    // 如果已經過期直接獲取最早過期的元素,並返回。
                    return q.poll();
                // 如果剩餘過期日期大於0,則會進入到這裡。
                // 將剛才獲取的最早過期的元素設置為空。
                first = null; // don't retain ref while waiting
                // 如果有線程爭搶的Leader線程,則進行無限期等待。
                if (leader != null)
                    // 無限期等待並讓出鎖。
                    available.await();
                else {
                    // 獲取當前線程。
                    Thread thisThread = Thread.currentThread();
                    // 設置當前線程變為Leader線程。
                    leader = thisThread;
                    try {
                        // 等待剩餘等待時間。
                        available.awaitNanos(delay);
                    } finally {
                        // 將Leader設置為null。
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果隊列不為空,並且沒有Leader則通知等待線程可以成為Leader。
        if (leader == null && q.peek() != null)
            // 通知等待線程。
            available.signal();
        lock.unlock();
    }
}
  1. 當獲取元素時,先獲取到鎖對象。
  2. 獲取最早過期的元素,但是並不從隊列中彈出元素。
  3. 最早過期元素是否為空,如果為空則直接讓當前線程無限期等待狀態,並且讓出當前鎖對象。
  4. 如果最早過期的元素不為空
    1. 獲取最早過期元素的剩餘過期時間,如果已經過期則直接返回當前元素
    2. 如果沒有過期,也就是說剩餘時間還存在,則先獲取Leader對象,如果Leader已經有線程在處理,則當前線程進行無限期等待,如果Leader為空,則首先將Leader設置為當前線程,並且讓當前線程等待剩餘時間。
    3. 最後將Leader線程設置為空
  5. 如果Leader已經為空,並且隊列有內容則喚醒一個等待的隊列。

poll操作

獲取最早過期的元素,如果隊列頭沒有過期的元素則直接返回null,反之返回過期的元素。

 public E poll() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         E first = q.peek();
         // 如果隊列為空或者隊列最早過期的元素沒有過期,則返回null。
         if (first == null || first.getDelay(NANOSECONDS) > 0)
             return null;
         else
            // 出隊列操作。
            return q.poll();
    } finally {
        lock.unlock();
    }
}

小結

  • DelayQueue是一個無界的並發延遲阻塞隊列,隊列中的元素必須實現Delayed接口,相應瞭需要實現Comparable接口實現比較的方法
  • Leader-Follower模式的變種,用於最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。

到此這篇關於java並發中DelayQueue延遲隊列原理剖析的文章就介紹到這瞭,更多相關java DelayQueue延遲隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!