Java反應式框架Reactor中的Mono和Flux
1. 前言
最近寫關於響應式編程的東西有點多,很多同學反映對Flux
和Mono
這兩個Reactor中的概念有點懵逼。但是目前Java響應式編程中我們對這兩個對象的接觸又最多,諸如Spring WebFlux、RSocket、R2DBC。我開始也對這兩個對象頭疼,所以今天我們就簡單來探討一下它們。
2. 響應流的特點
要搞清楚這兩個概念,必須說一下響應流規范。它是響應式編程的基石。他具有以下特點:
響應流必須是無阻塞的。響應流必須是一個數據流。它必須可以異步執行。並且它也應該能夠處理背壓。
背壓是反應流中的一個重要概念,可以理解為,生產者可以感受到消費者反饋的消費壓力,並根據壓力進行動態調整生產速率。形象點可以按照下面理解:
3. Publisher
由於響應流的特點,我們不能再返回一個簡單的POJO對象來表示結果瞭。必須返回一個類似Java中的Future
的概念,在有結果可用時通知消費者進行消費響應。
Reactive Stream規范中這種被定義為Publisher<T>
,Publisher<T>
是一個可以提供0-N個序列元素的提供者,並根據其訂閱者Subscriber<? super T>
的需求推送元素。一個Publisher<T>
可以支持多個訂閱者,並可以根據訂閱者的邏輯進行推送序列元素。下面這個Excel計算就能說明一些Publisher<T>
的特點。
A1-A9就可以看做Publisher<T>
及其提供的元素序列。A10-A13分別是求和函數SUM(A1:A9)
、平均函數AVERAGE(A1:A9)
、最大值函數MAX(A1:A9)
、最小值函數MIN(A1:A9)
,可以看作訂閱者Subscriber
。假如說我們沒有A10-A13,那麼A1-A9就沒有實際意義,它們並不產生計算。這也是響應式的一個重要特點:當沒有訂閱時發佈者什麼也不做。
而Flux
和Mono
都是Publisher<T>
在Reactor 3實現。Publisher<T>
提供瞭subscribe
方法,允許消費者在有結果可用時進行消費。如果沒有消費者Publisher<T>
不會做任何事情,他根據消費情況進行響應。 Publisher<T>
可能返回零或者多個,甚至可能是無限的,為瞭更加清晰表示期待的結果就引入瞭兩個實現模型Mono
和Flux
。
4. Flux
Flux
是一個發出(emit)0-N
個元素組成的異步序列的Publisher<T>
,可以被onComplete
信號或者onError
信號所終止。在響應流規范中存在三種給下遊消費者調用的方法 onNext
, onComplete
, 和onError
。下面這張圖表示瞭Flux的抽象模型:
以上的的講解對於初次接觸反應式編程的依然是難以理解的,所以這裡有一個循序漸進的理解過程。
有些類比並不是很妥當,但是對於你循序漸進的理解這些新概念還是有幫助的。
傳統數據處理
我們在平常是這麼寫的:
public List<ClientUser> allUsers() { return Arrays.asList(new ClientUser("felord.cn", "reactive"), new ClientUser("Felordcn", "Reactor")); }
我們通過迭代返回值List
來get
這些元素進行再處理(消費),這種方式有點類似廚師做瞭很多菜,吃不吃在於食客。需要食客主動去來吃就行瞭(pull的方式),至於喜歡吃什麼不喜歡吃什麼自己隨意,怎麼吃也自己隨意。
流式數據處理
在Java 8中我們可以改寫為流的表示:
public Stream<ClientUser> allUsers() { return Stream.of(new ClientUser("felord.cn", "reactive"), new ClientUser("Felordcn", "Reactor")); }
依然是廚師做瞭很多菜,但是這種就更加高級瞭一些,提供瞭菜品的搭配方式(不包含具體細節),食客可以按照說明根據自己的習慣搭配著去吃,一但開始概不退換,吃完為止,過期不候。
反應式數據處理
在Reactor中我們又可以改寫為Flux
表示:
public Flux<ClientUser> allUsers(){ return Flux.just(new ClientUser("felord.cn", "reactive"), new ClientUser("Felordcn", "Reactor")); }
這時候食客隻需要訂餐就行瞭,做好瞭自然就呈上來,而且可以隨時根據食客的飯量進行調整。如果沒有食客訂餐那麼廚師就什麼都不用做。當然不止有這麼點特性,不過對於方便我們理解來說這就夠瞭。
5. Mono
Mono
是一個發出(emit)0-1
個元素的Publisher<T>
,可以被onComplete
信號或者onError
信號所終止。
這裡就不翻譯瞭,整體和Flux
差不多,隻不過這裡隻會發出0-1個元素。也就是說不是有就是沒有。象Flux
一樣,我們來看看Mono
的演化過程以幫助理解。
傳統數據處理
public ClientUser currentUser () { return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null; }
直接返回符合條件的對象或者null
。
Optional的處理方式
public Optional<ClientUser> currentUser () { return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive")) : Optional.empty(); }
這個Optional
我覺得就有反應式的那種味兒瞭,當然它並不是反應式。當我們不從返回值Optional
取其中具體的對象時,我們不清楚裡面到底有沒有,但是Optional
是一定客觀存在的,不會出現NPE問題。
反應式數據處理
public Mono<ClientUser> currentUser () { return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive")) : Mono.empty(); }
和Optional
有點類似的機制,當然Mono
不是為瞭解決NPE問題的,它是為瞭處理響應流中單個值(也可能是Void
)而存在的。
6. 總結
Flux
和Mono
是Java反應式中的重要概念,但是很多同學包括我在開始都難以理解它們。這其實是規定瞭兩種流式范式,這種范式讓數據具有一些新的特性,比如基於發佈訂閱的事件驅動,異步流、背壓等等。另外數據是推送(Push)給消費者的以區別於平時我們的拉(Pull)模式。同時我們可以像Stream Api一樣使用類似map
、flatmap
等操作符(operator)來操作它們。對Flux
和Mono
這兩個概念需要花一些時間去理解它們,不能操之過急。
到此這篇關於Java反應式框架Reactor中的Mono和Flux的文章就介紹到這瞭,更多相關Java框架 Reactor中的Mono和Flux內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 詳解Java中的reactive stream協議
- Reactive反應式編程及使用介紹
- SpringBoot深入分析webmvc和webflux的區別
- SpringBoot之webflux全面解析
- Project Reactor源碼解析publishOn使用示例