Java異步編程工具Twitter Future詳解

異步編程(Twitter Future)

為啥要異步

異步編程有點難以理解,這東西感覺不符合常理,因為我們思考都是按照串行的邏輯,事都是一件一件辦。但在異步計算的情況下,回調往往分散在代碼片段中,需要理解其中的意義。

最難搞的就是組合,嵌套。如果再加上遞歸,派發等邏輯,能寫的極其復雜,又難以理解。當我們需要處理其中一個步驟中可能發生的錯誤時,情況會變得更糟。

java在核心庫中引入瞭CompletableFuture,同時也是一個異步框架,有大約50種不同的方法用於組合、組合和執行異步計算步驟以及處理錯誤。

基本用法

1、封裝計算邏輯,異步返回。

CompletableFuture的靜態方法runAsyncsupplySync允許我們相應地使用Runnable和SupplySync函數類型創建一個完整的future實例。如下就是一個簡單的示例。

CompletableFuture<String> future  =  
      CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
      return "Hello";
});        
System.out.println("Main goes on...");        
String result = future.get();
System.out.println(result);

如上代碼片段,打印後的結果是Main goes on 先執行,異步任務在future.get() 阻塞結果返回。

2、異步計算結果串聯異步處理

如果想在一個future完畢後,接上另一個異步任務,則用法如下:

CompletableFuture<String> completableFuture
                = CompletableFuture.supplyAsync(() -> {
	try {
         System.out.println("task1: " + Thread.currentThread().getName());
         Thread.sleep(2 * 1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "Hello";
    });

CompletableFuture<String> future 
	= completableFuture.thenApply(s -> {
    try {
    	System.out.println("task2: " + Thread.currentThread().getName());
    	Thread.sleep(1000);
    } catch (InterruptedException e) {
    	throw new RuntimeException(e);
    }
    return s + " World";
   });
   
System.out.println(future.get());

3、並行多個異步任務,統一等待結果

當我們需要並行執行多個Future時,我們通常希望等待所有Futrue都能夠執行,然後處理它們的全部統一的返回結果。

CompletableFuture 的 allOf 靜態方法允許等待所有的future完成:

如下面的代碼片段:

CompletableFuture<String> future1
	= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
	= CompletableFuture.supplyAsync(() -> "my");
CompletableFuture<String> future3         
	= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
                = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.get();
System.out.println(future1.isDone());
System.out.println(future2.isDone());
System.out.println(future3.isDone());

4、異步錯誤處理

CompletableFuture類不需要捕獲語法塊中的異常,而是允許我們用一種特殊的回調方法來處理。此方法接收兩個參數:計算結果(如果成功完成)和異常結果(如果某些計算步驟有異常)。

String name = "fengkai
CompletableFuture<String> completableFuture
   =  CompletableFuture.supplyAsync(() -> {
   if ("fengkai".equals(name)) {
   	throw new RuntimeException("Computation error!"); 
   }
   return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

System.out.println(completableFuture.get());

Twitter包裝

對於以上的代碼,twitter工具包有自己的小包裝,可以提升一點編程的逼格。

以下是用法:

pom依賴

首先引入maven坐標,因為是用scala編寫的工具包,所以要引入scala的依賴。

<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
     <version>${scala.version}</version>
</dependency>
<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>util-core_2.12</artifactId>
	<version>${twitter.util.version}</version>
</dependency>

1、封裝計算邏輯,異步返回

註意這裡的FuturePool,可以用ExecutorService去包裝。

Future<String> future = futurePool.apply(() -> {
  try {
		Thread.sleep(3 * 1000);
	} catch (InterruptedException e) {
	    throw new RuntimeException(e);
	}
	return "Hello";
})

2、異步計算結果串聯異步處理

CompletableFuture相似的,有以下用法,不過是用的map方法

Future<String> future = futurePool.apply(() -> {
	try {
	  System.out.println("task2: " + Thread.currentThread().getName());
	  Thread.sleep(1000);
	} catch (InterruptedException e) {
	  throw new RuntimeException(e);
	}
	return "Hello";
});
Future<Object> mappedFuture = future.map(new Function1<String, Object>() {
	@Override
	public Object apply(String v1) {
	  try {
	    System.out.println("task2: " + Thread.currentThread().getName());
	    Thread.sleep(1000);
	  } catch (InterruptedException e) {
	    throw new RuntimeException(e);
	  }
	  return "World";
	}
});

Await.result(mappedFuture);

3、並行多個異步任務

這個相對看起來就簡潔的多瞭,用List添加所有的異步結果,然後collect收集起來,調用get()或者其他方法阻塞等待。

List<Future> futures = new ArrayList<>();
Future<String> future1 = futurePool.apply(() -> "hello");
Future<String> future2 = futurePool.apply(() -> "my");
Future<String> future3 = futurePool.apply(() -> "world");
futures.add(future1);
futures.add(future2);
futures.add(future3);
Future<List<String>> collect = Futures.collect(futureList);

4、錯誤處理

這部分處理也比較簡潔,註意這裡返回的是BoxedUnit.UNIT,其實這是scala的語法,可以理解成voidreturn

future.onFailure(new Function1<Throwable, BoxedUnit>() {
      @Override
      public BoxedUnit apply(Throwable v1)
      {
        System.out.println("Error");
        return BoxedUnit.UNIT;
      }
);

其他用法

除瞭以上的用法。其實還有很多用法。

例如:collectToTry,會返回一個Try對象,Try代表瞭一個成功返回的結果,或者錯誤返回的異常.

可以使用try.isReturn()​來判斷是否是正常返回的。這在多個Future異步結果的處理中用著很​不錯。

Future<List<Try<String>>> futures = Futures.collectToTry(futureList);

flattern(),該方法類似scala的扁平方法,可以將嵌套的異步對象拍平。

flatMap(),和flatMap的用法一致,不過是異步的結果。

當你用不好twitter future的時候,隨時隨地可以轉成javaFuture。 toJavaFuture()。所以,放心用。

其他更有趣的方法,可以自己研究下,還是有點騷東西的。

其他工具

twitter的這個工具包出瞭異步編程外,還有其他的很實用的工具。 包括:

  • codec編解碼
  • cahce緩存
  • hasing哈希相關
  • jackson
  • mock
  • thirft
  • validator

自行發掘吧。 地址是: github.com/twitter/uti…

到此這篇關於Java異步編程工具Twitter Future詳解的文章就介紹到這瞭,更多相關Java異步編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: