徹底搞懂java並發ThreadPoolExecutor使用
前言
線程池是Java中使用較多的並發框架,合理使用線程池,可以:降低資源消耗,提高響應速度,提高線程的可管理性。
本篇文章將從線程池簡單原理,線程池的創建,線程池執行任務和關閉線程池進行使用學習。
正文
一. 線程池的簡單原理
當一個任務提交到線程池ThreadPoolExecutor時,該任務的執行如下圖所示。
- 如果當前運行的線程數小於corePoolSzie(核心線程數),則創建新線程來執行任務(需要獲取全局鎖);
- 如果當前運行的線程數等於或大於corePoolSzie,則將任務加入BlockingQueue(任務阻塞隊列);
- 如果BlockingQueue已滿,則創建新的線程來執行任務(需要獲取全局鎖);
- 如果創建新線程會使當前線程數大於maximumPoolSize(最大線程數),則拒絕任務並調用RejectedExecutionHandler的rejectedExecution() 方法。
由於ThreadPoolExecutor存儲工作線程使用的集合是HashSet,因此執行上述步驟1和步驟3時需要獲取全局鎖來保證線程安全,而獲取全局鎖會導致線程池性能瓶頸,因此通常情況下,線程池完成預熱後(當前線程數大於等於corePoolSize),線程池的execute() 方法都是執行步驟2。
二. 線程池的創建
通過ThreadPoolExecutor能夠創建一個線程池,ThreadPoolExecutor的構造函數簽名如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
通過ThreadPoolExecutor創建線程池時,需要指定線程池的核心線程數,最大線程數,線程保活時間,線程保活時間單位和任務阻塞隊列,並按需指定線程工廠和飽和拒絕策略,如果不指定線程工廠和飽和拒絕策略,則ThreadPoolExecutor會使用默認的線程工廠和飽和拒絕策略。下面分別介紹這些參數的含義。
參數 | 含義 |
---|---|
corePoolSize | 核心線程數,即線程池的基本大小。當一個任務被提交到線程池時,如果線程池的線程數小於corePoolSize,那麼無論其餘線程是否空閑,也需創建一個新線程來執行任務。 |
maximumPoolSize | 最大線程數。當線程池中線程數大於等於corePoolSize時,新提交的任務會加入任務阻塞隊列,但是如果任務阻塞隊列已滿且線程數小於maximumPoolSize,此時會繼續創建新的線程來執行任務。該參數規定瞭線程池允許創建的最大線程數 |
keepAliveTime | 線程保活時間。當線程池的線程數大於核心線程數時,多餘的空閑線程會最大存活keepAliveTime的時間,如果超過這個時間且空閑線程還沒有獲取到任務來執行,則該空閑線程會被回收掉。 |
unit | 線程保活時間單位。通過TimeUnit指定線程保活時間的時間單位,可選單位有DAYS(天),HOURS(時),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(納秒),但無論指定什麼時間單位,ThreadPoolExecutor統一會將其轉換為NANOSECONDS。 |
workQueue | 任務阻塞隊列。線程池的線程數大於等於corePoolSize時,新提交的任務會添加到workQueue中,所有線程執行完上一個任務後,會循環從workQueue中獲取任務來執行。 |
threadFactory | 創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置更有意義的名字。 |
handler | 飽和拒絕策略。如果任務阻塞隊列已滿且線程池中的線程數等於maximumPoolSize,說明線程池此時處於飽和狀態,應該執行一種拒絕策略來處理新提交的任務。 |
三. 線程池執行任務
1. 執行無返回值任務
通過ThreadPoolExecutor的execute() 方法,能執行Runnable任務,示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor執行簡單無返回值任務() throws Exception { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建兩個任務 Runnable firstRunnable = new Runnable() { @Override public void run() { System.out.println("第一個任務執行"); } }; Runnable secondRunnable = new Runnable() { @Override public void run() { System.out.println("第二個任務執行"); } }; // 讓線程池執行任務 threadPoolExecutor.execute(firstRunnable); threadPoolExecutor.execute(secondRunnable); // 讓主線程睡眠1秒,等待線程池中的任務被執行完畢 Thread.sleep(1000); } }
運行測試程序,結果如下。
2. 執行有返回值任務
通過ThreadPoolExecutor的submit() 方法,能夠執行Callable任務,通過submit() 方法返回的RunnableFuture能夠拿到異步執行的結果。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor執行簡單有返回值任務() throws Exception { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建兩個任務,任務執行完有返回值 Callable<String> firstCallable = new Callable<String>() { @Override public String call() throws Exception { return "第一個任務返回值"; } }; Callable<String> secondCallable = new Callable<String>() { @Override public String call() throws Exception { return "第二個任務返回值"; } }; // 讓線程池執行任務 Future<String> firstFuture = threadPoolExecutor.submit(firstCallable); Future<String> secondFuture = threadPoolExecutor.submit(secondCallable); // 獲取執行結果,拿不到結果會阻塞在get()方法上 System.out.println(firstFuture.get()); System.out.println(secondFuture.get()); } }
運行測試程序,結果如下。
3. 執行有返回值任務時拋出錯誤
如果ThreadPoolExecutor在執行Callable任務時,在Callable任務中拋出瞭異常並且沒有捕獲,那麼這個異常是可以通過Future的get() 方法感知到的。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor執行簡單有返回值任務時拋出錯誤() { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建一個任務,任務有返回值,但是執行過程中拋出異常 Callable<String> exceptionCallable = new Callable<String>() { @Override public String call() throws Exception { throw new RuntimeException("發生瞭異常"); } }; // 讓線程池執行任務 Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable); try { System.out.println(exceptionFuture.get()); } catch (Exception e) { System.out.println(e.getMessage()); } } }
運行測試程序,結果如下。
4. ThreadPoolExecutor通過submit方式執行Runnable
ThreadPoolExecutor可以通過submit() 方法來運行Runnable任務,並且還可以異步獲取執行結果。示例如下。
public class ThreadPoolExecutorTest { @Test public void ThreadPoolExecutor通過submit的方式來提交並執行Runnable() throws Exception { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建結果對象 MyResult myResult = new MyResult(); // 創建Runnable對象 Runnable runnable = new Runnable() { @Override public void run() { myResult.setResult("任務執行瞭"); } }; // 通過ThreadPoolExecutor的submit()方法提交Runnable Future<MyResult> resultFuture = threadPoolExecutor.submit(runnable, myResult); // 獲取執行結果 MyResult finalResult = resultFuture.get(); // myResult和finalResult的地址實際相同 Assert.assertEquals(myResult, finalResult); // 打印執行結果 System.out.println(resultFuture.get().getResult()); } private static class MyResult { String result; public MyResult() {} public MyResult(String result) { this.result = result; } public String getResult() { return result; } public void setResult(String result) { this.result = result; } } }
運行測試程序,結果如下。
實際上ThreadPoolExecutor的submit() 方法無論是提交Runnable任務還是Callable任務,都是將任務封裝成瞭RunnableFuture接口的子類FutureTask,然後調用ThreadPoolExecutor的execute() 方法來執行FutureTask。
四. 關閉線程池
關閉線程池可以通過ThreadPoolExecutor的shutdown() 方法,但是shutdown() 方法不會去中斷正在執行任務的線程,所以如果線程池裡有Worker正在執行一個永遠不會結束的任務,那麼shutdown() 方法是無法關閉線程池的。示例如下。
public class ThreadPoolExecutorTest { @Test public void 通過shutdown關閉線程池() { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建Runnable對象 Runnable runnable = new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { LockSupport.parkNanos(1000 * 1000 * 1000); } System.out.println(Thread.currentThread().getName() + " 被中斷"); } }; // 讓線程池執行任務 threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); // 調用shutdown方法關閉線程池 threadPoolExecutor.shutdown(); // 等待3秒觀察現象 LockSupport.parkNanos(1000 * 1000 * 1000 * 3L); } }
運行測試程序,會發現在主線程中等待3秒後,也沒有得到預期的打印結果。如果上述測試程序中使用shutdownNow,則是可以得到預期打印結果的,示例如下。
public class ThreadPoolExecutorTest { @Test public void 通過shutdownNow關閉線程池() { // 創建一個線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); // 創建Runnable對象 Runnable runnable = new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { LockSupport.parkNanos(1000 * 1000 * 1000); } System.out.println(Thread.currentThread().getName() + " 被中斷"); } }; // 讓線程池執行任務 threadPoolExecutor.execute(runnable); threadPoolExecutor.execute(runnable); // 調用shutdown方法關閉線程池 threadPoolExecutor.shutdownNow(); // 等待3秒觀察現象 LockSupport.parkNanos(1000 * 1000 * 1000 * 3L); } }
運行測試程序,打印如下。
因為測試程序中的任務是響應中斷的,而ThreadPoolExecutor的shutdownNow() 方法會中斷所有Worker,所以執行shutdownNow() 方法後,正在運行的任務會響應中斷並結束運行,最終線程池關閉。
假如線程池中運行著一個永遠不會結束的任務,且這個任務不響應中斷,那麼無論是shutdown() 方法還是shutdownNow() 方法,都是無法關閉線程池的。
總結
ThreadPoolExecutor的使用總結如下。
- 通過ThreadPoolExecutor的execute() 方法能夠執行Runnable任務;
- 通過ThreadPoolExecutor的submit() 方法能夠執行Runnable任務和Callable任務,並且能夠獲取異步的執行結果;
- ThreadPoolExecutor的submit() 方法會返回一個Future對象(實際就是FutureTask),如果任務執行過程中發生瞭異常且未捕獲,那麼可以通過Future的get() 方法感知到異常;
- ThreadPoolExecutor的submit() 方法無論是提交Runnable任務還是Callable任務,都是將任務封裝成瞭RunnableFuture接口的子類FutureTask,然後調用ThreadPoolExecutor的execute() 方法來執行FutureTask;
- 關閉線程池時,如果運行的任務可以在有限時間內運行完畢,那麼可以使用shutdown() 方法來關閉線程池,這能夠保證在關閉線程池時,正在運行的任務會順利運行完畢;
- 關閉線程池時,如果運行的任務永遠不會結束但是響應中斷,那麼可以使用shutdownNow() 方法來關閉線程池,這種方式不保證任務順利運行完畢;
- 如果任務永遠不會結束且不響應中斷,那麼無論是shutdown() 方法還是shutdownNow() 方法,都無法關閉線程池。
以上就是徹底搞懂java並發ThreadPoolExecutor使用的詳細內容,更多關於java並發ThreadPoolExecutor的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- 淺談Java ThreadPoolExecutor的使用
- Java線程池並發執行多個任務方式
- 簡單聊一聊Java線程池ThreadPoolExecutor
- 一篇文章帶你瞭解Java中ThreadPool線程池
- java高級應用:線程池的全面講解(幹貨)