深入理解Java線程池從設計思想到源碼解讀
線程池:從設計思想到源碼解析 前言初識線程池線程池優勢線程池設計思路 深入線程池構造方法任務隊列拒絕策略線程池狀態初始化&容量調整&關閉 使用線程池ThreadPoolExecutorExecutors封裝線程池 解讀線程池execute()addWorker()Worker類runWorker()processWorkerExit()
前言
各位小夥伴兒,春節已經結束瞭,在此獻上一篇肝瞭一個春節假期的遲來的拜年之作,希望讀者朋友們都能有收獲。
根據穆氏哲學,投入越多,收獲越大。我作此文時,披肝瀝膽,汝讀此文時,一目十行,我們的收獲當然不同。
…
那怎麼有更大的收獲呢?根據科學研究,當你為一個事物付出時(包括情緒付出),你就會對它更專註。最直接的付出是什麼呢?當然是點贊和收藏啦。
初識線程池
我們知道,線程的創建和銷毀都需要映射到操作系統,因此其代價是比較高昂的。出於避免頻繁創建、銷毀線程以及方便線程管理的需要,線程池應運而生。
線程池優勢
- 降低資源消耗:線程池通常會維護一些線程(數量為
corePoolSize
),這些線程被重復使用來執行不同的任務,任務完成後不會銷毀。在待處理任務量很大的時候,通過對線程資源的復用,避免瞭線程的頻繁創建與銷毀,從而降低瞭系統資源消耗。提高響應速度:由於線程池維護瞭一批alive
狀態的線程,當任務到達時,不需要再創建線程,而是直接由這些線程去執行任務,從而減少瞭任務的等待時間。 - 提高線程的可管理性:使用線程池可以對線程進行統一的分配,調優和監控。
線程池設計思路
有句話叫做藝術來源於生活,編程語言也是如此,很多設計思想能映射到日常生活中,比如面向對象思想、封裝、繼承,等等。今天我們要說的線程池,它同樣可以在現實世界找到對應的實體——工廠。
先假想一個工廠的生產流程:
工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當訂單增加,正式工人已經忙不過來瞭,工廠會將生產原料暫時堆積在倉庫中,等有空閑的工人時再處理(因為工人空閑瞭也不會主動處理倉庫中的生產任務,所以需要調度員實時調度)。倉庫堆積滿瞭後,訂單還在增加怎麼辦?工廠隻能臨時擴招一批工人來應對生產高峰,而這批工人高峰結束後是要清退的,所以稱為臨時工。當時臨時工也以招滿後(受限於工位限制,臨時工數量有上限),後面的訂單隻能忍痛拒絕瞭。
我們做如下一番映射:
- 工廠——線程池
- 訂單——任務(Runnable)
- 正式工人——核心線程
- 臨時工——普通線程
- 倉庫——任務隊列
- 調度員——getTask()
getTask()是一個方法,將任務隊列中的任務調度給空閑線程,在解讀線程池有詳細介紹
映射後,形成線程池流程圖如下,兩者是不是有異曲同工之妙?
這樣,線程池的工作原理或者說流程就很好理解瞭,提煉成一個簡圖:
深入線程池
那麼接下來,問題來瞭,線程池是具體如何實現這套工作機制的呢?從Java線程池Executor
框架體系可以看出:線程池的真正實現類是ThreadPoolExecutor
,因此我們接下來重點研究這個類。
構造方法
研究一個類,先從它的構造方法開始。ThreadPoolExecutor
提供瞭4個有參構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
解釋一下構造方法中涉及到的參數:
- corePoolSize(必需):核心線程數。即池中一直保持存活的線程數,即使這些線程處於空閑。但是將
allowCoreThreadTimeOut
參數設置為true
後,核心線程處於空閑一段時間以上,也會被回收。 - maximumPoolSize(必需):池中允許的最大線程數。當核心線程全部繁忙且任務隊列打滿之後,線程池會臨時追加線程,直到總線程數達到
maximumPoolSize
這個上限。 - keepAliveTime(必需):線程空閑超時時間。當非核心線程處於空閑狀態的時間超過這個時間後,該線程將被回收。將
allowCoreThreadTimeOut
參數設置為true
後,核心線程也會被回收。 - unit(必需):
keepAliveTime
參數的時間單位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(納秒) - workQueue(必需):任務隊列,采用阻塞隊列實現。當核心線程全部繁忙時,後續由
execute
方法提交的Runnable
將存放在任務隊列中,等待被線程處理。 - threadFactory(可選):線程工廠。指定線程池創建線程的方式。
- handler(可選):拒絕策略。當線程池中線程數達到
maximumPoolSize
且workQueue
打滿時,後續提交的任務將被拒絕,handler
可以指定用什麼方式拒絕任務。
放到一起再看一下:
任務隊列
使用ThreadPoolExecutor
需要指定一個實現瞭BlockingQueue
接口的任務等待隊列。在ThreadPoolExecutor
線程池的API文檔中,一共推薦瞭三種等待隊列,它們是:SynchronousQueue
、LinkedBlockingQueue
和ArrayBlockingQueue
;
- SynchronousQueue:同步隊列。這是一個內部沒有任何容量的阻塞隊列,任何一次插入操作的元素都要等待相對的刪除/讀取操作,否則進行插入操作的線程就要一直等待,反之亦然。
- LinkedBlockingQueue:無界隊列(嚴格來說並非無界,上限是
Integer.MAX_VALUE
),基於鏈表結構。使用無界隊列後,當核心線程都繁忙時,後續任務可以無限加入隊列,因此線程池中線程數不會超過核心線程數。這種隊列可以提高線程池吞吐量,但代價是犧牲內存空間,甚至會導致內存溢出。另外,使用它時可以指定容量,這樣它也就是一種有界隊列瞭。 - ArrayBlockingQueue:有界隊列,基於數組實現。在線程池初始化時,指定隊列的容量,後續無法再調整。這種有界隊列有利於防止資源耗盡,但可能更難調整和控制。
另外,Java還提供瞭另外4種隊列:
- PriorityBlockingQueue:支持優先級排序的無界阻塞隊列。存放在
PriorityBlockingQueue
中的元素必須實現Comparable
接口,這樣才能通過實現compareTo()
方法進行排序。優先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue
不會保證優先級一樣的元素的排序,也不保證當前隊列中除瞭優先級最高的元素以外的元素,隨時處於正確排序的位置。DelayQueue:延遲隊列。基於二叉堆實現,同時具備:無界隊列、阻塞隊列、優先隊列的特征。DelayQueue
延遲隊列中存放的對象,必須是實現Delayed
接口的類對象。通過執行時延從隊列中提取任務,時間沒到任務取不出來。更多內容請見DelayQueue。 - LinkedBlockingDeque:雙端隊列。基於鏈表實現,既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素。
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。這個隊列比較特別的時,采用一種預占模式,意思就是消費者線程取元素時,如果隊列不為空,則直接取走數據,若隊列為空,那就生成一個節點(節點元素為null)入隊,然後消費者線程被等待在這個節點上,後面生產者線程入隊時發現有一個元素為null的節點,生產者線程就不入隊瞭,直接就將元素填充到該節點,並喚醒該節點等待的線程,被喚醒的消費者線程取走元素。
拒絕策略
線程池有一個重要的機制:拒絕策略。當線程池workQueue
已滿且無法再創建新線程池時,就要拒絕後續任務瞭。拒絕策略需要實現RejectedExecutionHandler
接口,不過Executors
框架已經為我們實現瞭4種拒絕策略:
- AbortPolicy(默認):丟棄任務並拋出
RejectedExecutionException
異常。 - CallerRunsPolicy:直接運行這個任務的
run
方法,但並非是由線程池的線程處理,而是交由任務的調用線程處理。 - DiscardPolicy:直接丟棄任務,不拋出任何異常。
- DiscardOldestPolicy:將當前處於等待隊列列頭的等待任務強行取出,然後再試圖將當前被拒絕的任務提交到線程池執行。
線程工廠指定創建線程的方式,這個參數不是必選項,Executors
類已經為我們非常貼心地提供瞭一個默認的線程工廠:
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
線程池狀態
線程池有5種狀態:
volatile int runState; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
runState
表示當前線程池的狀態,它是一個 volatile
變量用來保證線程之間的可見性。
下面的幾個static final
變量表示runState
可能的幾個取值,有以下幾個狀態:
- RUNNING:當創建線程池後,初始時,線程池處於
RUNNING
狀態; - SHUTDOWN:如果調用瞭
shutdown()
方法,則線程池處於SHUTDOWN
狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢; - STOP:如果調用瞭shutdownNow()方法,則線程池處於
STOP
狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務; - TERMINATED:當線程池處於
SHUTDOWN
或STOP
狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED
狀態。
初始化&容量調整&關閉
1、線程初始化
默認情況下,創建線程池之後,線程池中是沒有線程的,需要提交任務之後才會創建線程。
在實際中如果需要線程池創建之後立即創建線程,可以通過以下兩個方法辦到:
- prestartCoreThread():boolean prestartCoreThread(),初始化一個核心線程
- prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心線程,並返回初始化的線程數
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //註意傳進去的參數是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//註意傳進去的參數是null ++n; return n; }
2、線程池關閉
ThreadPoolExecutor
提供瞭兩個方法,用於線程池的關閉:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
3、線程池容量調整
ThreadPoolExecutor
提供瞭動態調整線程池容量大小的方法:
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor
進行線程賦值,還可能立即創建新的線程來執行任務。
使用線程池
ThreadPoolExecutor
通過構造方法使用ThreadPoolExecutor
是線程池最直接的使用方式,下面看一個實例:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MyTest { public static void main(String[] args) { // 創建線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5)); // 向線程池提交任務 for (int i = 0; i < threadPool.getCorePoolSize(); i++) { threadPool.execute(new Runnable() { @Override public void run() { for (int x = 0; x < 2; x++) { System.out.println(Thread.currentThread().getName() + ":" + x); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } // 關閉線程池 threadPool.shutdown(); // 設置線程池的狀態為SHUTDOWN,然後中斷所有沒有正在執行任務的線程 // threadPool.shutdownNow(); // 設置線程池的狀態為STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,該方法要慎用,容易造成不可控的後果 } }
運行結果:
pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1
Executors封裝線程池
另外,Executors
封裝好瞭4種常見的功能線程池(還是那麼地貼心):
1、FixedThreadPool
固定容量線程池。其特點是最大線程數就是核心線程數,意味著線程池隻能創建核心線程,keepAliveTime
為0,即線程執行完任務立即回收。任務隊列未指定容量,代表使用默認值Integer.MAX_VALUE
。適用於需要控制並發線程的場景。
// 使用默認線程工廠 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 需要自定義線程工廠 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
使用示例:
// 1. 創建線程池對象,設置核心線程和最大線程數為5 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 2. 創建Runnable(任務) Runnable task =new Runnable(){ public void run() { System.out.println(Thread.currentThread().getName() + "--->運行"); } }; // 3. 向線程池提交任務 fixedThreadPool.execute(task);
2、 SingleThreadExecutor
單線程線程池。
特點是線程池中隻有一個線程(核心線程),線程執行完任務立即回收,使用有界阻塞隊列(容量未指定,使用默認值Integer.MAX_VALUE
)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 為節省篇幅,省略瞭自定義線程工廠方式的源碼
使用示例:
// 1. 創建單線程線程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 2. 創建Runnable(任務) Runnable task = new Runnable(){ public void run() { System.out.println(Thread.currentThread().getName() + "--->運行"); } }; // 3. 向線程池提交任務 singleThreadExecutor.execute(task);
3、 ScheduledThreadPool
定時線程池。指定核心線程數量,普通線程數量無限,線程執行完任務立即回收,任務隊列為延時阻塞隊列。這是一個比較特別的線程池,適用於執行定時或周期性的任務。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // 繼承瞭 ThreadPoolExecutor public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { // 構造函數,省略瞭自定義線程工廠的構造函數 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } // 延時執行任務 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ... } // 定時執行任務 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {...} }
使用示例:
// 1. 創建定時線程池 ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 2. 創建Runnable(任務) Runnable task = new Runnable(){ public void run() { System.out.println(Thread.currentThread().getName() + "--->運行"); } }; // 3. 向線程池提交任務 scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS); // 延遲2s後執行任務 scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);// 延遲50ms後、每隔2000ms執行任務
4、CachedThreadPool
緩存線程池。沒有核心線程,普通線程數量為Integer.MAX_VALUE
(可以理解為無限),線程閑置60s後回收,任務隊列使用SynchronousQueue
這種無容量的同步隊列。適用於任務量大但耗時低的場景。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用示例:
// 1. 創建緩存線程池 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 2. 創建Runnable(任務) Runnable task = new Runnable(){ public void run() { System.out.println(Thread.currentThread().getName() + "--->運行"); } }; // 3. 向線程池提交任務 cachedThreadPool.execute(task);
解讀線程池
OK,相信前面內容閱讀起來還算輕松愉悅吧,那麼從這裡開始就進入深水區瞭,如果後面內容能吃透,那麼線程池知識就真的被你掌握瞭。
我們知道,向線程池提交任務是用ThreadPoolExecutor
的execute()
方法,但在其內部,線程任務的處理其實是相當復雜的,涉及到ThreadPoolExecutor
、Worker
、Thread
三個類的6個方法:
execute()
在ThreadPoolExecutor
類中,任務提交方法的入口是execute(Runnable command)
方法(submit()
方法也是調用瞭execute()
),該方法其實隻在嘗試做一件事:經過各種校驗之後,調用 addWorker(Runnable command,boolean core)
方法為線程池創建一個線程並執行任務,與之相對應,execute() 的結果有兩個:
參數說明:
Runnable command:待執行的任務
執行流程:
1、通過 ctl.get()
得到線程池的當前線程數,如果線程數小於corePoolSize
,則調用 addWorker(commond,true)
方法創建新的線程執行任務,否則執行步驟2;
2、步驟1失敗,說明已經無法再創建新線程,那麼考慮將任務放入阻塞隊列,等待執行完任務的線程來處理。基於此,判斷線程池是否處於Running
狀態(隻有Running
狀態的線程池可以接受新任務),如果任務添加到任務隊列成功則進入步驟3,失敗則進入步驟4;
3、來到這一步需要說明任務已經加入任務隊列,這時要二次校驗線程池的狀態,會有以下情形:
線程池不再是Running
狀態瞭,需要將任務從任務隊列中移除,如果移除成功則拒絕本次任務線程池是Running
狀態,則判斷線程池工作線程是否為0,是則調用 addWorker(commond,true)
添加一個沒有初始任務的線程(這個線程將去獲取已經加入任務隊列的本次任務並執行),否則進入步驟4;線程池不是Running
狀態,但從任務隊列移除任務失敗(可能已被某線程獲取?),進入步驟4;
4、將線程池擴容至maximumPoolSize
並調用 addWorker(commond,false)
方法創建新的線程執行任務,失敗則拒絕本次任務。
流程圖:
源碼詳讀:
/** * 在將來的某個時候執行給定的任務。任務可以在新線程中執行,也可以在現有的池線程中執行。 * 如果由於此執行器已關閉或已達到其容量而無法提交任務以供執行,則由當前的{@code RejectedExecutionHandler}處理該任務。 * * @param command the task to execute 待執行的任務命令 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. 如果運行的線程少於corePoolSize,將嘗試以給定的命令作為第一個任務啟動新線程。 * * 2. 如果一個任務可以成功排隊,那麼我們仍然需要仔細檢查兩點,其一,我們是否應該添加一個線程 * (因為自從上次檢查至今,一些存在的線程已經死亡),其二,線程池狀態此時已改變成非運行態。因此,我們重新檢查狀態,如果檢查不通過,則移除已經入列的任務,如果檢查通過且線程池線程數為0,則啟動新線程。 * * 3. 如果無法將任務加入任務隊列,則將線程池擴容到極限容量並嘗試創建一個新線程,如果失敗則拒絕任務。 */ int c = ctl.get(); // 步驟1:判斷線程池當前線程數是否小於線程池大小 if (workerCountOf(c) < corePoolSize) { // 增加一個工作線程並添加任務,成功則返回,否則進行步驟2 // true代表使用coreSize作為邊界約束,否則使用maximumPoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 步驟2:不滿足workerCountOf(c) < corePoolSize或addWorker失敗,進入步驟2 // 校驗線程池是否是Running狀態且任務是否成功放入workQueue(阻塞隊列) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次校驗,如果線程池非Running且從任務隊列中移除任務成功,則拒絕該任務 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池工作線程數量為0,則新建一個空任務的線程 else if (workerCountOf(recheck) == 0) // 如果線程池不是Running狀態,是加入不進去的 addWorker(null, false); } // 步驟3:如果線程池不是Running狀態或任務入列失敗,嘗試擴容maxPoolSize後再次addWorker,失敗則拒絕任務 else if (!addWorker(command, false)) reject(command); }
addWorker()
addWorker(Runnable firstTask, boolean core)
方法,顧名思義,向線程池添加一個帶有任務的工作線程。
參數說明:
- Runnable firstTask:新創建的線程應該首先運行的任務(如果沒有,則為空)。
- boolean core:該參數決定瞭線程池容量的約束條件,即當前線程數量以何值為極限值。參數為
true
則使用corePollSize
作為約束值,否則使用maximumPoolSize
。
執行流程:
1、外層循環判斷線程池的狀態是否可以新增工作線程。這層校驗基於下面兩個原則:
線程池為Running
狀態時,既可以接受新任務也可以處理任務線程池為關閉狀態時隻能新增空任務的工作線程(worker
)處理任務隊列(workQueue
)中的任務不能接受新任務
2、內層循環向線程池添加工作線程並返回是否添加成功的結果。
首先校驗線程數是否已經超限制,是則返回false
,否則進入下一步通過CAS
使工作線程數+1,成功則進入步驟3,失敗則再次校驗線程池是否是運行狀態,是則繼續內層循環,不是則返回外層循環
3、核心線程數量+1成功的後續操作:添加到工作線程集合,並啟動工作線程
首先獲取鎖之後,再次校驗線程池狀態(具體校驗規則見代碼註解),通過則進入下一步,未通過則添加線程失敗線程池狀態校驗通過後,再檢查線程是否已經啟動,是則拋出異常,否則嘗試將線程加入線程池檢查線程是否啟動成功,成功則返回true
,失敗則進入 addWorkerFailed
方法
流程圖:
源碼詳讀:
private boolean addWorker(Runnable firstTask, boolean core) { // 外層循環:判斷線程池狀態 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.線程池為非Running狀態(Running狀態則既可以新增核心線程也可以接受任務) * 2.線程為shutdown狀態且firstTask為空且隊列不為空 * 3.滿足條件1且條件2不滿足,則返回false * 4.條件2解讀:線程池為shutdown狀態時且任務隊列不為空時,可以新增空任務的線程來處理隊列中的任務 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 內層循環:線程池添加核心線程並返回是否添加成功的結果 for (;;) { int wc = workerCountOf(c); // 校驗線程池已有線程數量是否超限: // 1.線程池最大上限CAPACITY // 2.corePoolSize或maximumPoolSize(取決於入參core) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通過CAS操作使工作線程數+1,跳出外層循環 if (compareAndIncrementWorkerCount(c)) break retry; // 線程+1失敗,重讀ctl c = ctl.get(); // Re-read ctl // 如果此時線程池狀態不再是running,則重新進行外層循環 if (runStateOf(c) != rs) continue retry; // 其他 CAS 失敗是因為工作線程數量改變瞭,繼續內層循環嘗試CAS對線程數+1 // else CAS failed due to workerCount change; retry inner loop } } /** * 核心線程數量+1成功的後續操作:添加到工作線程集合,並啟動工作線程 */ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 下面代碼需要加鎖:線程池主鎖 mainLock.lock(); try { // 持鎖期間重新檢查,線程工廠創建線程失敗或獲取鎖之前關閉的情況發生時,退出 int c = ctl.get(); int rs = runStateOf(c); // 再次檢驗線程池是否是running狀態或線程池shutdown但線程任務為空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 線程已經啟動,則拋出非法線程狀態異常 // 為什麼會存在這種狀態呢?未解決 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //加入線程池 int s = workers.size(); // 如果當前工作線程數超過線程池曾經出現過的最大線程數,刷新後者值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); // 釋放鎖 } if (workerAdded) { // 工作線程添加成功,啟動該線程 t.start(); workerStarted = true; } } } finally { //線程啟動失敗,則進入addWorkerFailed if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker類
Worker
類是內部類,既實現瞭Runnable
,又繼承瞭AbstractQueuedSynchronizer
(以下簡稱AQS
),所以其既是一個可執行的任務,又可以達到鎖的效果。
Worker
類主要維護正在運行任務的線程的中斷控制狀態,以及其他次要的記錄。這個類適時地繼承瞭AbstractQueuedSynchronizer
類,以簡化獲取和釋放鎖(該鎖作用於每個任務執行代碼)的過程。這樣可以防止去中斷正在運行中的任務,隻會中斷在等待從任務隊列中獲取任務的線程。
我們實現瞭一個簡單的不可重入互斥鎖,而不是使用可重入鎖,因為我們不希望工作任務在調用setCorePoolSize
之類的池控制方法時能夠重新獲取鎖。另外,為瞭在線程真正開始運行任務之前禁止中斷,我們將鎖狀態初始化為負值,並在啟動時清除它(在runWorker
中)。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ // 通過構造函數初始化, Worker(Runnable firstTask) { //設置AQS的同步狀態 // state:鎖狀態,-1為初始值,0為unlock狀態,1為lock狀態 setState(-1); // inhibit interrupts until runWorker 在調用runWorker前,禁止中斷 this.firstTask = firstTask; // 線程工廠創建一個線程 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); //runWorker()是ThreadPoolExecutor的方法 } // Lock methods // The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態 // The value 1 represents the locked state. 1代表“鎖定”狀態 protected boolean isHeldExclusively() { return getState() != 0; } /** * 嘗試獲取鎖的方法 * 重寫AQS的tryAcquire(),AQS本來就是讓子類來實現的 */ protected boolean tryAcquire(int unused) { // 判斷原值為0,且重置為1,所以state為-1時,鎖無法獲取。 // 每次都是0->1,保證瞭鎖的不可重入性 if (compareAndSetState(0, 1)) { // 設置exclusiveOwnerThread=當前線程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 嘗試釋放鎖 * 不是state-1,而是置為0 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中斷(如果運行) * shutdownNow時會循環對worker線程執行 * 且不需要獲取worker鎖,即使在worker運行時也可以中斷 */ void interruptIfStarted() { Thread t; //如果state>=0、t!=null、且t沒有被中斷 //new Worker()時state==-1,說明不能中斷 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker()
可以說,runWorker(Worker w)
是線程池中真正處理任務的方法,前面的execute()
和 addWorker()
都是在為該方法做準備和鋪墊。
參數說明:
Worker w:封裝的Worker,攜帶瞭工作線程的諸多要素,包括Runnable
(待處理任務)、lock
(鎖)、completedTasks
(記錄線程池已完成任務數)
執行流程:
1、判斷當前任務或者從任務隊列中獲取的任務是否不為空,都為空則進入步驟2,否則進入步驟3
2、任務為空,則將completedAbruptly
置為false
(即線程不是突然終止),並執行processWorkerExit(w,completedAbruptly)
方法進入線程退出程序
3、任務不為空,則進入循環,並加鎖
4、判斷是否為線程添加中斷標識,以下兩個條件滿足其一則添加中斷標識:
線程池狀態>=STOP
,即STOP
或TERMINATED
一開始判斷線程池狀態<STOP
,接下來檢查發現Thread.interrupted()
為true
,即線程已經被中斷,再次檢查線程池狀態是否>=STOP
(以消除該瞬間shutdown
方法生效,使線程池處於STOP
或TERMINATED
)
5、執行前置方法 beforeExecute(wt, task)
(該方法為空方法,由子類實現)後執行task.run()
方法執行任務(執行不成功拋出相應異常)
6、執行後置方法 afterExecute(task, thrown)
(該方法為空方法,由子類實現)後將線程池已完成的任務數+1,並釋放鎖。
7、再次進行循環條件判斷。
流程圖:
源碼詳讀:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // allow interrupts // new Worker()是state==-1,此處是調用Worker類的tryRelease()方法,將state置為0,而interruptIfStarted()中隻有state>=0才允許調用中斷 w.unlock(); // 線程退出的原因,true是任務導致,false是線程正常退出 boolean completedAbruptly = true; try { // 當前任務和從任務隊列中獲取的任務都為空,方停止循環 while (task != null || (task = getTask()) != null) { //上鎖可以防止在shutdown()時終止正在運行的worker,而不是應對並發 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 判斷1:確保隻有在線程處於stop狀態且wt未中斷時,wt才會被設置中斷標識 * 條件1:線程池狀態>=STOP,即STOP或TERMINATED * 條件2:一開始判斷線程池狀態<STOP,接下來檢查發現Thread.interrupted()為true,即線程已經被中斷,再次檢查線程池狀態是否>=STOP(以消除該瞬間shutdown方法生效,使線程池處於STOP或TERMINATED), * 條件1與條件2任意滿意一個,且wt不是中斷狀態,則中斷wt,否則進入下一步 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); //當前線程調用interrupt()中斷 try { //執行前(空方法,由子類重寫實現) beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執行後(空方法,由子類重寫實現) afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; //完成任務數+1 w.unlock(); //釋放鎖 } } // completedAbruptly = false; } finally { //處理worker的退出 processWorkerExit(w, completedAbruptly); } }
5、getTask()
由函數調用關系圖可知,在ThreadPoolExecutor
類的實現中,Runnable getTask()
方法是為void runWorker(Worker w)
方法服務的,它的作用就是在任務隊列(workQueue
)中獲取 task(Runnable
)。
參數說明:無參數
執行流程:
- 將
timedOut
(上次獲取任務是否超時)置為false
(首次執行方法,無上次,自然為false
),進入一個無限循環 - 如果線程池為
Shutdown
狀態且任務隊列為空(線程池shutdown
狀態可以處理任務隊列中的任務,不再接受新任務,這個是重點)或者線程池為STOP
或TERMINATED
狀態,則意味著線程池不必再獲取任務瞭,當前工作線程數量-1並返回null
,否則進入步驟3 - 如果線程池數量超限制或者時間超限且(任務隊列為空或當前線程數>1),則進入步驟4,否則進入步驟5。
- 移除工作線程,成功則返回
null
,不成功則進入下輪循環。 - 嘗試用
poll()
或者take()
(具體用哪個取決於timed
的值)獲取任務,如果任務不為空,則返回該任務。如果為空,則將timeOut
置為true
進入下一輪循環。如果獲取任務過程發生異常,則將timeOut
置為 false 後進入下一輪循環。
流程圖:
源碼詳讀:
private Runnable getTask() { // 最新一次poll是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 條件1:線程池狀態SHUTDOWN、STOP、TERMINATED狀態 * 條件2:線程池STOP、TERMINATED狀態或workQueue為空 * 條件1與條件2同時為true,則workerCount-1,並且返回null * 註:條件2是考慮到SHUTDOWN狀態的線程池不會接受任務,但仍會處理任務 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /** * 下列兩個條件滿足任意一個,則給當前正在嘗試獲取任務的工作線程設置阻塞時間限制(超時會被銷毀?不太確定這點),否則線程可以一直保持活躍狀態 * 1.allowCoreThreadTimeOut:當前線程是否以keepAliveTime為超時時限等待任務 * 2.當前線程數量已經超越瞭核心線程數 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 兩個條件全部為true,則通過CAS使工作線程數-1,即剔除工作線程 // 條件1:工作線程數大於maximumPoolSize,或(工作線程阻塞時間受限且上次在任務隊列拉取任務超時) // 條件2:wc > 1或任務隊列為空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 移除工作線程,成功則返回null,不成功則進入下輪循環 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 執行到這裡,說明已經經過前面重重校驗,開始真正獲取task瞭 try { // 如果工作線程阻塞時間受限,則使用poll(),否則使用take() // poll()設定阻塞時間,而take()無時間限制,直到拿到結果為止 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // r不為空,則返回該Runnable if (r != null) return r; // 沒能獲取到Runable,則將最近獲取任務是否超時設置為true timedOut = true; } catch (InterruptedException retry) { // 響應中斷,進入下一次循環前將最近獲取任務超時狀態置為false timedOut = false; } } }
processWorkerExit()
processWorkerExit(Worker w, boolean completedAbruptly)
執行線程退出的方法
參數說明:
- Worker w:要結束的工作線程。
- boolean completedAbruptly: 是否突然完成(異常導致),如果工作線程因為用戶異常死亡,則
completedAbruptly
參數為true
。
執行流程:
1、如果 completedAbruptly
為 true
,即工作線程因為異常突然死亡,則執行工作線程-1操作。
2、主線程獲取鎖後,線程池已經完成的任務數追加 w(當前工作線程) 完成的任務數,並從worker
的set
集合中移除當前worker
。
3、根據線程池狀態進行判斷是否執行tryTerminate
()結束線程池。
4、是否需要增加工作線程,如果線程池還沒有完全終止,仍需要保持一定數量的線程。
如果當前線程是突然終止的,調用addWorker()
創建工作線程
當前線程不是突然終止,但當前工作線程數量小於線程池需要維護的線程數量,則創建工作線程。需要維護的線程數量為corePoolSize
(取決於成員變量 allowCoreThreadTimeOut
是否為 false
)或1。
源碼詳讀:
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 1.工作線程-1操作 * 1)如果completedAbruptly 為true,說明工作線程發生異常,那麼將正在工作的線程數量-1 * 2)如果completedAbruptly 為false,說明工作線程無任務可以執行,由getTask()執行worker-1操作 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 2.從線程set集合中移除工作線程,該過程需要加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 將該worker已完成的任務數追加到線程池已完成的任務數 completedTaskCount += w.completedTasks; // HashSet<Worker>中移除該worker workers.remove(w); } finally { mainLock.unlock(); } // 3.根據線程池狀態進行判斷是否結束線程池 tryTerminate(); /** * 4.是否需要增加工作線程 * 線程池狀態是running 或 shutdown * 如果當前線程是突然終止的,addWorker() * 如果當前線程不是突然終止的,但當前線程數量 < 要維護的線程數量,addWorker() * 故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然後再逐漸銷毀這corePoolSize個線程 */ int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
到此這篇關於深入理解Java線程池從設計思想到源碼解讀的文章就介紹到這瞭,更多相關java線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 淺談Java ThreadPoolExecutor的使用
- java線程池中Worker線程執行流程原理解析
- 一篇文章徹底搞懂jdk8線程池
- 詳解Java線程池的使用及工作原理
- Java線程池ThreadPoolExecutor源碼深入分析