Java並行執行任務的幾種方案小結

背景

最近在排查生產環境問題,發現商品詳情接口時不時會報RPC調用超時,檢查代碼發現接口裡面查詢活動耗時比較長,都是串行執行的,仔細查看發現完全可以改成並行去執行,縮短接口查詢耗時。

比如我們的商品詳情接口,需要展示立減、階梯滿減、團購等活動標簽。需要查詢三次不同的活動信息,再組裝活動標簽信息。

如果每次查詢耗時1s,按照串行的方式去調用,整個接口下來至少需要3s,整個耗時,對於我們來講是無法接受的。其實在jdk中,給我們提供瞭幾種非常便捷的並行執行任務的方法。

  • CountDownLatch
  • ExecutorService.invokeAll()
  • Fork/Join 分而治之 有點類似MapReduce的影子,這個有興趣的可以自行去瞭解

改進方案

代碼例子:

    private void assemblyActivityTag(CartItemDTO itemDTO){
        //1.查詢立減活動信息,耗時1s         
        //2.查詢階梯滿減活動信息,耗時1s        
        //3.查詢團購活動信息,耗時1s        
        //4.組裝活動標簽信息,耗時1s 
        // 串行執行下來整個耗時4s
    }

CountDownLatch

    private void assemblyActivityTag(CartItemDTO itemDTO){
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
            //1.查詢立減活動信息
                latch.countDown();
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //2.查詢階梯滿減活動信息
                latch.countDown();
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //3.查詢團購活動信息
                latch.countDown();
            }
        });
        try {
            // 一定記得加上timeout時間,防止阻塞主線程
            latch.await(3000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.等待所有子任務完成,組裝活動標簽信息
         
        //5.關閉線程池
        executorService.shutdown();
    }

ExecutorService.invokeAll()

private void assemblyActivityTag(CartItemDTO itemDTO) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Callable<String>> tasks = Lists.newArrayList();
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //1.查詢立減活動信息
                return null;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //2.查詢階梯滿減活動信息
                return null;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //3.查詢團購活動信息
                return null;
            }
        });
        try {
            List<Future<String>> futureList = executorService.invokeAll(tasks, 3000, TimeUnit.MILLISECONDS);
            for (Future<String> future : futureList) {
                // 獲取線程執行結果
                try {
                    String activityTag = future.get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.組裝活動標簽信息
        //5.關閉線程池
        executorService.shutdown();
    }

註意點和區別

在使用CountDownLatch,盡可能使用線程安全的容器去處理子線程的返回值,避免多線程情況下,出現臟數據。

如果想知道每個子線程的對應的返回值,ExecutorService.invokeAll()方式,是沒法區分的,隻能依賴返回值的順序去匹配。

使用上面2種方式時,切記設置超時時間,防止子任務執行時間過長,阻塞主線程任務

線程池用完結束,記得shutdown()

java並行執行任務demo

在一個方法中同時調用多個方法或者服務,並等待所有結果返回

package com.test.demo;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
public class TestFuture {
    @Test
    public void testA(){
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> c());
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> a());
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> b());
        try {
            //獲取並行執行任務結果
            System.out.println(future3.get());
            System.out.println(future1.get());
            System.out.println(future2.get());
        }catch (Exception e){
        }
    }
    public String a(){
        try {
            Thread.sleep(1000);
        }catch (Exception e){
        }
        return "a";
    }
    private String b(){
        try {
            //模擬業務執行時間
            Thread.sleep(2000);
        }catch (Exception e){
        }
        return "b";
    }
    private String c(){
        try {
            //模擬業務執行時間
            Thread.sleep(5000);
        }catch (Exception e){
        }
        return "c";
    }
}

測試結果:

從執行結果中可以看到一共耗時5s,如果同步進行執行,耗時應該在8s

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: