Java並發編程必備之Future機制

前言

Java 5在concurrency包中引入瞭java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一個對象或者拋出一個異常。

Callable接口使用泛型去定義它的返回類型。Executors類提供瞭一些有用的方法在線程池中執行Callable內的任務。由於Callable任務是並行的,我們必須等待它返回的結果。而線程是屬於異步計算模型,所以不可能直接從別的線程中得到函數返回值。

java.util.concurrent.Future對象為我們解決瞭這個問題。在線程池提交Callable任務後返回瞭一個Future對象,使用它可以知道Callable任務的狀態和得到Callable返回的執行結果。Future提供瞭get()方法讓我們可以等待Callable結束並獲取它的執行結果。

Future的作用

當做一定運算的時候,運算過程可能比較耗時,有時會去查數據庫,或是繁重的計算,比如壓縮、加密等,在這種情況下,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運行效率會大大降低。

我們可以把運算的過程放到子線程去執行,再通過 Future 去控制子線程執行的計算過程,最後獲取到計算結果。

這樣一來就可以把整個程序的運行效率提高,是一種異步的思想。

同時在JDK 1.8的doc中,對Future的描述如下:

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.

大概意思就是Future是一個用於異步計算的接口。

舉個例子:

比如去吃早點時,點瞭包子和涼菜,包子需要等3分鐘,涼菜隻需1分鐘,如果是串行的一個執行,在吃上早點的時候需要等待4分鐘,但是如果你在準備包子的時候,可以同時準備涼菜,這樣隻需要等待3分鐘。

Future就是後面這種執行模式。

創建Future

線程池

class Task implements Callable<String> {
  public String call() throws Exception {
    return longTimeCalculation(); 
  } 
} 
ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定義任務:
Callable<String> task = new Task(); 
// 提交任務並獲得Future: 
Future<String> future = executor.submit(task); 
// 從Future獲取異步執行返回的結果: 
String result = future.get(); // 可能阻塞 

當我們提交一個Callable任務後,我們會同時獲得一個Future對象,然後,我們在主線程某個時刻調用Future對象的get()方法,就可以獲得異步執行的結果。

在調用get()時,如果異步任務已經完成,我們就直接獲得結果。如果異步任務還沒有完成,那麼get()會阻塞,直到任務完成後才返回結果

FutureTask

除瞭用線程池的 submit 方法會返回一個 future 對象之外,同樣還可以用 FutureTask 來獲取 Future 類和任務的結果。

我們來看一下 FutureTask 的代碼實現:

public class FutureTask<V> implements RunnableFuture<V>{
 ...
} 

可以看到,它實現瞭一個接口,這個接口叫作 RunnableFuture。

我們再來看一下 RunnableFuture 接口的代碼實現:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
} 

既然 RunnableFuture 繼承瞭 Runnable 接口和 Future 接口,而 FutureTask 又實現瞭 RunnableFuture 接口,所以 FutureTask 既可以作為 Runnable 被線程執行,又可以作為 Future 得到 Callable 的返回值。

典型用法是,把 Callable 實例當作 FutureTask 構造函數的參數,生成 FutureTask 的對象,然後把這個對象當作一個 Runnable 對象,放到線程池中或另起線程去執行,最後還可以通過 FutureTask 獲取任務執行的結果。

下面我們就用代碼來演示一下:

public class FutureTaskDemo {

    public static void main(String[] args) {
        Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        new Thread(integerFutureTask).start();

        try {
            System.out.println("task運行結果:"+integerFutureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子線程正在計算");
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
} 

在這段代碼中可以看出,首先創建瞭一個實現瞭 Callable 接口的 Task,然後把這個 Task 實例傳入到 FutureTask 的構造函數中去,創建瞭一個 FutureTask 實例,並且把這個實例當作一個 Runnable 放到 new Thread() 中去執行,最後再用 FutureTask 的 get 得到結果,並打印出來。

Future常用方法

方法名 返回值 入參 備註 總結
cancel boolean (boolean mayInterruptIfRunning) 用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。 也就是說Future提供瞭三種功能:判斷任務是否完成,能夠中斷任務,能夠獲取任務執行結果
isCancelled boolean 方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
isDone boolean 方法表示任務是否已經完成,若任務完成,則返回true;
get V 方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
get V (long timeout, TimeUnit unit) 用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null

get()方法

get方法最主要的作用就是獲取任務執行的結果

我們來看一個代碼示例:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<Integer> future = service.submit(new CallableTask());
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return new Random().nextInt();
        }
    }
} 

在這段代碼中,main 方法新建瞭一個 10 個線程的線程池,並且用 submit 方法把一個任務提交進去。

這個任務它所做的內容就是先休眠三秒鐘,然後返回一個隨機數。

接下來我們就直接把future.get結果打印出來,其結果是正常打印出一個隨機數,比如 9527 等。

isDone()方法

該方法是用來判斷當前這個任務是否執行完畢瞭。

需要註意的是,這個方法如果返回 true 則代表執行完成瞭;如果返回 false 則代表還沒完成。

但這裡如果返回 true,並不代表這個任務是成功執行的,比如說任務執行到一半拋出瞭異常。那麼在這種情況下,對於這個 isDone 方法而言,它其實也是會返回 true 的,因為對它來說,雖然有異常發生瞭,但是這個任務在未來也不會再被執行,它確實已經執行完畢瞭。

所以 isDone 方法在返回 true 的時候,不代表這個任務是成功執行的,隻代表它執行完畢瞭。

我們用一個代碼示例來看一看,代碼如下所示:

public class GetException {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());

        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                Thread.sleep(500);
            }
            System.out.println(future.isDone());
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            throw new IllegalArgumentException("Callable拋出異常");
        }
    }
} 

在這段代碼中,可以看到有一個線程池,並且往線程池中去提交任務,這個任務會直接拋出一個異常。

那麼接下來我們就用一個 for 循環去休眠,同時讓它慢慢打印出 0 ~ 4 這 5 個數字,這樣做的目的是起到瞭一定的延遲作用。

在這個執行完畢之後,再去調用 isDone() 方法,並且把這個結果打印出來,然後再去調用 future.get()

cancel方法

如果不想執行某個任務瞭,則可以使用 cancel 方法,會有以下三種情況:

  • 第一種情況最簡單,那就是當任務還沒有開始執行時,一旦調用 cancel,這個任務就會被正常取消,未來也不會被執行,那麼 cancel 方法返回 true。
  • 第二種情況也比較簡單。如果任務已經完成,或者之前已經被取消過瞭,那麼執行 cancel 方法則代表取消失敗,返回 false。因為任務無論是已完成還是已經被取消過瞭,都不能再被取消瞭。
  • 第三種情況就是這個任務正在執行,這個時候會根據我們傳入的參數mayInterruptIfRunning做判斷,如果傳入的參數是 true,執行任務的線程就會收到一個中斷的信號,正在執行的任務可能會有一些處理中斷的邏輯,進而停止,如果傳入的是 false 則就代表不中斷正在運行的任務

isCancelled()方法

判斷是否被取消,它和 cancel 方法配合使用,比較簡單。

應用場景

目前對於Future方式,我們經常使用的有這麼幾類:

Guava

ListenableFutrue,通過增加監聽器的方式,計算完成時立即得到結果,而無需一直循環查詢

CompletableFuture

Java8的CompletableFuture,使用thenApply,thenApplyAsync可以達到和Guava類似的鏈式調用效果。

不同的是,對於Java8,如果thenApplyAsync不傳入線程池,則會使用ForkJoinPools線程池來執行對應的方法,如此可以避免對其他線程產生影響。

Netty

Netty解決的問題:

  • 原生Future的isDone()方法判斷一個異步操作是否完成,但是定義比較模糊:正常終止、拋出異常、用戶取消都會使isDone方法返回true。
  • 對於一個異步操作,我們有些時候更關註的是這個異步操作觸發或者結束後能否再執行一系列的動作。

與JDK相比,增加瞭完成狀態的細分,增加瞭監聽者,異步線程結束之後能夠觸發一系列的動作。

註意事項

添加超時機制

假設一共有四個任務需要執行,我們都把它放到線程池中,然後它獲取的時候是按照從 1 到 4 的順序,也就是執行 get() 方法來獲取的

代碼如下所示:

public class FutureDemo {


    public static void main(String[] args) {
        //創建線程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        //提交任務,並用 Future 接收返回結果
        ArrayList<Future> allFutures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Future<String> future;
            if (i == 0 || i == 1) {
                future = service.submit(new SlowTask());
            } else {
                future = service.submit(new FastTask());
            }
            allFutures.add(future);
        }

        for (int i = 0; i < 4; i++) {
            Future<String> future = allFutures.get(i);
            try {
                String result = future.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        service.shutdown();
    }

    static class SlowTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(5000);
            return "速度慢的任務";
        }
    }

    static class FastTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "速度快的任務";
        }
    }
} 

可以看出,在代碼中我們新建瞭線程池,並且用一個 list 來保存 4 個 Future。

其中,前兩個 Future 所對應的任務是慢任務,也就是代碼下方的 SlowTask,而後兩個 Future 對應的任務是快任務。

慢任務在執行的時候需要 5 秒鐘的時間才能執行完畢,而快任務很快就可以執行完畢,幾乎不花費時間。

在提交完這 4 個任務之後,我們用 for 循環對它們依次執行 get 方法,來獲取它們的執行結果,然後再把這個結果打印出來。

實際上在執行的時候會先等待 5 秒,然後再很快打印出這 4 行語句。

所以問題是:

第三個的任務量是比較小的,它可以很快返回結果,緊接著第四個任務也會返回結果。

但是由於前兩個任務速度很慢,所以我們在利用 get 方法執行時,會卡在第一個任務上。也就是說,雖然此時第三個和第四個任務很早就得到結果瞭,但我們在此時使用這種 for 循環的方式去獲取結果,依然無法及時獲取到第三個和第四個任務的結果。直到 5 秒後,第一個任務出結果瞭,我們才能獲取到,緊接著也可以獲取到第二個任務的結果,然後才輪到第三、第四個任務。

假設由於網絡原因,第一個任務可能長達 1 分鐘都沒辦法返回結果,那麼這個時候,我們的主線程會一直卡著,影響瞭程序的運行效率。

此時我們就可以用 Future 的帶超時參數的get(long timeout, TimeUnit unit)方法來解決這個問題。

這個方法的作用是,如果在限定的時間內沒能返回結果的話,那麼便會拋出一個 TimeoutException 異常,隨後就可以把這個異常捕獲住,或者是再往上拋出去,這樣就不會一直卡著瞭。

源碼分析

超時實現原理

具體實現類:FutureTask

get()方法可以分為兩步:

  • 判斷當前任務的執行狀態,如果不是COMPLETING,就調用awaitDone()方法開始進行死循環輪旋,如果任務還沒有執行完成會使用nanos = deadline - System.nanoTime()檢查是否超時,如果方法已經超時,則會返回,在返回後如果任務的狀態仍然<=COMPLETING,就會拋出TimeoutException()。
  • 如果調用時任務沒有執行完成,會調用parkNanos(),調用線程會阻塞在這裡。

接下來分兩種情況:

  • 在阻塞時間完以後任務的執行狀態仍然沒有改變為完成,進入下一次循環,直接返回。
  • 如果在輪詢中狀態已經改變,任務完成,則會中斷死循環,返回任務執行的返回值。

到此這篇關於Java並發編程必備之Future機制的文章就介紹到這瞭,更多相關Java Future機制內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: