Java8 CompletableFuture runAsync學習總結submit() execute()等

一般的 Executors 的 execute以及submit

並發包下 Executors 創建的線程存在 一個 execute(),以及三個 submit()

不同的是使用 execute() 執行的任務是沒有返回值的,使用 submit() 則是存在返回值的,這與接下裡要說的 CompletableFuture.runAsync 有些類似。

測試代碼如下:

    @Test
    public void justFor(){
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<Float> submit = executorService.submit(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        Float aFloat = null;
        try {
            aFloat = submit.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 根據自己需要決定是否需要調用關閉線程
//        executorService.shutdown();
        System.out.println("aFloat = " + aFloat);

    }

結果:

Thread.currentThread() = Thread[pool-2-thread-1,5,main]
aFloat = 1.03

使用 submit 可以通過 get獲取線程中任務的返回結果,可以通過對象獲取當前狀態 isDone 或者 isCancelled ;

子線程異步執行,主線程休眠等待子線程執行完成,子線程執行完成後喚醒主線程,主線程獲取任務執行結果後退出

此時我加入一個異常代碼,使其必定出錯再來看看結果

    @Test
    public void justFor(){
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<Float> submit = executorService.submit(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            int ii = 1/0;
            return 1.2f;
        });

        Float aFloat = null;
        try {
            aFloat = submit.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
//        executorService.shutdown();
        System.out.println("aFloat = " + aFloat);

    }

執行結果:

此時即使異常依舊終止瞭子線程以及主線程的執行。

CompletableFuture 的 supplyAsync() / runAsync()

  • supplyAsync 表示創建帶返回值的異步任務,相當於ExecutorService submit(Callable< T> task)
  • runAsync 表示創建無返回值的異步任務,相當於ExecutorService submit(Runnable task)方法,這兩個方法效果與 submit 一致

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        try {
            Float aFloat = floatCompletableFuture.get();
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("aFloat = " + aFloat);

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

輸出結果:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread.currentThread() = Thread[main,5,main]
aFloat = 1.03

日志中 ForkJoinPool 為jdk1.7 提供的一個新的分而治之的性能更好的並發處理線程池,比一般的Executors 更好一點,適用於高密度計算的任務。

但也可以如此寫

即將該任務提交到指定的線程池中執行該任務;

輸出的線程池不一致

類似的 runAsync() 也可以這樣,使用自己的異步線程或者提交到指定的線程池中執行

可以看得出使用第二個參數均提供瞭可以指定 Executor 沒有指定時默認使用 ForkJoinPool.commonPool()

一般的

runAsync 如下:

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return;
        });

可以看得出 並沒有任何返回值

CompletableFuture 的 thenApply() / thenApplyAsync()

thenApply 表示某個任務執行完成後執行的動作即回調方法,會將該任務的執行結果即方法的返回值會作為作為入參傳遞到接下來的回調方法中

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        CompletableFuture<Float> floatCompletableFuture1 = floatCompletableFuture.thenApply((resultFloat) -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("接受上一個 resultFloat = " + resultFloat);
            return 2.01f;
        });

        CompletableFuture<Float> floatCompletableFuture2 = floatCompletableFuture1.thenApplyAsync((result2) -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("result2 = " + result2);
            return 2.21f;
        });

        try {
            Float aFloat = floatCompletableFuture.get();
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("aFloat = " + aFloat);

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

輸出結果:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread.currentThread() = Thread[main,5,main]
接受上一個 resultFloat = 1.03
Thread.currentThread() = Thread[main,5,main]
Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
aFloat = 1.03
result2 = 2.01

thenApplyAsyncthenApply 區別:

  • thenApplyAsync 將任務異步處理,可以選擇提交到某一個線程池中執行
  • thenApply 則將會在上一個任務的同一個線程中執行

上面代碼也可以連著書寫如下:

CompletableFuture 的 thenAccept() / thenRun()

  • thenAccept thenApply 接收上一個任務的返回值作為參數但是沒有返回值
  • thenAcceptAsync 同上但為異步線程,可以指定提交到某一個線程池中
  • thenRun 方法沒有入參,也沒有返回值
  • thenRunAsync 同上但為異步線程,可以指定提交到某一個線程池中

示例代碼:

    @Test
    public void justFor(){

        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

         CompletableFuture<Void> floatCompletableFuture1= floatCompletableFuture.thenApply((resultFloat) -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("接受上一個 resultFloat = " + resultFloat);
            return 2.01f;

        }).thenAccept((result)->{
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println("result = " + result);

        }).thenRun(()->{
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            System.out.println(" doNothing");
        });
    }

CompletableFuture exceptionally

指定某個任務執行異常時執行的回調方法,會將拋出異常作為參數傳遞到回調方法中,如果該任務正常執行則 exceptionally方法返回的CompletionStage的result就是該任務正常執行的結果

正常示例:

    @Test
    public void justFor(){

        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
//            float ii = 1/0;
            return 1.03f;
        });

        CompletableFuture<Float> exceptionally = floatCompletableFuture.exceptionally((exception) -> {
            System.out.println("catch exception");
            exception.printStackTrace();
            return 0.0f;
        });

        floatCompletableFuture.thenAccept((result)->{
            System.out.println("is OK");
            System.out.println("result = " + result);
        });

    }

輸出結果:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
is OK

異常示例:

    @Test
    public void justFor(){

        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());

            int a = 121/0;

            return 1.03f;
        });

        CompletableFuture<Float> exceptionally = floatCompletableFuture.exceptionally((exception) -> {
            System.out.println("catch exception");
            exception.printStackTrace();
            return 0.0f;
        });

        floatCompletableFuture.thenAccept((result)->{
            System.out.println("is OK");
            System.out.println("result = " + result);
        });

    }

結果:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
catch exception

CompletableFuture whenComplete

當某個任務執行完成後執行的回調方法,會將執行結果或者執行期間拋出的異常傳遞給回調方法

  • 正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一致
  • 異常執行,則get方法拋出異常
  • 同樣提供 Async 異步相關方法

正常:

    @Test
    public void justFor(){
        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());

            return 1.03f;
        });

        floatCompletableFuture.whenComplete((result, exception) -> {
            System.out.println("result = " + result);
            System.out.println("exception = " + exception);
        });
    }

輸出:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
result = 1.03
exception = null

異常時示例:

    public static void main(String[] args) {
        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            int ii = 12 / 0;
            return 1.03f;
        });

        floatCompletableFuture.whenComplete((result, exception) -> {
            System.out.println("result = " + result);
            System.out.println("exception = " + exception);
        });
    }

輸出:

Thread.currentThread() = Thread[ForkJoinPool.commonPool-worker-1,5,main]
result = null
exception = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

CompletableFuture handle

whenComplete 基本一致

區別在於handle的回調方法有返回值,且handle方法返回的CompletableFuture的result是回調方法的執行結果或者回調方法執行期間拋出的異常,與原始CompletableFuture的result無關

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> floatCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            int ii = 12 / 0;
            return 1.03f;
        });

        floatCompletableFuture.handle((result, exception) -> {
            System.out.println("result = " + result);
            System.out.println("exception = " + exception);

            return "???";
        });
    }

CompletableFuture 組合處理 thenCombine / thenAcceptBoth / runAfterBoth

三個方法都是將兩個 CompletableFuture 組合起來

隻有這兩個都正常執行完瞭才會執行某個任務區別在於

  • thenCombine 會將兩個任務的執行結果作為方法入參傳遞到指定方法中,且該方法有返回值;thenAcceptBoth 同樣將兩個任務的執行結果作為方法入參,但是無返回值;
  • runAfterBoth 沒有入參,也沒有返回值。註意兩個任務中隻要有一個執行異常,則將該異常信息作為指定任務的執行結果

同時這些也提供瞭Async 異步方法

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 2.03f;
        });

        // 傳遞結果 有返回值
        CompletableFuture<String> objectCompletableFuture = a1.thenCombine(a2, (a, b) -> {

            return "12";
        });

        // 傳遞結果  無返回值
        CompletableFuture<Void> voidCompletableFuture = a1.thenAcceptBoth(a2, (a, b) -> {
            
        });

        // 無入參 無返回值
        a1.runAfterBoth(a2, ()->{
            //
        });
    }

CompletableFuture applyToEither / acceptEither / runAfterEither

三個方法都是將兩個CompletableFuture組合起來

但與上面不同的是隻要其中一個執行完瞭就會執行某個任務,區別

  • applyToEither 會將已經執行完成的任務的執行結果作為方法入參,並有返回值;
  • acceptEither 同樣將已經執行完成的任務的執行結果作為方法入參,但是沒有返回值;runAfterEither 沒有方法入參,也沒有返回值。

註意 兩個任務中隻要有一個執行異常,則將該異常信息作為指定任務的執行結果

同時這些也提供瞭Async 異步方法

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 2.03f;
        });

        // 傳遞結果 有返回值
        CompletableFuture<String> objectCompletableFuture = a1.thenCombine(a2, (a, b) -> {

            return "12";
        });

        // 傳遞結果  無返回值
        CompletableFuture<Void> voidCompletableFuture = a1.thenAcceptBoth(a2, (a, b) -> {
            
        });

        // 無入參 無返回值
        a1.runAfterBoth(a2, ()->{
            //
        });
    }

CompletableFuture thenCompose

thenCompose

  • 在某個任務執行完成後,將該任務的執行結果作為方法入參然後執行指定方法,該方法會返回一個新的CompletableFuture實例
  • 如果該CompletableFuture實例的result不為null,則返回一個基於該result的新的CompletableFuture實例;
  • 如果該CompletableFuture實例為null,則執行這個新任務

同樣的提供Async方式

    @Test
    public void justFor(){
        CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 2.03f;
        });

        // 傳遞結果 有返回值
        CompletableFuture<String> objectCompletableFuture = a1.applyToEither(a2, (b) -> {

            return "12";
        });

        // 傳遞結果  無返回值
        CompletableFuture<Void> voidCompletableFuture = a1.acceptEither(a2, (b) -> {

        });

        // 無入參 無返回值
        a1.runAfterEither(a2, ()->{
            //
        });
    }

CompletableFuture 的 allOf() anyOf()

  • allOf 返回的CompletableFuture是多個任務都執行完成後才會執行,隻要有一個任務執行異常,則返回的 CompletableFuture 執行get方法時會拋出異常,如果都正常執行,則get返回null
  • anyOf 隻要有一個任務執行完成,無論是正常執行或者執行異常,都會執行向下執行

示例代碼:

    @Test
    public void justFor(){
        CompletableFuture<Float> a1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 1.03f;
        });

        CompletableFuture<Float> a2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread.currentThread() = " + Thread.currentThread());
            return 2.03f;
        });

        // 傳遞結果 有返回值
        CompletableFuture<String> objectCompletableFuture = a1.applyToEither(a2, (b) -> {

            return "12";
        });

        // 傳遞結果  無返回值
        CompletableFuture<Void> voidCompletableFuture = a1.acceptEither(a2, (b) -> {

        });

        // 無入參 無返回值
        a1.runAfterEither(a2, ()->{
            //
        });
    }

參考文章

點此進入參考鏈接

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: