Java8 使用CompletableFuture 構建異步應用方式

概述

為瞭展示 CompletableFuture 的強大特性, 創建一個名為 best-price-finder 的應用,它會查詢多個在線商店,依據給定的產品或服務找出最低的價格。

這個過程中,會學到幾個重要的技能。

  • 如何提供異步API
  • 如何讓你使用瞭同步API的代碼變為非阻塞代碼

我們將共同學習如何使用流水線將兩個接續的異步操作合並為一個異步計算操作。 比如,在線商店返回瞭你想要購買的商品的原始價格,並附帶著一個折扣代碼——最終,要計算出該商品的實際價格,你不得不訪問第二個遠程折扣服務,查詢該折扣代碼對應的折扣比率

  • 如何以響應式的方式處理異步操作的完成事件,以及隨著各個商品返回它的商品價格,最佳價格查詢器如何持續的更新每種商品的最佳推薦,而不是等待所有的商店都返回他們各自的價格(這種方式存在著一定的風險,一旦某傢商店的服務中斷,用戶可能遭遇白屏)。

在這裡插入圖片描述

同步API VS 異步API

同步API

是對傳統方法的另一種稱呼:你調用瞭某個方法,調用方在被調用方運行的過程中會等待,被調用方運行結束返回,調用方取的瞭被調用方的返回值並繼續運行。

即使調用方和被調用方在不同的線程中運行,調用方還是需要等被調用方結束運行,這就是 阻塞式調用。

異步API

與同步API相反,異步API會直接返回,或者至少在被調用方計算完成之前,將它剩餘的計算任務交給另一個線程去做,該線程和調用方是異步的。 這就是非阻塞調用。

執行剩餘的計算任務的線程將他的計算結果返回給調用方。 返回的方式要麼通過回調函數,要麼由調用方再此執行一個“等待,指導計算完成”的方法調用。

同步的困擾

為瞭實現最佳價格查詢器應用,讓我們從每個商店都應該提供的API定義入手。

首先,商店應該聲明依據指定產品名稱返回價格的方法:

public class Shop {
	public double getPrice(String product) {
	// TODO
	}
}

該方法的內部實現會查詢商店的數據庫,但也有可能執行一些其他耗時的任務,比如聯系其他外部服務。

用 delay 方法模擬這些長期運行的方法的執行,模擬執行1S ,方法聲明如下。

public static void delay() {
	try {
		Thread.sleep(1000L);
	} catch (InterruptedException e) {
		throw new RuntimeException(e);
	}
}

getPrice 方法會調用 delay 方法,並返回一個隨機計算的值

public double getPrice(String product) {
	return calculatePrice(product);
}
private double calculatePrice(String product) {
	delay();
	return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

很明顯,這個API的使用者(這個例子中為最佳價格查詢器)調用該方法時,它依舊會被阻塞。為等待同步事件完成而等待1S,這是無法接受的,尤其是考慮到最佳價格查詢器對網絡中的所有商店都要重復這種操作。

接下來我們會瞭解如何以異步方式使用同步API解決這個問題。但是,出於學習如何設計異步API的考慮, 你希望以異步API的方式重寫這段代碼, 假裝我們還在深受這一困難的煩惱,如何以異步API的方式重寫這段代碼,讓用戶更流暢地訪問呢?

在這裡插入圖片描述

實現異步API

將同步方法改為異步方法

為瞭實現這個目標,你首先需要將 getPrice 轉換為 getPriceAsync 方法,並修改它的返回值:

public Future<Double> getPriceAsync(String product) { ... }

我們知道 ,Java 5引入瞭 java.util.concurrent.Future 接口表示一個異步計算(即調用線程可以繼續運行,不會因為調用方法而阻塞)的結果 。

這意味著 Future 是一個暫時還不可知值的處理器,這個值在計算完成後,可以通過調用它的 get 方法取得。因為這樣的設計, getPriceAsync 方法才能立刻返回,給調用線程一個機會,能在同一時間去執行其他有價值的計算任務。

新的 CompletableFuture 類提供瞭大量的方法,讓我們有機會以多種可能的方式輕松地實現這個方法,比如下面就是這樣一段實現代碼

【getPriceAsync方法的實現】

在這裡插入圖片描述

在這段代碼中,創建瞭一個代表異步計算的 CompletableFuture 對象實例,它在計算完成時會包含計算的結果。

接著,調用 fork 創建瞭另一個線程去執行實際的價格計算工作,不等該耗時計算任務結束,直接返回一個 Future 實例。

當請求的產品價格最終計算得出時,你可以使用它的 complete 方法,結束completableFuture 對象的運行,並設置變量的值。

很顯然,這個新版 Future 的名稱也解釋瞭它所具有的特性。使用這個API的客戶端,可以通過下面的這段代碼對其進行調用。

【使用異步的API】

在這裡插入圖片描述

我們看到這段代碼中,客戶向商店查詢瞭某種商品的價格。由於商?提供瞭異步API,該次調用立刻返回瞭一個 Future 對象,通過該對象客戶可以在將來的某個時刻取得商品的價格。

這種方式下,客戶在進行商品價格查詢的同時,還能執行一些其他的任務,比如查詢其他傢商店中商品的價格,不會呆呆的阻塞在那裡等待第一傢商店返回請求的結果。

最後,如果所有有意義的工作都已經完成,客戶所有要執行的工作都依賴於商品價格時,再調用 Future 的 get 方法。執行瞭這個操作後,客戶要麼獲得 Future 中封裝的值(如果異步任務已經完成),要麼發生阻塞,直到該異步任務完成,期望的值能夠訪問。

輸出

在這裡插入圖片描述

你一定已經發現 getPriceAsync 方法的調用返回遠遠早於最終價格計算完成的時間。

我們有可能避免發生客戶端被住阻塞的風險。實際上這非常簡單, Future 執行完畢可以發出一個通知,僅在計算結果可用時執行一個由Lambda表達式或者方法引用定義的回
調函數。

不過,我們當下不會對此進行討論,現在我們要解決的是另一個問題:如何正確地管理
異步任務執行過程中可能出現的錯誤。

在這裡插入圖片描述

處理異常錯誤

如果沒有意外,我們目前開發的代碼工作得很正常。但是,如果價格計算過程中產生瞭錯誤會怎樣呢?非常不幸,這種情況下你會得到一個相當糟糕的結果:用於提示錯誤的異常會被限制在試圖計算商品價格的當前線程的范圍內,最終會殺死該線程,而這會導致等待 get 方法返回結果的客戶端永久的被阻塞。

客戶端可以使用重載版本的 get 方法,它使用一個超時參數來避免發生這樣的情況。這是一種值得推薦的做法,你應該盡量在你的代碼中添加超時判斷斷的邏輯,避免發生類似的問題。

使用這種方法至少能防止程序永遠的等待下去,超時發生時,程序會得到通知發生瞭 Timeout-Exception 。

不過,也因為如此,你不會有機會發現計算商品價格的線程內到底發生瞭什麼問題才引發瞭這樣的失效。

為瞭讓客戶端能瞭解商店無法提供請求商品價格的原因,你需要使用
CompletableFuture 的 completeExceptionally 方法將導致 CompletableFuture 內發生問題的異常拋出。

代碼如下

【拋出CompletableFuture內的異常】

1

客戶端現在會收到一個 ExecutionException 異常,該異常接收瞭一個包含失敗原因的Exception 參數,即價格計算方法最初拋出的異常。

所以,舉例來說,如果該方法拋出瞭一個運行時異常“product not available”,客戶端就會得到像下面這樣一段 ExecutionException :

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
not available at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
… 5 more
Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
at java.lang.Thread.run(Thread.java:744)

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: