Project Reactor源碼解析publishOn使用示例
功能分析
相關示例源碼:github.com/chentianmin…
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)
在onNext()
、onComplete()
、onError()
方法進行線程切換,publishOn()
使得它下遊的消費階段異步執行。
- scheduler:線程切換的調度器,
Scheduler
用來生成實際執行異步任務的Worker
。 - delayError:是否延時轉發
Error
。如果為true
,當收到上遊的Error
時,會等隊列中的元素消費完畢後再向下遊轉發Error
。否則會立即轉發Error
,可能導致隊列中的元素丟失。默認為true
。 - prefetch:預取元素的數量,同時也是隊列的容量。默認值為
Queues.SMALL_BUFFER_SIZE
,該值通過配置進行修改。
代碼示例
prefetch
/** * 每隔delayMillis生產一個元素 */ protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產"); fluxSink.next(i); }); fluxSink.complete(); }); } @Test public void testPreFetch() { delayPublishFlux(1000, 1, 5) .doOnRequest(i -> logLong(i, "request")) .publishOn(Schedulers.boundedElastic(), 2) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
每次會都向上遊請求2個元素。另外還能發現,從第二個request開始,線程發生瞭切換。
delayError
/** * 每隔delayMillis生產一個元素,最後發送Error */ protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) { return Flux.create(fluxSink -> { IntStream.range(startInclusive, endExclusive) .forEach(i -> { // 同步next sleep(delayMillis); logInt(i, "生產"); fluxSink.next(i); }); fluxSink.error(new RuntimeException("發佈錯誤!")); }); } @Test public void testDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic()) // 隻是為瞭消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素消費完才觸發Error
!
@Test public void testNotDelayError() { delayPublishFluxError(500, 1, 5) .publishOn(Schedulers.boundedElastic(), false, 256) // 隻是為瞭消費慢一點 .doOnNext(i -> sleep(1000)) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
元素還沒消費完就觸發Error
!
源碼分析
首先看一下publishOn()
操作符在裝配階段做瞭什麼,直接查看Flux#publishOn()
源碼。
Flux#publishOn()
publishOn()
裝配階段重點是創建瞭FluxPublishOn
對象。
接下來,我們分析訂閱階段發生瞭什麼。一個Publisher
在訂閱的時候調用的是其subscribe()
方法,因此我們繼續看Flux#subscribe()
源碼。
Flux#subscribe()
在Flux#subscribe()
方法的實現中,如果上遊Publisher
是OptimizableOperator
類型,實際的Subscriber
是通過調用該InternalFluxOperator#subscribeOrReturn()
方法返回的。如果返回值為null
,直接return
。
對於publishOn()
操作符來說,裝配階段創建的FluxPublishOn
就是OptimizableOperator
類型。所以繼續查看FluxPublishOn#subscribeOrReturn()
源碼。
FluxPublishOn#subscribeOrReturn()
可以看到,方法返回的是PublishOnSubscriber
,它包裝瞭原始的Subscriber
。
在後續的訂閱階段一定會調用其onSubscribe()
方法,在運行階段一定會調用其onNext()
方法。我們先看FluxPublishOn#onSubscribe()
源碼。
FluxPublishOn#onSubscribe()
在onSubscribe()
實現中,分為同步隊列融合、異步隊列融合以及非融合方式處理。
如果上遊的Subscription
是QueueSubscription
類型,則會進行隊列融合。具體采用同步還是異步,取決於該QueueSubscription#requestFusion()
實現。
- 同步隊列融合:復用當前隊列,繼續調用下遊
onSubscribe()
方法,但不會繼續調用上遊request()
方法。 - 異步隊列融合:復用當前隊列,然後繼續調用下遊
onSubscribe()
以及上遊request()
方法,請求數量是prefetch
。 - 非融合:創建一個新的隊列,然後繼續調用下遊
onSubscribe()
以及上遊request()
方法,請求數量是prefetch
。
接下來,我們從源碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。
非融合
先看如下代碼示例,該代碼會以非融合方式執行。
@Test public void testNoFuse() { delayPublishFlux(1000, 1, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(i -> logInt(i, "消費")); sleep(10000); }
間隔1s生產消費元素!
在消費階段,一定會調用FluxPublishOn#onNext()
方法。
FluxPublishOn#onNext()
我們重點關註非融合方式執行邏輯,其實隻做瞭2件事:
- 將下發的元素添加到隊列中,該隊列就是
onSubscribe()
階段創建的新隊列。 - 調用
trySchedule()
方法進行調度。
繼續看FluxPublishOn#trySchedule()
源碼。
FluxPublishOn#trySchedule()
這裡其實就是交由woker
異步執行,後續會執行FluxPublishOn.run()
方法。
FluxPublishOn#run()
在run()方法執行的時候,分為3段邏輯:
- 如果是輸出融合,執行
runBackfused()
方法。 - 如果是同步隊列融合,執行
runSync()
方法。 - 否則,執行
runAsync()
方法。
對於當前例子,實際執行的是runAsync()
方法,繼續查看其源碼。
FluxPublishOn#runAsync()
runAsync()
做的事情比較簡單,就是排空隊列中的元素下發給下遊。同時在這裡會繼續調用request()
向上遊請求數據,這也是前面說的從第二個request()
開始會進行線程切換的原因。
另外這裡還會調用checkTerminated()
,檢查終止情況。
FluxPublishOn#checkTerminated()
如果delayError=true
,必須當前隊列為空是才會轉發Error
。如果delayError=false
,則直接轉發Error
。繼續查看onComplete()
方法。
FluxPublishOn#onComplete()
如果未結束,將done
標記設置為true
,然後再次調用trySchedule()
進行調度。後續再被調度到的時候,如果隊列已經排空,才會調用下遊onComplete()
,觸發完成。
小結
簡單總結一下非融合執行過程:
在onSubscribe()
時創建一個隊列,在onNext()
時將上遊下發的元素添加到隊列中,然後異步排空隊列中的元素,繼續下發給下遊。
同步隊列融合
以下代碼會以同步隊列融合方式執行。
@Test public void testSyncFuse() { Flux.just(1, 2 ,3, 4, 5) .publishOn(Schedulers.boundedElastic()) .subscribe(this::logInt); sleep(10000); }
因為Flux.just()
對應的Subscription
是SynchronousSubscription
,其requestFusion()
方法實現如下:
SynchronousSubscription#requestFusion()
此時返回的是SYNC
,執行同步隊列融合。
前面提到過,同步隊列融合會復用當前隊列,繼續調用下遊onSubscribe()
方法,但不會繼續調用上遊request()
方法。
這意味著,此時FluxPublishOn#onNext()
和FluxPublishOn#onComplete()
方法並不會調用。但是FluxPublishOn#request()
依然會被下遊調用到。
FluxPublishOn#request()
在request()
方法中還是會調用trySchedule()
,後續會異步調用runSync()
方法(前面已經分析瞭)。
對於非融合方式,trySchedule()
也會執行,隻是這次調度的時候,隊列中還沒有數據被添加進去。
FluxPublishOn#runSync()
runSync()
實現上runAsync()
差不多,也是排空隊列的元素,繼續下發給下遊。不同的點是少瞭request()
調用,以及取消完成控制有差異。
小結
簡單總結一下同步隊列融合執行過程:
在onSubsrribe()
時直接復用上遊QueueSubscription
作為隊列,不會調用上遊request()
請求數據,在自身request()
時異步排空隊列中的元素,繼續下發給下遊。
異步隊列融合
以下代碼會以異步隊列融合方式執行。
@Test public void testAsyncFuse() { Flux.just(1, 2, 3, 4, 5) .windowUntil(i -> i % 3 == 0) .publishOn(Schedulers.boundedElastic()) .flatMap(Function.identity()) .subscribe(this::logInt); sleep(10000); }
因為windowUntil()
對應的Subscription
是WindowPredicateMain
,其requestFusion()
方法實現如下:
WindowPredicateMain#requestFusion()
此時返回ASYNC
,執行異步隊列融合。接下來再看一下FluxPublishOn#onNext()
源碼。
FluxPublishOn#onNext()
註意,此時onNext()
方法參數是null
,表明上遊並沒有真正下發元素,可以將其看做是一個觸發Worker
調度的信號。後續還是會異步執行runAsync()
方法,這裡就不再分析瞭。
這其實也很容易理解:異步隊列融合直接復用瞭上遊的QueueSubscription
作為隊列,真正的數據應該由這個隊列下發。
總結
簡單總結一下同步隊列融合執行過程:
在onSubsrribe()
時直接復用上遊QueueSubscription
作為隊列,在onNext()
時接收上遊信號,異步排空隊列中的元素,繼續下發給下遊。
非融合、同步隊列融合、異步隊列融合比較如下:
以上就是Project Reactor源碼解析publishOn使用示例的詳細內容,更多關於Project Reactor publishOn的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RxJava 觸發流基本原理源碼解析
- RxJava實戰之訂閱流基本原理示例解析
- RxJava構建流基本原理示例解析
- 詳解Java中的reactive stream協議
- Java反應式框架Reactor中的Mono和Flux