Java線程池並發執行多個任務方式

Java線程池並發執行多個任務

Java在語言層面提供瞭多線程的支持,線程池能夠避免頻繁的線程創建和銷毀的開銷,因此很多時候在項目當中我們是使用的線程池去完成多線程的任務。

Java提供瞭Executors 框架提供瞭一些基礎的組件能夠輕松的完成多線程異步的操作,Executors提供瞭一系列的靜態工廠方法能夠獲取不同的ExecutorService實現,ExecutorService擴展瞭Executors接口,Executors相當簡單:

public interface Executor {
    void execute(Runnable command);
}

把任務本身和任務的執行解耦瞭,如果說Runnable是可異步執行任務的抽象,那Executor就是如何執行可異步執行任務的抽象,說起來比較繞口。

本文不講解線程的一些基礎知識,因為網上的其他文章已經寫的足夠詳細和泛濫。我寫寫多個異步任務的並發執行與結果的獲取問題。

假設這樣一個場景:我們要組裝一個對象,這個對象由大量小的內容組成,這些內容是無關聯無依賴關系的,如果我們串行的去執行,如果每個任務耗時10秒鐘,一共有10個任務,那我們就需要100秒才能獲取到結果。顯然我們可以采用線程池,每個任務起一個線程,忽略線程啟動時間,我們隻需要10秒鐘就能獲取到結果。這裡還有兩種選擇,這10秒鐘我們可以去做其他事,也可以等待結果。

我們來完成這樣的操作:

// 這是任務的抽象
class GetContentTask implements Callable<String> {
        
        private String name;
        
        private Integer sleepTimes;
        
        public GetContentTask(String name, Integer sleepTimes) {
            this.name = name;
            this.sleepTimes = sleepTimes;
        }
        public String call() throws Exception {
            // 假設這是一個比較耗時的操作
            Thread.sleep(sleepTimes * 1000);
            return "this is content : hello " + this.name;
        }
        
    }

采用completionService :

// 方法一
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<String> completionService = new ExecutorCompletionService(executorService);
        ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo();
        // 十個
        long startTime = System.currentTimeMillis();
        int count = 0;
        for (int i = 0;i < 10;i ++) {
            count ++;
            GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10);
            completionService.submit(getContentTask);
        }
        System.out.println("提交完任務,主線程空閑瞭, 可以去做一些事情。");
        // 假裝做瞭8秒種其他事情
        try {
            Thread.sleep(8000);
            System.out.println("主線程做完瞭,等待結果");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // 做完事情要結果
            for (int i = 0;i < count;i ++) {
                Future<String> result = completionService.take();
                System.out.println(result.get());
            }
            long endTime = System.currentTimeMillis();
            System.out.println("耗時 : " + (endTime - startTime) / 1000);
        }  catch (Exception ex) {
            System.out.println(ex.getMessage());
        }

執行結果為:

提交完任務,主線程空閑瞭, 可以去做一些事情。
主線程做完瞭,等待結果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗時 : 10

如果多個不想一個一個提交,可以采用 invokeAll一並提交,但是會同步等待這些任務

// 方法二
        ExecutorService executeService = Executors.newCachedThreadPool();
        List<GetContentTask> taskList = new ArrayList<GetContentTask>();
        long startTime = System.currentTimeMillis();
        for (int i = 0;i < 10;i ++) {
            taskList.add(new GetContentTask("micro" + i, 10));
        }
        try {
            System.out.println("主線程發起異步任務請求");
            List<Future<String>> resultList = executeService.invokeAll(taskList);
            // 這裡會阻塞等待resultList獲取到所有異步執行的結果才會執行 
            for (Future<String> future : resultList) {
                System.out.println(future.get());
            }
            // 主線程假裝很忙執行8秒鐘
            Thread.sleep(8);
            long endTime = System.currentTimeMillis();
            System.out.println("耗時 : " + (endTime - startTime) / 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }

主線程發起異步任務請求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗時 : 10

如果一系列請求,我們並不需要等待每個請求,我們可以invokeAny,隻要某一個請求返回即可。

ExecutorService executorService = Executors.newCachedThreadPool();
        ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>();
        taskList.add(new GetContentTask("micro1",3));
        taskList.add(new GetContentTask("micro2", 6));
        try {
            List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒 
//            String result2 = executorService.invokeAny(taskList); // 等待3秒
            // invokeAll 提交一堆任務並行處理並拿到結果
            // invokeAny就是提交一堆並行任務拿到一個結果即可
            for (Future<String> result : resultList) {
                System.out.println(result.get());
            }
//            System.out.println(result2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主線程等待");

如果我雖然發送瞭一堆異步的任務,但是我隻等待一定的時間,在規定的時間沒有返回我就不要瞭,例如很多時候的網絡請求其他服務器如果要數據,由於網絡原因不能一直等待,在規定時間內去拿,拿不到就我使用一個默認值。

這樣的場景,我們可以使用下面的寫法:

try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Callable<String>> taskList = new ArrayList<Callable<String>>();
            taskList.add(new GetContentTask("micro1", 4));
            taskList.add(new GetContentTask("micro2", 6));
            // 等待五秒
            List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS);
            for (Future<String> future : resultList) {
                System.out.println(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } 

this is content : hello micro1
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)

因為隻等待5秒,6秒的那個任務自然獲取不到,拋出異常,如果將等待時間設置成8秒,就都能獲取到。

Java線程池的正確使用

線程可認為是操作系統可調度的最小的程序執行序列,一般作為進程的組成部分,同一進程中多個線程可共享該進程的資源(如內存等)。JVM線程跟內核輕量級進程有一對一的映射關系,所以JVM中的線程是很寶貴的。

一般在工程上多線程的實現是基於線程池的。因為相比自己創建線程,多線程具有以下優點:

  • 線程是稀缺資源,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以重復使用。
  • 可以根據系統的承受能力,調整線程池中工作線程的數量,防止因為消耗過多內存導致服務器崩潰。

1.Executors存在什麼問題

看阿裡巴巴開發手冊並發編程這塊有一條:線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式。

2.Executors為什麼存在缺陷

2.1 線程池工作原理

當一個任務通過execute(Runnable)方法欲添加到線程池時:

  • 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的務。
  • 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那麼任務被放入緩沖隊列。
  • 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
  • 那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿瞭,使用handler處理被拒絕的任務。
  • 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

2.2 newFixedThreadPool分析

Java中的BlockingQueue主要有兩種實現,分別是ArrayBlockingQueue 和 LinkedBlockingQueue。

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,必須設置容量。

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。

這裡的問題就出在:不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。而newFixedThreadPool中創建LinkedBlockingQueue時,並未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對於一個無邊界隊列來說,是可以不斷的向隊列中加入任務的,這種情況下就有可能因為任務過多而導致內存溢出問題。

2.3 newCachedThreadPool分析

結合上述流程圖,核心線程數=0,最大線程無限大,由於SynchronousQueue是一個緩存值為1的阻塞隊列。當有大量任務請求時,線程池會創建大量線程,造成OOM。

3. 線程池參數詳解

3.1 構造方法

3.2 線程池拒絕策略

RejectedExecutionHandler(飽和策略):當隊列和線程池都滿瞭,說明線程池處於飽和狀態,那麼必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。。以下是JDK1.5提供的四種策略。

  • AbortPolicy:直接拋出異常
  • CallerRunsPolicy:隻用調用者所在線程來運行任務。
  • DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。DiscardPolicy:不處理,丟棄掉。
  • 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。

4. 線程池正確打開方式

4.1 創建線程池

避免使用Executors創建線程池,主要是避免使用其中的默認實現,那麼我們可以自己直接調用ThreadPoolExecutor的構造函數來自己創建線程池。在創建的同時,給BlockQueue指定容量就可以瞭。

4.2 向線程池提交任務

我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務知否被線程池執行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。

我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

4.3 關閉線程池

shutdown關閉線程池

方法定義:public void shutdown()

  • 線程池的狀態變成SHUTDOWN狀態,此時不能再往線程池中添加新的任務,否則會拋出RejectedExecutionException異常。
  • 線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。

註意:這個函數不會等待提交的任務執行完成,要想等待全部任務完成,可以調用:

public boolean awaitTermination(longtimeout, TimeUnit unit)

shutdownNow關閉線程池並中斷任務

方法定義:public List shutdownNow()

  • 線程池的狀態立刻變成STOP狀態,此時不能再往線程池中添加新的任務。
  • 終止等待執行的線程,並返回它們的列表;
  • 試圖停止所有正在執行的線程,試圖終止的方法是調用Thread.interrupt(),但是大傢知道,如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。所以,ShutdownNow()並不代表線程池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成瞭才能退出。

4.4 如何配置線程池大小

CPU密集型任務

  • 該任務需要大量的運算,並且沒有阻塞,CPU一直全速運行,CPU密集任務隻有在真正的多核CPU上才可能通過多線程加速 CPU密集型任務配置盡可能少的線程數量:
  • CPU核數+1個線程的線程池。
  • 例如: CPU 16核,內存32G。線程數=16

IO密集型任務

  • IO密集型任務線程並不是一直在執行任務,則應配置盡可能多的線程,如:CPU核數*2
  • 某大廠設置策略:IO密集型時,大部分線程都阻塞,故需要多配置線程數:
  • CPU核數/(1-阻塞系數)
  • 例如: CPU 16核, 阻塞系數 0.9 ————->16/(1-0.9) = 160 個線程數。
  • 此時非阻塞線程=16

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: