Java異步編程工具Twitter Future詳解
異步編程(Twitter Future)
為啥要異步
異步編程有點難以理解,這東西感覺不符合常理,因為我們思考都是按照串行的邏輯,事都是一件一件辦。但在異步計算的情況下,回調往往分散在代碼片段中,需要理解其中的意義。
最難搞的就是組合,嵌套。如果再加上遞歸,派發等邏輯,能寫的極其復雜,又難以理解。當我們需要處理其中一個步驟中可能發生的錯誤時,情況會變得更糟。
java在核心庫中引入瞭CompletableFuture,同時也是一個異步框架,有大約50種不同的方法用於組合、組合和執行異步計算步驟以及處理錯誤。
基本用法
1、封裝計算邏輯,異步返回。
CompletableFuture的靜態方法runAsync
和supplySync
允許我們相應地使用Runnable和SupplySync函數類型創建一個完整的future實例。如下就是一個簡單的示例。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); System.out.println("Main goes on..."); String result = future.get(); System.out.println(result);
如上代碼片段,打印後的結果是Main goes on 先執行,異步任務在future.get() 阻塞結果返回。
2、異步計算結果串聯異步處理
如果想在一個future完畢後,接上另一個異步任務,則用法如下:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("task1: " + Thread.currentThread().getName()); Thread.sleep(2 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); CompletableFuture<String> future = completableFuture.thenApply(s -> { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return s + " World"; }); System.out.println(future.get());
3、並行多個異步任務,統一等待結果
當我們需要並行執行多個Future時,我們通常希望等待所有Futrue都能夠執行,然後處理它們的全部統一的返回結果。
CompletableFuture 的 allOf
靜態方法允許等待所有的future完成:
如下面的代碼片段:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "my"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); combinedFuture.get(); System.out.println(future1.isDone()); System.out.println(future2.isDone()); System.out.println(future3.isDone());
4、異步錯誤處理
CompletableFuture類不需要捕獲語法塊中的異常,而是允許我們用一種特殊的回調方法來處理。此方法接收兩個參數:計算結果
(如果成功完成)和異常結果
(如果某些計算步驟有異常)。
String name = "fengkai CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if ("fengkai".equals(name)) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; }).handle((s, t) -> s != null ? s : "Hello, Stranger!"); System.out.println(completableFuture.get());
Twitter包裝
對於以上的代碼,twitter工具包有自己的小包裝,可以提升一點編程的逼格。
以下是用法:
pom依賴
首先引入maven坐標,因為是用scala編寫的工具包,所以要引入scala的依賴。
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>util-core_2.12</artifactId> <version>${twitter.util.version}</version> </dependency>
1、封裝計算邏輯,異步返回
註意這裡的FuturePool,可以用ExecutorService去包裝。
Future<String> future = futurePool.apply(() -> { try { Thread.sleep(3 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; })
2、異步計算結果串聯異步處理
和CompletableFuture
相似的,有以下用法,不過是用的map方法
Future<String> future = futurePool.apply(() -> { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); Future<Object> mappedFuture = future.map(new Function1<String, Object>() { @Override public Object apply(String v1) { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "World"; } }); Await.result(mappedFuture);
3、並行多個異步任務
這個相對看起來就簡潔的多瞭,用List
添加所有的異步結果,然後collect
收集起來,調用get()
或者其他方法阻塞等待。
List<Future> futures = new ArrayList<>(); Future<String> future1 = futurePool.apply(() -> "hello"); Future<String> future2 = futurePool.apply(() -> "my"); Future<String> future3 = futurePool.apply(() -> "world"); futures.add(future1); futures.add(future2); futures.add(future3); Future<List<String>> collect = Futures.collect(futureList);
4、錯誤處理
這部分處理也比較簡潔,註意這裡返回的是BoxedUnit.UNIT
,其實這是scala的語法,可以理解成void
的return
。
future.onFailure(new Function1<Throwable, BoxedUnit>() { @Override public BoxedUnit apply(Throwable v1) { System.out.println("Error"); return BoxedUnit.UNIT; } );
其他用法
除瞭以上的用法。其實還有很多用法。
例如:collectToTry
,會返回一個Try對象,Try代表瞭一個成功返回的結果,或者錯誤返回的異常.
可以使用try.isReturn()來判斷是否是正常返回的。這在多個Future異步結果的處理中用著很不錯。
Future<List<Try<String>>> futures = Futures.collectToTry(futureList);
flattern()
,該方法類似scala的扁平方法,可以將嵌套的異步對象拍平。
flatMap()
,和flatMap的用法一致,不過是異步的結果。
當你用不好twitter future的時候,隨時隨地可以轉成javaFuture。 toJavaFuture()
。所以,放心用。
其他更有趣的方法,可以自己研究下,還是有點騷東西的。
其他工具
twitter的這個工具包出瞭異步編程外,還有其他的很實用的工具。 包括:
codec
編解碼cahce
緩存hasing
哈希相關jackson
mock
thirft
validator
自行發掘吧。 地址是: github.com/twitter/uti…
到此這篇關於Java異步編程工具Twitter Future詳解的文章就介紹到這瞭,更多相關Java異步編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 詳解Java CompletableFuture使用方法以及與FutureTask的區別
- Java多線程工具CompletableFuture的使用教程
- Java8 CompletableFuture runAsync學習總結submit() execute()等
- Java多線程異步調用性能調優方法詳解
- 一文搞懂Java創建線程的五種方法