Java多線程ThreadPoolExecutor詳解

前言:

根據ThreadPoolExecutor的構造方法,JDK提供瞭很多工廠方法來創建各種用途的線程池.

1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>());
}

說明:

  • 核心線程數 == 最大線程數(沒有救急線程被創建),因此也無需超時時間
  • 阻塞隊列是無界的,可以放任意數量的任務(最大為Integer.MAX_VALUE)

適用於 任務量一已知,相對耗時的任務

2 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}

說明:

  • 核心線程數是 0, 最大線程數是 Integer.MAX_VALUE,救急線程的空閑生存時間是 60s
    • 全部都是救急線程(60s 後可以回收)
    • 救急線程可以無限創建(最大是Integer.MAX_VALUE)
  • 隊列采用瞭 SynchronousQueue 實現特點是,它沒有容量,沒有線程來取是放不進去的(一手交錢、一手交 貨)

如下案例:

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
     try {
         log.debug("putting {} ", 1);
         integers.put(1);
         log.debug("{} putted...", 1);
         log.debug("putting...{} ", 2);
         integers.put(2);
         log.debug("{} putted...", 2);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t1").start();
sleep(1);
new Thread(() -> {
     try {
         log.debug("taking {}", 1);
         integers.take();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t2").start();
sleep(1);
new Thread(() -> {
     try {
         log.debug("taking {}", 2);
         integers.take();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t3").start();
/*
運行結果:
11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted... 
*/

整個線程池表現為線程數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閑 1分鐘後釋放線程。

適用於 任務數比較密集,但每個任務執行時間較短的情況

3 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()));
}

希望多個任務排隊執行。線程數固定為 1,任務數多於 1 時,會放入無界隊列排隊。任務執行完畢,這唯一的線程也不會被釋放.

與其他線程區別:

  • 自己創建一個單線程串行執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而線程池還會新建一 個線程,保證池的正常工作
  • Executors.newSingleThreadExecutor() 線程個數始終為1,不能修改
    • FinalizableDelegatedExecutorService 應用的是裝飾器模式,隻對外暴露瞭 ExecutorService 接口,因此不能調用 ThreadPoolExecutor 中特有的方法.
  • Executors.newFixedThreadPool(1) 初始時為1,以後還可以修改
    • 對外暴露的是 ThreadPoolExecutor 對象,可以強轉後調用 setCorePoolSize 等方法進行修改

4 提交任務

// 執行任務
void execute(Runnable command);
// 提交任務 task,用返回值 Future 獲得任務執行結果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任務
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;
// 提交 tasks 中所有任務,帶超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;

上述都是提供的提交任務的方法,根據不同的業務場景需求,選擇對應的提交方法.

5 關閉線程池

shutdown

/*
線程池狀態變為 SHUTDOWN
- 不會接收新任務
- 但已提交任務會執行完
- 此方法不會阻塞調用線程的執行
*/
void shutdown();
public void shutdown() {
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
     checkShutdownAccess();
     // 修改線程池狀態
     advanceRunState(SHUTDOWN);
     // 僅會打斷空閑線程
     interruptIdleWorkers();
     onShutdown(); // 擴展點 ScheduledThreadPoolExecutor
     } finally {
     mainLock.unlock();
     }
     // 嘗試終結(沒有運行的線程可以立刻終結,如果還有運行的線程也不會等)
     tryTerminate();
}

shutdownNow

/*
線程池狀態變為 STOP
- 不會接收新任務
- 會將隊列中的任務返回
- 並用 interrupt 的方式中斷正在執行的任務
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
     List<Runnable> tasks;
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
     checkShutdownAccess();
     // 修改線程池狀態
     advanceRunState(STOP);
     // 打斷所有線程
     interruptWorkers();
     // 獲取隊列中剩餘任務
     tasks = drainQueue();
     } finally {
     mainLock.unlock();
     }
     // 嘗試終結
     tryTerminate();
     return tasks;
}

其他打斷方法

// 不在 RUNNING 狀態的線程池,此方法就返回 true
boolean isShutdown();
// 線程池狀態是否是 TERMINATED
boolean isTerminated();
// 調用 shutdown 後,由於調用線程並不會等待所有任務運行結束,因此如果它想在線程池 TERMINATED 後做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

到此這篇關於Java多線程ThreadPoolExecutor詳解的文章就介紹到這瞭,更多相關Java ThreadPoolExecutor內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: