java線程池中Worker線程執行流程原理解析

引言

在《【高並發】別鬧瞭,這樣理解線程池執行任務的核心流程才正確!!》一文中我們深度分析瞭線程池執行任務的核心流程,在ThreadPoolExecutor類的addWorker(Runnable, boolean)方法中,使用CAS安全的更新線程的數量之後,接下來就是創建新的Worker線程執行任務,所以,我們先來分析下Worker類的源碼。

Worker類分析

Worker類從類的結構上來看,繼承瞭AQS(AbstractQueuedSynchronizer類)並實現瞭Runnable接口。本質上,Worker類既是一個同步組件,也是一個執行任務的線程。

接下來,我們看下Worker類的源碼,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	private static final long serialVersionUID = 6138294804551838833L;
	//執行任務的線程類
	final Thread thread;
	//初始化執行的任務,第一次執行的任務
	Runnable firstTask;
	//完成任務的計數
	volatile long completedTasks;
	//Worker類的構造方法,初始化任務並調用線程工廠創建執行任務的線程
	Worker(Runnable firstTask) {
		setState(-1); 
		this.firstTask = firstTask;
		this.thread = getThreadFactory().newThread(this);
	}
	//重寫Runnable接口的run()方法
	public void run() {
		//調用ThreadPoolExecutor類的runWorker(Worker)方法
		runWorker(this);
	}
	//檢測是否是否獲取到鎖
	//state=0表示未獲取到鎖
	//state=1表示已獲取到鎖
	protected boolean isHeldExclusively() {
		return getState() != 0;
	}
	//使用AQS設置線程狀態
	protected boolean tryAcquire(int unused) {
		if (compareAndSetState(0, 1)) {
			setExclusiveOwnerThread(Thread.currentThread());
			return true;
		}
		return false;
	}
	//嘗試釋放鎖
	protected boolean tryRelease(int unused) {
		setExclusiveOwnerThread(null);
		setState(0);
		return true;
	}
	public void lock()        { acquire(1); }
	public boolean tryLock()  { return tryAcquire(1); }
	public void unlock()      { release(1); }
	public boolean isLocked() { return isHeldExclusively(); }
	void interruptIfStarted() {
		Thread t;
		if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
			try {
				t.interrupt();
			} catch (SecurityException ignore) {
			}
		}
	}
}

在Worker類的構造方法中,可以看出,首先將同步狀態state設置為-1,設置為-1是為瞭防止runWorker方法運行之前被中斷。這是因為如果其他線程調用線程池的shutdownNow()方法時,如果Worker類中的state狀態的值大於0,則會中斷線程,如果state狀態的值為-1,則不會中斷線程。

Worker類實現瞭Runnable接口,需要重寫run方法,而Worker的run方法本質上調用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會將state置為0,所以這個時候調用shutDownNow方法就會中斷當前線程,而這個時候已經進入瞭runWork方法,就不會在還沒有執行runWorker方法的時候就中斷線程。

註意:大傢需要重點理解Worker類的實現。

Worker類中調用瞭ThreadPoolExecutor類的runWorker(Worker)方法。接下來,我們一起看下ThreadPoolExecutor類的runWorker(Worker)方法的實現。

runWorker(Worker)方法

首先,我們看下RunWorker(Worker)方法的源碼,如下所示。

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	//釋放鎖,將state設置為0,允許中斷任務的執行
	w.unlock();
	boolean completedAbruptly = true;
	try {
		//如果任務不為空,或者從任務隊列中獲取的任務不為空,則執行while循環
		while (task != null || (task = getTask()) != null) {
			//如果任務不為空,則獲取Worker工作線程的獨占鎖
			w.lock();
			//如果線程已經停止,或者中斷線程後線程終止並且沒有成功中斷線程
			//大傢好好理解下這個邏輯
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				//中斷線程
				wt.interrupt();
			try {
				//執行任務前執行的邏輯
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					//調用Runable接口的run方法執行任務
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					//執行任務後執行的邏輯
					afterExecute(task, thrown);
				}
			} finally {
				//任務執行完成後,將其設置為空
				task = null;
				//完成的任務數量加1
				w.completedTasks++;
				//釋放工作線程獲得的鎖
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		//執行退出Worker線程的邏輯
		processWorkerExit(w, completedAbruptly);
	}
}

這裡,我們拆解runWorker(Worker)方法。

(1)獲取當前線程的句柄和工作線程中的任務,並將工作線程中的任務設置為空,執行unlock方法釋放鎖,將state狀態設置為0,此時可以中斷工作線程,代碼如下所示。

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//釋放鎖,將state設置為0,允許中斷任務的執行
w.unlock();

(2)在while循環中進行判斷,如果任務不為空,或者從任務隊列中獲取的任務不為空,則執行while循環,否則,調用processWorkerExit(Worker, boolean)方法退出Worker工作線程。

while (task != null || (task = getTask()) != null)

(3)如果滿足while的循環條件,首先獲取工作線程內部的獨占鎖,並執行一系列的邏輯判斷來檢測是否需要中斷當前線程的執行,代碼如下所示。

//如果任務不為空,則獲取Worker工作線程的獨占鎖
w.lock();
//如果線程已經停止,或者中斷線程後線程終止並且沒有成功中斷線程
//大傢好好理解下這個邏輯
if ((runStateAtLeast(ctl.get(), STOP) ||
	 (Thread.interrupted() &&
	  runStateAtLeast(ctl.get(), STOP))) &&
	!wt.isInterrupted())
	//中斷線程
	wt.interrupt();

(4)調用執行任務前執行的邏輯,如下所示

//執行任務前執行的邏輯
beforeExecute(wt, task);

(5)調用Runable接口的run方法執行任務

//調用Runable接口的run方法執行任務
task.run();

(6)調用執行任務後執行的邏輯

//執行任務後執行的邏輯
afterExecute(task, thrown);

(7)將完成的任務設置為空,完成的任務數量加1並釋放工作線程的鎖。

//任務執行完成後,將其設置為空
task = null;
//完成的任務數量加1
w.completedTasks++;
//釋放工作線程獲得的鎖
w.unlock();

(8)退出Worker線程的執行,如下所示

//執行退出Worker線程的邏輯
processWorkerExit(w, completedAbruptly);

從代碼分析上可以看到,當從Worker線程中獲取的任務為空時,會調用getTask()方法從任務隊列中獲取任務,接下來,我們看下getTask()方法的實現。

getTask()方法

我們先來看下getTask()方法的源代碼,如下所示。

private Runnable getTask() {
	//輪詢是否超時的標識
	boolean timedOut = false;
	//自旋for循環
	for (;;) {
		//獲取ctl
		int c = ctl.get();
		//獲取線程池的狀態
		int rs = runStateOf(c);
		//檢測任務隊列是否在線程池停止或關閉的時候為空
		//也就是說任務隊列是否在線程池未正常運行時為空
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
			//減少Worker線程的數量
			decrementWorkerCount();
			return null;
		}
		//獲取線程池中線程的數量
		int wc = workerCountOf(c);
		//檢測當前線程池中的線程數量是否大於corePoolSize的值或者是否正在等待執行任務
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		//如果線程池中的線程數量大於corePoolSize
		//獲取大於corePoolSize或者是否正在等待執行任務並且輪詢超時
		//並且當前線程池中的線程數量大於1或者任務隊列為空
		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
			//成功減少線程池中的工作線程數量
			if (compareAndDecrementWorkerCount(c))
				return null;
			continue;
		}
		try {
			//從任務隊列中獲取任務
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take();
			//任務不為空直接返回任務
			if (r != null)
				return r;
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

getTask()方法的邏輯比較簡單,大傢看源碼就可以瞭,我這裡就不重復描述瞭。

接下來,我們看下在正式調用Runnable的run()方法前後,執行的beforeExecute方法和afterExecute方法。

beforeExecute(Thread, Runnable)方法

beforeExecute(Thread, Runnable)方法的源代碼如下所示。

protected void beforeExecute(Thread t, Runnable r) { }

可以看到,beforeExecute(Thread, Runnable)方法的方法體為空,我們可以創建ThreadPoolExecutor的子類來重寫beforeExecute(Thread, Runnable)方法,使得線程池正式執行任務之前,執行我們自己定義的業務邏輯。

afterExecute(Runnable, Throwable)方法

afterExecute(Runnable, Throwable)方法的源代碼如下所示。

protected void afterExecute(Runnable r, Throwable t) { }

可以看到,afterExecute(Runnable, Throwable)方法的方法體同樣為空,我們可以創建ThreadPoolExecutor的子類來重寫afterExecute(Runnable, Throwable)方法,使得線程池在執行任務之後執行我們自己定義的業務邏輯。

接下來,就是退出工作線程的processWorkerExit(Worker, boolean)方法。

processWorkerExit(Worker, boolean)方法

processWorkerExit(Worker, boolean)方法的邏輯主要是執行退出Worker線程,並且對一些資源進行清理,源代碼如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
	//執行過程中出現瞭異常,突然中斷
	if (completedAbruptly)
		//將工作線程的數量減1
		decrementWorkerCount();
	//獲取全局鎖
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		//累加完成的任務數量
		completedTaskCount += w.completedTasks;
		//將完成的任務從workers集合中移除
		workers.remove(w);
	} finally {
		//釋放鎖
		mainLock.unlock();
	}
	//嘗試終止工作線程的執行
	tryTerminate();
	//獲取ctl
	int c = ctl.get();
	//判斷當前線程池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
	if (runStateLessThan(c, STOP)) {
		//如果沒有突然中斷完成
		if (!completedAbruptly) {
			//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize
			int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
			//如果min為0並且工作隊列不為空
			if (min == 0 && ! workQueue.isEmpty())
				//min的值設置為1
				min = 1;
			//如果線程池中的線程數量大於min的值
			if (workerCountOf(c) >= min)
				//返回,不再執行程序
				return; 
		}
		//調用addWorker方法
		addWorker(null, false);
	}
}

接下來,我們拆解processWorkerExit(Worker, boolean)方法。

(1)執行過程中出現瞭異常,突然中斷執行,則將工作線程數量減1,如下所示。

//執行過程中出現瞭異常,突然中斷
if (completedAbruptly)
	//將工作線程的數量減1
	decrementWorkerCount();

(2)獲取鎖累加完成的任務數量,並將完成的任務從workers集合中移除,並釋放,如下所示。

//獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
	//累加完成的任務數量
	completedTaskCount += w.completedTasks;
	//將完成的任務從workers集合中移除
	workers.remove(w);
} finally {
	//釋放鎖
	mainLock.unlock();
}

(3)嘗試終止工作線程的執行

//嘗試終止工作線程的執行
tryTerminate();

(4)處判斷當前線程池中的線程個數是否小於核心線程數,如果是,需要新增一個線程保證有足夠的線程可以執行任務隊列中的任務或者提交的任務。

//獲取ctl
int c = ctl.get();
//判斷當前線程池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
if (runStateLessThan(c, STOP)) {
	//如果沒有突然中斷完成
	if (!completedAbruptly) {
		//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize
		int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
		//如果min為0並且工作隊列不為空
		if (min == 0 && ! workQueue.isEmpty())
			//min的值設置為1
			min = 1;
		//如果線程池中的線程數量大於min的值
		if (workerCountOf(c) >= min)
			//返回,不再執行程序
			return; 
	}
	//調用addWorker方法
	addWorker(null, false);
}

接下來,我們看下tryTerminate()方法。

tryTerminate()方法

tryTerminate()方法的源代碼如下所示。

final void tryTerminate() {
	//自旋for循環
	for (;;) {
		//獲取ctl
		int c = ctl.get();
		//如果線程池的狀態為RUNNING
		//或者狀態大於TIDYING
		//或者狀態為SHUTDOWN並且任務隊列為空
		//直接返回程序,不再執行後續邏輯
		if (isRunning(c) ||
			runStateAtLeast(c, TIDYING) ||
			(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
			return;
		//如果當前線程池中的線程數量不等於0
		if (workerCountOf(c) != 0) { 
			//中斷線程的執行
			interruptIdleWorkers(ONLY_ONE);
			return;
		}
		//獲取線程池的全局鎖
		final ReentrantLock mainLock = this.mainLock;
		mainLock.lock();
		try {
			//通過CAS將線程池的狀態設置為TIDYING
			if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
				try {
					//調用terminated()方法
					terminated();
				} finally {
					//將線程池狀態設置為TERMINATED
					ctl.set(ctlOf(TERMINATED, 0));
					//喚醒所有因為調用線程池的awaitTermination方法而被阻塞的線程
					termination.signalAll();
				}
				return;
			}
		} finally {
			//釋放鎖
			mainLock.unlock();
		}
	}
}

(1)獲取ctl,根據情況設置線程池狀態或者中斷線程的執行,並返回。

//獲取ctl
int c = ctl.get();
//如果線程池的狀態為RUNNING
//或者狀態大於TIDYING
//或者狀態為SHUTDOWN並且任務隊列為空
//直接返回程序,不再執行後續邏輯
if (isRunning(c) ||
	runStateAtLeast(c, TIDYING) ||
	(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
	return;
//如果當前線程池中的線程數量不等於0
if (workerCountOf(c) != 0) { 
	//中斷線程的執行
	interruptIdleWorkers(ONLY_ONE);
	return;
}

(2)獲取全局鎖,通過CAS設置線程池的狀態,調用terminated()方法執行邏輯,最終將線程池的狀態設置為TERMINATED,喚醒所有因為調用線程池的awaitTermination方法而被阻塞的線程,最終釋放鎖,如下所示。

//獲取線程池的全局
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
	//通過CAS將線程池的狀態設置為TIDYING
	if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
		try {
			//調用terminated()方法
			terminated();
		} finally {
			//將線程池狀態設置為TERMINATED
			ctl.set(ctlOf(TERMINATED, 0));
			//喚醒所有因為調用線程池的awaitTermination方法而被阻塞的線程
			termination.signalAll();
		}
		return;
	}
} finally {
	//釋放鎖
	mainLock.unlock();
}

接下來,看下terminated()方法。

terminated()方法

terminated()方法的源代碼如下所示。

protected void terminated() { }

可以看到,terminated()方法的方法體為空,我們可以創建ThreadPoolExecutor的子類來重寫terminated()方法,值得Worker線程調用tryTerminate()方法時執行我們自己定義的terminated()方法的業務邏輯。

以上就是java線程池中Worker線程執行流程原理解析的詳細內容,更多關於java線程池Worker線程執行的資料請關註WalkonNet其它相關文章!

推薦閱讀: