Java線程池並發執行多個任務方式
Java線程池並發執行多個任務
Java在語言層面提供瞭多線程的支持,線程池能夠避免頻繁的線程創建和銷毀的開銷,因此很多時候在項目當中我們是使用的線程池去完成多線程的任務。
Java提供瞭Executors 框架提供瞭一些基礎的組件能夠輕松的完成多線程異步的操作,Executors提供瞭一系列的靜態工廠方法能夠獲取不同的ExecutorService實現,ExecutorService擴展瞭Executors接口,Executors相當簡單:
public interface Executor { void execute(Runnable command); }
把任務本身和任務的執行解耦瞭,如果說Runnable是可異步執行任務的抽象,那Executor就是如何執行可異步執行任務的抽象,說起來比較繞口。
本文不講解線程的一些基礎知識,因為網上的其他文章已經寫的足夠詳細和泛濫。我寫寫多個異步任務的並發執行與結果的獲取問題。
假設這樣一個場景:我們要組裝一個對象,這個對象由大量小的內容組成,這些內容是無關聯無依賴關系的,如果我們串行的去執行,如果每個任務耗時10秒鐘,一共有10個任務,那我們就需要100秒才能獲取到結果。顯然我們可以采用線程池,每個任務起一個線程,忽略線程啟動時間,我們隻需要10秒鐘就能獲取到結果。這裡還有兩種選擇,這10秒鐘我們可以去做其他事,也可以等待結果。
我們來完成這樣的操作:
// 這是任務的抽象 class GetContentTask implements Callable<String> { private String name; private Integer sleepTimes; public GetContentTask(String name, Integer sleepTimes) { this.name = name; this.sleepTimes = sleepTimes; } public String call() throws Exception { // 假設這是一個比較耗時的操作 Thread.sleep(sleepTimes * 1000); return "this is content : hello " + this.name; } }
采用completionService :
// 方法一 ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<String> completionService = new ExecutorCompletionService(executorService); ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo(); // 十個 long startTime = System.currentTimeMillis(); int count = 0; for (int i = 0;i < 10;i ++) { count ++; GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10); completionService.submit(getContentTask); } System.out.println("提交完任務,主線程空閑瞭, 可以去做一些事情。"); // 假裝做瞭8秒種其他事情 try { Thread.sleep(8000); System.out.println("主線程做完瞭,等待結果"); } catch (InterruptedException e) { e.printStackTrace(); } try { // 做完事情要結果 for (int i = 0;i < count;i ++) { Future<String> result = completionService.take(); System.out.println(result.get()); } long endTime = System.currentTimeMillis(); System.out.println("耗時 : " + (endTime - startTime) / 1000); } catch (Exception ex) { System.out.println(ex.getMessage()); }
執行結果為:
提交完任務,主線程空閑瞭, 可以去做一些事情。
主線程做完瞭,等待結果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗時 : 10
如果多個不想一個一個提交,可以采用 invokeAll一並提交,但是會同步等待這些任務
// 方法二 ExecutorService executeService = Executors.newCachedThreadPool(); List<GetContentTask> taskList = new ArrayList<GetContentTask>(); long startTime = System.currentTimeMillis(); for (int i = 0;i < 10;i ++) { taskList.add(new GetContentTask("micro" + i, 10)); } try { System.out.println("主線程發起異步任務請求"); List<Future<String>> resultList = executeService.invokeAll(taskList); // 這裡會阻塞等待resultList獲取到所有異步執行的結果才會執行 for (Future<String> future : resultList) { System.out.println(future.get()); } // 主線程假裝很忙執行8秒鐘 Thread.sleep(8); long endTime = System.currentTimeMillis(); System.out.println("耗時 : " + (endTime - startTime) / 1000); } catch (Exception e) { e.printStackTrace(); }
主線程發起異步任務請求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗時 : 10
如果一系列請求,我們並不需要等待每個請求,我們可以invokeAny,隻要某一個請求返回即可。
ExecutorService executorService = Executors.newCachedThreadPool(); ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>(); taskList.add(new GetContentTask("micro1",3)); taskList.add(new GetContentTask("micro2", 6)); try { List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒 // String result2 = executorService.invokeAny(taskList); // 等待3秒 // invokeAll 提交一堆任務並行處理並拿到結果 // invokeAny就是提交一堆並行任務拿到一個結果即可 for (Future<String> result : resultList) { System.out.println(result.get()); } // System.out.println(result2); } catch (Exception e) { e.printStackTrace(); } System.out.println("主線程等待");
如果我雖然發送瞭一堆異步的任務,但是我隻等待一定的時間,在規定的時間沒有返回我就不要瞭,例如很多時候的網絡請求其他服務器如果要數據,由於網絡原因不能一直等待,在規定時間內去拿,拿不到就我使用一個默認值。
這樣的場景,我們可以使用下面的寫法:
try { ExecutorService executorService = Executors.newCachedThreadPool(); List<Callable<String>> taskList = new ArrayList<Callable<String>>(); taskList.add(new GetContentTask("micro1", 4)); taskList.add(new GetContentTask("micro2", 6)); // 等待五秒 List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS); for (Future<String> future : resultList) { System.out.println(future.get()); } } catch (Exception e) { e.printStackTrace(); }
this is content : hello micro1
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)
因為隻等待5秒,6秒的那個任務自然獲取不到,拋出異常,如果將等待時間設置成8秒,就都能獲取到。
Java線程池的正確使用
線程可認為是操作系統可調度的最小的程序執行序列,一般作為進程的組成部分,同一進程中多個線程可共享該進程的資源(如內存等)。JVM線程跟內核輕量級進程有一對一的映射關系,所以JVM中的線程是很寶貴的。
一般在工程上多線程的實現是基於線程池的。因為相比自己創建線程,多線程具有以下優點:
- 線程是稀缺資源,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以重復使用。
- 可以根據系統的承受能力,調整線程池中工作線程的數量,防止因為消耗過多內存導致服務器崩潰。
1.Executors存在什麼問題
看阿裡巴巴開發手冊並發編程這塊有一條:線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式。
2.Executors為什麼存在缺陷
2.1 線程池工作原理
當一個任務通過execute(Runnable)方法欲添加到線程池時:
- 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的務。
- 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那麼任務被放入緩沖隊列。
- 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
- 那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿瞭,使用handler處理被拒絕的任務。
- 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
2.2 newFixedThreadPool分析
Java中的BlockingQueue主要有兩種實現,分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,必須設置容量。
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。
這裡的問題就出在:不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。而newFixedThreadPool中創建LinkedBlockingQueue時,並未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對於一個無邊界隊列來說,是可以不斷的向隊列中加入任務的,這種情況下就有可能因為任務過多而導致內存溢出問題。
2.3 newCachedThreadPool分析
結合上述流程圖,核心線程數=0,最大線程無限大,由於SynchronousQueue是一個緩存值為1的阻塞隊列。當有大量任務請求時,線程池會創建大量線程,造成OOM。
3. 線程池參數詳解
3.1 構造方法
3.2 線程池拒絕策略
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿瞭,說明線程池處於飽和狀態,那麼必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。。以下是JDK1.5提供的四種策略。
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:隻用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。DiscardPolicy:不處理,丟棄掉。
- 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
4. 線程池正確打開方式
4.1 創建線程池
避免使用Executors創建線程池,主要是避免使用其中的默認實現,那麼我們可以自己直接調用ThreadPoolExecutor的構造函數來自己創建線程池。在創建的同時,給BlockQueue指定容量就可以瞭。
4.2 向線程池提交任務
我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務知否被線程池執行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。
我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。
4.3 關閉線程池
shutdown關閉線程池
方法定義:public void shutdown()
- 線程池的狀態變成SHUTDOWN狀態,此時不能再往線程池中添加新的任務,否則會拋出RejectedExecutionException異常。
- 線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。
註意:這個函數不會等待提交的任務執行完成,要想等待全部任務完成,可以調用:
public boolean awaitTermination(longtimeout, TimeUnit unit)
shutdownNow關閉線程池並中斷任務
方法定義:public List shutdownNow()
- 線程池的狀態立刻變成STOP狀態,此時不能再往線程池中添加新的任務。
- 終止等待執行的線程,並返回它們的列表;
- 試圖停止所有正在執行的線程,試圖終止的方法是調用Thread.interrupt(),但是大傢知道,如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。所以,ShutdownNow()並不代表線程池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成瞭才能退出。
4.4 如何配置線程池大小
CPU密集型任務
- 該任務需要大量的運算,並且沒有阻塞,CPU一直全速運行,CPU密集任務隻有在真正的多核CPU上才可能通過多線程加速 CPU密集型任務配置盡可能少的線程數量:
- CPU核數+1個線程的線程池。
- 例如: CPU 16核,內存32G。線程數=16
IO密集型任務
- IO密集型任務線程並不是一直在執行任務,則應配置盡可能多的線程,如:CPU核數*2
- 某大廠設置策略:IO密集型時,大部分線程都阻塞,故需要多配置線程數:
- CPU核數/(1-阻塞系數)
- 例如: CPU 16核, 阻塞系數 0.9 ————->16/(1-0.9) = 160 個線程數。
- 此時非阻塞線程=16
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- Java多線程 CompletionService
- Java並行執行任務的幾種方案小結
- ExecutorService實現獲取線程返回值
- java高級應用:線程池的全面講解(幹貨)
- Java線程池的簡單使用方法實例教程