詳解Java七大阻塞隊列之SynchronousQueue

其實SynchronousQueue 是一個特別有意思的阻塞隊列,就我個人理解來說,它很重要的特點就是沒有容量。

直接看一個例子:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        boolean add = synchronousQueue.add("1");
        System.out.println(add);
    }
}

代碼很簡單,就是往 SynchronousQueue 裡放瞭一個元素,程序卻拋異常瞭:

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at dongguabai.test.juc.test.TestSynchronousQueue.main(TestSynchronousQueue.java:14)

而異常原因是隊列滿瞭。剛剛使用的是 SynchronousQueue#add 方法,現在來看看 SynchronousQueue#put 方法:

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        synchronousQueue.put("1");
        System.out.println("----");
    }

看到 InterruptedException 其實就能猜出這個方法肯定會阻塞當前線程。

通過這兩個例子,也就解釋瞭 SynchronousQueue 隊列是沒有容量的,也就是說在往 SynchronousQueue 中添加元素之前,得先向 SynchronousQueue 中取出元素,這句話聽著很別扭,那可以換個角度猜想其實現原理,調用取出方法的時候設置瞭一個“已經有線程在等待取出”的標識,線程等待,然後添加元素的時候,先看這個標識,如果有線程在等待取出,則添加成功,反之則拋出異常或者阻塞。

分析

接下來從 SynchronousQueue#put 方法開始進行分析:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

可以發現是調用的 Transferer#transfer 方法,這個 Transferer 是在構造 SynchronousQueue 的時候初始化的:

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

SynchronousQueue 有兩種模式,公平與非公平,默認是非公平,非公平使用的就是 TransferStack,是基於單向鏈表做的:

 static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
   ...
 }

那麼重點就是 SynchronousQueue.TransferStack#transfer 方法瞭,從方法名都可以看出這是用來做數據交換的,但是這個方法有好幾十行,裡面各種 Node 指針搞來搞去,這個地方我覺得沒必要過於糾結細節,老規矩,抓大放小,而且隊列這種,很方便進行 Debug 調試。

再理一下思路:

  • 今天研究的是阻塞隊列,關註阻塞的話,更應該關系的是 takeput 方法;
  • Transferer 是一個抽象類,隻有一個 transfer 方法,即 takeput 共用,那就肯定是基於入參進行功能的區分;
  • takeput 方法底層都調用的 SynchronousQueue.TransferStack#transfer 方法;

將上面 SynchronousQueue#put 使用的例子修改一下,再加一個線程take

package dongguabai.test.juc.test;

import java.util.Date;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()->{
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-put瞭數據:"+"1");

            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("----");
        new Thread(()->{
            Object take = null;
            try {
                take = synchronousQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-take到瞭數據:"+take);
        }).start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("結束...");
    }
}

整個程序結束,並且輸出:

—-
2021-9-2 0:58:55::Thread-0-put瞭數據:1
2021-9-2 0:58:55::Thread-1-take到瞭數據:1
結束…

也就是說當一個線程在 put 的時候,如果有線程 take ,那麼 put 線程可以正常運行,不會被阻塞。

基於這個例子,再結合上文的猜想,也就是說核心點就是找到 put 的時候現在已經有線程在 take 的標識,或者 take 的時候已經有線程在 put,這個標識不一定是變量,結合 AQS 的原理來看,很可能是根據鏈表中的 Node 進行判斷。

接下來看 SynchronousQueue.put 方法:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

它底層也是調用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入參數是當前 put 的元素、false 和 0。再回過頭看 SynchronousQueue.TransferStack#transfer 方法:

E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
  					//這裡的參數e就是要put的元素,顯然不為null,也就是說是DATA模式,根據註釋,DATA模式就說明當前線程是producer
            int mode = (e == null) ? REQUEST : DATA;  

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        //因為第一次put那麼h肯定為null,這裡入參timed為false,所以會到這裡,執行awaitFulfill方法,根據名稱可以猜想出是一個阻塞方法
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                   ....
        }

這裡首先會構造一個 SNode,然後執行 casHead 函數,其實最終棧結構就是:

head->put_e

就是 head 會指向 put 的元素對應的 SNode

然後會執行 awaitFulfill 方法:

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;    //自旋機制
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this); //阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

最終還是會使用 LockSupport 進行阻塞,等待喚醒。

已經大致過瞭一遍流程瞭,細節方面就不再糾結瞭,那麼假如再put 一個元素呢,其實結合源碼已經可以分析出此時棧的結果為:

head–>put_e_1–>put_e

避免分析出錯,寫個 Debug 的代碼驗證一下:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-02 02:15
 */
public class DebugPut2E {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

SynchronousQueue.TransferStack#awaitFulfill 方法的 LockSupport.park(this); 處打上斷點,運行上面的代碼,再看看現在的 head

在這裡插入圖片描述

的確與分析的一致。

也就是先進後出。再看 take 方法:

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

調用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入參數是 nullfalse 和 0。

偷個懶就不分析源碼瞭,直接 Debug 走一遍,代碼如下:

package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-02 02:24
 */
public class DebugTake {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()-> {
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-1").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()-> {
            try {
                synchronousQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-put-2").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            try {
                Object take = synchronousQueue.take();
                System.out.println("======take:"+take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread-Take").start();
    }
}

SynchronousQueue#take 方法中打上斷點,運行上面的代碼:

在這裡插入圖片描述

這裡的 s 就是 headm 就是棧頂的元素,也是最近一次 put 的元素。說白瞭 take 就是取的棧頂的元素,最後再匹配一下,符合條件就直接取出來。take 之後 head 為:

在這裡插入圖片描述

棧的結構為:

head–>put_e

最後再把整個流程梳理一遍:

執行 put 操作的時候,每次壓入棧頂;take 的時候就取棧頂的元素,即先進後出;這也就實現瞭非公平;

至於公平模式,結合 TransferStack 的實現,可以猜測實現就是 put 的時候放入隊列,take 的時候從隊列頭部開始取,先進先出。

那麼這個隊列設計的優勢使用場景在哪裡呢?個人感覺它的優勢就是完全不會產生對隊列中數據的爭搶,因為說白瞭隊列是空的,從某種程度上來說消費速率是很快的。

至於使用場景,我這邊的確沒有想到比較好的使用場景。結合組內同學的使用來看,他選擇使用這個隊列的原因是因為它不會在內存中生成任務隊列,當服務宕機後不用擔心內存中任務的丟失(非優雅停機的情況)。經過討論後發現即使使用瞭 SynchronousQueue 也無法有效的避免任務丟失,但這的確是一個思路,沒準以後在其他場景中用得上。

到此這篇關於詳解Java七大阻塞隊列之SynchronousQueue的文章就介紹到這瞭,更多相關Java阻塞隊列 SynchronousQueue內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: