Reactive反應式編程及使用介紹

前言

前一篇分析瞭Spring WebFlux的設計及實現原理後,反應式編程又來瞭,Spring WebFlux其底層還是基於Reactive編程模型的,在java領域中,關於Reactive,有一個框架規范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已經實現瞭這個規范。其他的優秀實現還有Reactor和Rxjava。在Spring WebFlux中依賴的就是Reactor。雖然你可能沒用過Reactive開發過應用,但是或多會少你接觸過異步Servlet,同時又有這麼一種論調:異步化非阻塞io並不能增強太多的系統性能,但是也不可否認異步化後並發性能上去瞭。聽到這種結論後在面對是否選擇Reactive編程後,是不是非常模棱兩可。因為我們不是很瞭解反應式編程,所以會有這種感覺。沒關系,下面看看反應式編程集大者Reactor是怎麼闡述反應式編程的。

  •   Reactor官網:https://projectreactor.io/
  •   Rxjava官網:http://reactivex.io/

反應式編程簡介

Reactor是Reactive Programming范例的一個實現,可以概括為:
反應式編程是一種涉及數據流和變化傳播的異步編程范例。這意味著可以通過所采用的編程語言輕松地表達靜態(例如陣列)或動態(例如事件發射器)數據流。
作為反應式編程方向的第一步,Microsoft在.NET生態系統中創建瞭Reactive Extensions(Rx)庫。然後RxJava在JVM上實現瞭響應式編程。隨著時間的推移,通過Reactive Streams工作出現瞭Java的標準化 ,這一規范定義瞭JVM上的反應庫的一組接口和交互規則。它的接口已經集成到父Flow類下的Java 9中。

反應式編程范例通常以面向對象的語言呈現,作為Observer設計模式的擴展。人們還可以將主要的反應流模式與熟悉的迭代器設計模式進行比較,因為在所有這些庫中對Iterable- Iterator對存在雙重性 。一個主要的區別是,雖然迭代器是基於拉的,但是反應流是基於推的。

使用迭代器是一種命令式編程模式,即使訪問值的方法完全由其負責Iterable。實際上,開發人員可以選擇何時訪問next()序列中的項目。在反應流中,相當於上述對Publisher-Subscriber。但是, 當它們出現時,Publisher它會通知訂閱者新的可用值,而這一推動方面是被動反應的關鍵。此外,應用於推送值的操作以聲明方式而非命令方式表示:程序員表達計算的邏輯而不是描述其精確的控制流。

除瞭推送值之外,還以明確定義的方式涵蓋錯誤處理和完成方面。A Publisher可以將新值推送到Subscriber(通過調用onNext),但也可以發出錯誤(通過調用onError)或完成(通過調用onComplete)。錯誤和完成都會終止序列。這可以概括為:

onNext x 0..N [onError | onComplete]

這種方法非常靈活。該模式支持沒有值,一個值或n值的用例(包括無限的值序列,例如時鐘的連續滴答)。

但是我們首先考慮一下,為什麼我們首先需要這樣的異步反應庫?

阻塞可能會浪費資源

現代應用程序可以覆蓋大量並發用戶,即使現代硬件的功能不斷提高,現代軟件的性能仍然是一個關鍵問題。
人們可以通過兩種方式來提高計劃的績效:

  • 並行化:使用更多線程和更多硬件資源。
  • 在現有資源的使用方式上尋求更高的效率。

通常,Java開發人員使用阻塞代碼編寫程序。這種做法很好,直到出現性能瓶頸,此時需要引入額外的線程,運行類似的阻塞代碼。但是,資源利用率的這種擴展會很快引入爭用和並發問題。

更糟糕的是,阻止浪費資源。如果仔細觀察,一旦程序涉及一些延遲(特別是I / O,例如數據庫請求或網絡調用),資源就會被浪費,因為線程(或許多線程)現在處於空閑狀態,等待數據。

所以並行化方法不是靈丹妙藥。為瞭獲得硬件的全部功能是必要的,但是理由也很復雜並且易受資源浪費的影響。

使用異步來解決?

第二種方法(前面提到過),尋求更高的效率,可以解決資源浪費問題。通過編寫異步,非阻塞代碼,您可以使用相同的底層資源將執行切換到另一個活動任務,然後在異步處理完成後返回到當前進程。

但是如何在JVM上生成異步代碼?Java提供瞭兩種異步編程模型:

回調:異步方法沒有返回值,但需要額外的 callback參數(lambda或匿名類),在結果可用時調用它們。一個眾所周知的例子是Swing的EventListener層次結構。

期貨:異步方法Future立即返回。異步進程計算一個T值,但該Future對象包含對它的訪問。該值不會立即可用,並且可以輪詢對象,直到該值可用。例如,ExecutorService運行Callable任務使用Future對象。

這些技術是否足夠好?不適用於所有用例,兩種方法都有局限性。

回調難以組合在一起,很快導致難以閱讀和維護的代碼(稱為“Callback Hell”)。

考慮一個示例:在用戶界面上顯示用戶的前五個收藏夾,或者如果她沒有收藏夾則提出建議。這通過三個服務(一個提供喜歡的ID,第二個提取喜歡的詳細信息,第三個提供詳細建議):

回調地獄的例子

userService.getFavorites(userId, new Callback() { 
  public void onSuccess(Listlist) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback() {
        public void onSuccess(Listlist) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }
        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }
              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
  • 我們有基於回調的服務:一個Callback接口,其中包含在異步過程成功時調用的方法,以及在發生錯誤時調用的方法。
  • 第一個服務使用喜歡的ID列表調用其回調。
  • 如果列表為空,我們必須去suggestionService。
  • 在suggestionService給出瞭一個List到第二個回調。
  • 由於我們處理UI,我們需要確保我們的消費代碼將在UI線程中運行。
  • 我們使用Java 8 Stream將處理的建議數限制為五個,並在UI中的圖形列表中顯示它們。
  • 在每個級別,我們以相同的方式處理錯誤:在彈出窗口中顯示它們。
  • 回到最喜歡的ID級別。如果服務返回完整列表,那麼我們需要轉到favoriteService獲取詳細Favorite對象。由於我們隻需要五個,我們首先流式傳輸ID列表,將其限制為五個。
  • 再一次,一個回調。這次我們得到一個完全成熟的Favorite對象,我們將其推送到UI線程內的UI。

這是很多代碼,它有點難以遵循並且具有重復的部分。考慮它在Reactor中的等價物:

與回調代碼等效的Reactor代碼示例

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);
  • 我們從最喜歡的ID流開始。
  • 我們將它們異步轉換為詳細的Favorite對象(flatMap)。我們現在有一個流動Favorite。
  • 如果流量Favorite是空的,我們會切換到後退 suggestionService。
  • 我們最多隻對最終流程中的五個元素感興趣。
  • 最後,我們想要處理UI線程中的每個數據。
  • 我們通過描述如何處理數據的最終形式(在UI列表中顯示)以及在出現錯誤(顯示彈出窗口)時該怎麼做來觸發流程。

如果您想確保在不到800毫秒內檢索到喜歡的ID,或者如果需要更長時間從緩存中獲取它們,該怎麼辦?在基於回調的代碼中,這是一項復雜的任務。在Reactor中,它變得像timeout在鏈中添加運算符一樣簡單:

具有超時和回退的Reactor代碼示例

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  • 如果上面的部分發出的時間超過800毫秒,則傳播錯誤。
  • 如果出現錯誤,請回復cacheService。
  • 鏈的其餘部分與前面的示例類似。

盡管Java 8中帶來瞭改進,但期貨比回調要好一些,但它們在構圖方面仍然表現不佳CompletableFuture。一起編排多個未來是可行但不容易的。此外,Future還有其他問題:Future通過調用get() 方法很容易結束對象的另一個阻塞情況,它們不支持延遲計算,並且它們不支持多個值和高級錯誤處理。

考慮另一個例子:我們得到一個ID列表,我們要從中獲取一個名稱和一個統計信息,然後將它們成對地組合在一起,所有這些都是異步的。

CompletableFuture組合的例子

CompletableFutureids = ifhIds(); 
CompletableFutureresult = ids.thenComposeAsync(l -> { 
	Streamzip =
			l.stream().map(i -> { 
				CompletableFuturenameTask = ifhName(i); 
				CompletableFuturestatTask = ifhStat(i); 
				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
			});
	ListcombinationList = zip.collect(Collectors.toList()); 
	CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
	CompletableFutureallDone = CompletableFuture.allOf(combinationArray); 
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) 
			.collect(Collectors.toList()));
});
Listresults = result.join(); 
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
  • 我們從一個未來開始,它為我們提供瞭一個id要處理的值列表。
  • 一旦得到列表,我們想要開始一些更深入的異步處理。
  • 對於列表中的每個元素:
  • 異步獲取關聯的名稱。
  • 異步獲取相關任務。
  • 結合兩個結果。
  • 我們現在有一個代表所有組合任務的期貨清單。為瞭執行這些任務,我們需要將列表轉換為數組。
  • 將數組傳遞給CompletableFuture.allOf,輸出Future完成所有任務後完成的數組。
  • 棘手的一點是allOf返回CompletableFuture,所以我們重申瞭期貨清單,通過收集結果join() (這裡沒有阻止,因為allOf確保期貨全部完成)。
  • 一旦觸發瞭整個異步管道,我們就等待它被處理並返回我們可以斷言的結果列表。

由於Reactor具有更多開箱即用的組合運算符,因此可以簡化此過程:

與未來代碼等效的Reactor代碼示例

Fluxids = ifhrIds(); 
Fluxcombinations =
		ids.flatMap(id -> { 
			MononameTask = ifhrName(id); 
			MonostatTask = ifhrStat(id); 
			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);
		});
Monoresult = combinations.collectList(); 
Listresults = result.block(); 
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
  • 這一次,我們從異步提供的ids(a Flux)序列開始。
  • 對於序列中的每個元素,我們異步處理它(在body函數內部flatMap)兩次。
  • 獲取相關名稱。
  • 獲取相關統計信息。
  • 異步組合2個值。
  • 在將值List變為可用時將值聚合為a 。
  • 在生產中,我們將繼續Flux通過進一步組合或訂閱它來異步處理。最有可能的是,我們會回歸result Mono。由於我們在測試中,我們阻塞,等待處理完成,然後直接返回聚合的值列表。
  • 斷言結果。

Callback和Future的這些風險是相似的,並且是反應式編程與該Publisher-Subscriber對的關系。

從命令式到反應式編程

諸如Reactor之類的反應庫旨在解決JVM上“經典”異步方法的這些缺點,同時還關註一些其他方面:

  • 可組合性和可讀性
  • 數據作為一個用豐富的運算符詞匯表操縱的流程
  • 在您訂閱之前沒有任何事情發生
  • 背壓或消費者向生產者發出信號表明排放率過高的能力
  • 高級但高價值的抽象,與並發無關

可組合性和可讀性

通過可組合性,我們指的是編排多個異步任務的能力,使用先前任務的結果將輸入提供給後續任務或以fork-join方式執行多個任務,以及將異步任務重用為更高級別系統中的分立組件。

編排任務的能力與代碼的可讀性和可維護性緊密相關。隨著異步過程層數量和復雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,回調模型很簡單,但其主要缺點之一是,對於復雜的進程,您需要從回調執行回調,本身嵌套在另一個回調中,依此類推。那個混亂被稱為Callback Hell。正如你可以猜到的(或者從經驗中得知),這樣的代碼很難回歸並推理。

Reactor提供瞭豐富的組合選項,其中代碼反映瞭抽象過程的組織,並且所有內容通常都保持在同一級別(嵌套最小化)。

類比裝配線工作流程

您可以將響應式應用程序處理的數據視為在裝配線中移動。反應器既是傳送帶又是工作站。原材料從原料(原始Publisher)中倒出,最終成為成品,準備推送給消費者(或Subscriber)。

原材料可以經歷各種轉換和其他中間步驟,或者是將中間件聚集在一起的較大裝配線的一部分。如果在某一點出現毛刺或堵塞(也許裝箱產品需要不成比例的長時間),受影響的工作站可向上遊發出信號以限制原材料的流動。

操作符(運算符)

在Reactor中,運算符是我們的匯編類比中的工作站。每個操作符都將行為添加到a Publisher並將上一步驟包裝Publisher到新實例中。因此,整個鏈被鏈接,使得數據源自第一Publisher鏈並且向下移動鏈,由每個鏈轉換。最終,Subscriber完成瞭整個過程。請記住,在Subscriber訂閱a 之前沒有任何事情發生Publisher,下面就會提到。

瞭解操作員創建新實例可以幫助您避免一個常見錯誤,該錯誤會導致您認為您的鏈中使用的操作員未被應用。看到這個項目的常見問題。
雖然Reactive Streams規范根本沒有指定運算符,但Reactor等反應庫的最佳附加值之一是它們提供的豐富的運算符。這些涉及很多方面,從簡單的轉換和過濾到復雜的編排和錯誤處理。

在你訂閱之前什麼都不會發生

在Reactor中,當您編寫Publisher鏈時,默認情況下數據不會啟動。相反,您可以創建異步過程的抽象描述(這可以幫助重用和組合)。

通過訂閱行為,您將Publishera 綁定到a Subscriber,從而觸發整個鏈中的數據流。這是通過上遊傳播的單個request 信號在內部實現的Subscriber,一直傳回源 Publisher。

背壓

上遊傳播信號也用於實現背壓,我們在裝配線中將其描述為當工作站比上遊工作站處理速度慢時向線路發送的反饋信號。

Reactive Streams規范定義的真實機制非常接近於類比:訂閱者可以在無限制模式下工作,讓源以最快的速度推送所有數據,或者可以使用該request機制向源發送信號表明它已準備就緒處理最多的n元素。

中間操作員也可以在途中更改請求。想象一個buffer 運算符,它將元素分組為10個。如果訂閱者請求1個緩沖區,則源可以生成10個元素。一些操作員還實施 預取策略,這避免瞭request(1)往返,並且如果在請求之前生成元素並不太昂貴,則是有益的。

這將推模型轉換為推拉式混合動力,如果它們隨時可用,下遊可以從上遊拉出n個元素。但是如果元素沒有準備好,它們就會在生成時被上遊推動。

熱與冷

在反應庫的Rx傢族中,人們可以區分兩大類反應序列:熱和冷。這種區別主要與反應流如何對訂閱的用戶做出反應有關:

冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產生的全部消息。

而與之對應的熱序列,則是在持續不斷地產生消息,訂閱者隻能獲取到在其訂閱之後產生的消息。

以上就是Reactive反應式編程及使用介紹的詳細內容,更多關於Reactive反應式編程使用的資料請關註WalkonNet其它相關文章!

推薦閱讀: