Project Reactor源碼解析publishOn使用示例

功能分析

相關示例源碼:github.com/chentianmin…

public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)

onNext()onComplete()onError()方法進行線程切換,publishOn()使得它下遊的消費階段異步執行。

  • scheduler:線程切換的調度器,Scheduler用來生成實際執行異步任務的Worker
  • delayError:是否延時轉發Error。如果為true,當收到上遊的Error時,會等隊列中的元素消費完畢後再向下遊轉發Error。否則會立即轉發Error,可能導致隊列中的元素丟失。默認為true
  • prefetch:預取元素的數量,同時也是隊列的容量。默認值為Queues.SMALL_BUFFER_SIZE,該值通過配置進行修改。

代碼示例

prefetch

/**
 * 每隔delayMillis生產一個元素
 */
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產");
                    fluxSink.next(i);
                });
        fluxSink.complete();
    });
}
@Test
public void testPreFetch() {
    delayPublishFlux(1000, 1, 5)
            .doOnRequest(i -> logLong(i, "request"))
            .publishOn(Schedulers.boundedElastic(), 2)
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

每次會都向上遊請求2個元素。另外還能發現,從第二個request開始,線程發生瞭切換。

delayError

/**
 * 每隔delayMillis生產一個元素,最後發送Error
 */
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生產");
                    fluxSink.next(i);
                });
        fluxSink.error(new RuntimeException("發佈錯誤!"));
    });
}
@Test
public void testDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            // 隻是為瞭消費慢一點
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

元素消費完才觸發Error

@Test
public void testNotDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic(), false, 256)
            // 隻是為瞭消費慢一點
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

元素還沒消費完就觸發Error

源碼分析

首先看一下publishOn()操作符在裝配階段做瞭什麼,直接查看Flux#publishOn()源碼。

Flux#publishOn()

publishOn()裝配階段重點是創建瞭FluxPublishOn對象。

接下來,我們分析訂閱階段發生瞭什麼。一個Publisher在訂閱的時候調用的是其subscribe()方法,因此我們繼續看Flux#subscribe()源碼。

Flux#subscribe()

Flux#subscribe()方法的實現中,如果上遊PublisherOptimizableOperator類型,實際的Subscriber是通過調用該InternalFluxOperator#subscribeOrReturn()方法返回的。如果返回值為null,直接return

對於publishOn()操作符來說,裝配階段創建的FluxPublishOn就是OptimizableOperator類型。所以繼續查看FluxPublishOn#subscribeOrReturn()源碼。

FluxPublishOn#subscribeOrReturn()

可以看到,方法返回的是PublishOnSubscriber,它包裝瞭原始的Subscriber

在後續的訂閱階段一定會調用其onSubscribe()方法,在運行階段一定會調用其onNext()方法。我們先看FluxPublishOn#onSubscribe()源碼。

FluxPublishOn#onSubscribe()

onSubscribe()實現中,分為同步隊列融合、異步隊列融合以及非融合方式處理。

如果上遊的SubscriptionQueueSubscription類型,則會進行隊列融合。具體采用同步還是異步,取決於該QueueSubscription#requestFusion()實現。

  • 同步隊列融合:復用當前隊列,繼續調用下遊onSubscribe()方法,但不會繼續調用上遊request()方法。
  • 異步隊列融合:復用當前隊列,然後繼續調用下遊onSubscribe()以及上遊request()方法,請求數量是prefetch
  • 非融合:創建一個新的隊列,然後繼續調用下遊onSubscribe()以及上遊request()方法,請求數量是prefetch

接下來,我們從源碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。

非融合

先看如下代碼示例,該代碼會以非融合方式執行。

@Test
public void testNoFuse() {
    delayPublishFlux(1000, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(i -> logInt(i, "消費"));
    sleep(10000);
}

間隔1s生產消費元素!

在消費階段,一定會調用FluxPublishOn#onNext()方法。

FluxPublishOn#onNext()

我們重點關註非融合方式執行邏輯,其實隻做瞭2件事:

  • 將下發的元素添加到隊列中,該隊列就是onSubscribe()階段創建的新隊列。
  • 調用trySchedule()方法進行調度。

繼續看FluxPublishOn#trySchedule()源碼。

FluxPublishOn#trySchedule()

這裡其實就是交由woker異步執行,後續會執行FluxPublishOn.run()方法。

FluxPublishOn#run()

在run()方法執行的時候,分為3段邏輯:

  • 如果是輸出融合,執行runBackfused()方法。
  • 如果是同步隊列融合,執行runSync()方法。
  • 否則,執行runAsync()方法。

對於當前例子,實際執行的是runAsync()方法,繼續查看其源碼。

FluxPublishOn#runAsync()

runAsync()做的事情比較簡單,就是排空隊列中的元素下發給下遊。同時在這裡會繼續調用request()向上遊請求數據,這也是前面說的從第二個request()開始會進行線程切換的原因。

另外這裡還會調用checkTerminated(),檢查終止情況。

FluxPublishOn#checkTerminated()

如果delayError=true,必須當前隊列為空是才會轉發Error。如果delayError=false,則直接轉發Error。繼續查看onComplete()方法。

FluxPublishOn#onComplete()

如果未結束,將done標記設置為true,然後再次調用trySchedule()進行調度。後續再被調度到的時候,如果隊列已經排空,才會調用下遊onComplete(),觸發完成。

小結

簡單總結一下非融合執行過程:

onSubscribe()時創建一個隊列,在onNext()時將上遊下發的元素添加到隊列中,然後異步排空隊列中的元素,繼續下發給下遊。

同步隊列融合

以下代碼會以同步隊列融合方式執行。

@Test
public void testSyncFuse() {
    Flux.just(1, 2 ,3, 4, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(this::logInt);
    sleep(10000);
}

因為Flux.just()對應的SubscriptionSynchronousSubscription,其requestFusion()方法實現如下:

SynchronousSubscription#requestFusion()

此時返回的是SYNC,執行同步隊列融合。

前面提到過,同步隊列融合會復用當前隊列,繼續調用下遊onSubscribe()方法,但不會繼續調用上遊request()方法。

這意味著,此時FluxPublishOn#onNext()FluxPublishOn#onComplete()方法並不會調用。但是FluxPublishOn#request()依然會被下遊調用到。

FluxPublishOn#request()

request()方法中還是會調用trySchedule(),後續會異步調用runSync()方法(前面已經分析瞭)。

對於非融合方式,trySchedule()也會執行,隻是這次調度的時候,隊列中還沒有數據被添加進去。

FluxPublishOn#runSync()

runSync()實現上runAsync()差不多,也是排空隊列的元素,繼續下發給下遊。不同的點是少瞭request()調用,以及取消完成控制有差異。

小結

簡單總結一下同步隊列融合執行過程:

onSubsrribe()時直接復用上遊QueueSubscription作為隊列,不會調用上遊request()請求數據,在自身request()時異步排空隊列中的元素,繼續下發給下遊。

異步隊列融合

以下代碼會以異步隊列融合方式執行。

@Test
public void testAsyncFuse() {
    Flux.just(1, 2, 3, 4, 5)
            .windowUntil(i -&gt; i % 3 == 0)
            .publishOn(Schedulers.boundedElastic())
            .flatMap(Function.identity())
            .subscribe(this::logInt);
    sleep(10000);
}

因為windowUntil()對應的SubscriptionWindowPredicateMain,其requestFusion()方法實現如下:

WindowPredicateMain#requestFusion()

此時返回ASYNC,執行異步隊列融合。接下來再看一下FluxPublishOn#onNext()源碼。

FluxPublishOn#onNext()

註意,此時onNext()方法參數是null,表明上遊並沒有真正下發元素,可以將其看做是一個觸發Worker調度的信號。後續還是會異步執行runAsync()方法,這裡就不再分析瞭。

這其實也很容易理解:異步隊列融合直接復用瞭上遊的QueueSubscription作為隊列,真正的數據應該由這個隊列下發。

總結

簡單總結一下同步隊列融合執行過程:

onSubsrribe()時直接復用上遊QueueSubscription作為隊列,在onNext()時接收上遊信號,異步排空隊列中的元素,繼續下發給下遊。

非融合、同步隊列融合、異步隊列融合比較如下:

以上就是Project Reactor源碼解析publishOn使用示例的詳細內容,更多關於Project Reactor publishOn的資料請關註WalkonNet其它相關文章!

推薦閱讀: