Android開發中線程池源碼解析

線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可並發執行的任務。這避免瞭在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決於可用的並發處理器、處理器內核、內存、網絡sockets等的數量。 例如,線程數一般取cpu數量+2比較合適,線程數過多會導致額外的線程切換開銷。—-摘自維基百科

我們在Android或者Java開發中,日常所使用的就是ThreadPoolExecutor瞭,我們先來看下如何使用一個線程池來代替多線程開發。

使用線程池

// 創建一個核心線程數為5,最大線程數為10,空閑線程存活時間為60s的線程池對象
val threadPoolExecutor = ThreadPoolExecutor(
    5, 10, 60,
    TimeUnit.MINUTES,
    ArrayBlockingQueue<Runnable>(100),
    RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
)
 
// 測試
for (i in 1..10) {
    threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
}
 
// 結果
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-4
// execute thread is:pool-1-thread-3
// execute thread is:pool-1-thread-2
// execute thread is:pool-1-thread-1

從結果就可以看出來,執行時間操作,但是隻創建瞭5個線程,另外5次都是復用線程的。這樣就達到瞭復用存在的線程、減少對象的創建和銷毀的額外開銷;並且可以控制最大線程數,也就是控制瞭最大並發數。

知道如何使用一個線程池還不夠,我們需要看看ThreadPoolExecutor是如何創建、復用這些線程的。下面我們看看創建ThreadPoolExecutor對象的幾個參數:

構造方法

/**
     * 創建一個ThreadPoolExecutor對象
     *
     * @param corePoolSize 核心線程數,這些線程會一直在線程池中,除非設置瞭 allowCoreThreadTimeOut
     * @param maximumPoolSize 最大線程數,運行線程創建的最大值
     * @param keepAliveTime 當線程數>核心線程數的時候,這個值就是空閑且非核心線程存活的時間
     * @param unit keepAliveTime的單位
     * @param workQueue 保存task的隊列,直到執行execute()方法執行
     * @param threadFactory ThreadFactory是一個接口,裡面隻有Thread newThread(Runnable r)方法,用來創建線程,
     *                      默認采用Executors.defaultThreadFactory()
     * @param handler 拒絕處理任務時的策略,如果線程池滿瞭且所有線程都不處於空閑狀態,
     *                通過RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)來處理傳進來的Runnable
     *                系統提供瞭四種:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
     *                默認采用new AbortPolicy()
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

我在方法頭註釋中我都一一解釋瞭幾個參數的作用,還有幾點需要註意的就是:

  • 核心線程數不能小於0;
  • 最大線程數不能小於0;
  • 最大線程數不能小於核心線程數;
  • 空閑線程的存活時間不能小於0;

通過上面的解釋我們很明白的知道前面幾個參數的作用,但是最後兩個參數我們並不能通過表面的解釋通曉它,既然不能通過表象看懂他倆,那就看看默認的實現是如何做的,這樣在接下來的源碼分析中很有幫助。

ThreadFactory:線程工廠

ThreadFactory 是一個接口,裡面隻由唯一的 Thread newThread(Runnable r); 方法,此方法是用來創建線程的,從接口中我們得到的就隻有這麼多,下面我們看看 Executors 默認的 DefaultThreadFactory 類:

// 靜態內部類
static class DefaultThreadFactory implements ThreadFactory {
    // 線程池的標識,從1開始沒創建一個線程池+1
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    // 線程組
    private final ThreadGroup group;
    // 線程名中的結尾標識,從1開始每創建一個線程+1
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    // 線程名
    private final String namePrefix;
 
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
 
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

RejectedExecutionHandler:拒絕處理任務的策略

RejectedExecutionHandler 也是一個接口,並且也隻提供瞭唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我們可以自定義策略,也可以用上面提到的封裝好的四種策略,先看一下四種策略分別怎麼拒絕任務的:

CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() {
    }
 
    /**
     * 如果線程池還沒關閉,那麼就再次執行這個Runnable
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() {
    }
 
    /**
     * 這個策略就是拋出異常,不做其他處理
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() {
    }
 
    /**
     * 什麼也不做,也就是拋棄瞭這個Runnable
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() {
    }
 
    /**
     * 1. 線程池未關閉
     * 2. 獲取隊列中的下一個Runnable
     * 3. 獲取到瞭,但是不對它進行處理,也就是拋棄它
     * 4. 執行我們傳過來的這個Runnable
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

重要的參數

除瞭上述構造方法中的幾個參數外,線程池還有幾個比較核心的參數,如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
 
    // ctl 的低29位表示線程池中的線程數,高3位表示當前線程狀態
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // (2^29) -1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
    // 運行狀態:接受新任務並處理排隊的任務
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 關閉狀態:不接受新任務,但處理排隊的任務
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 停止狀態:不接受新任務,不處理排隊的任務,中斷正在進行的任務
    private static final int STOP       =  1 << COUNT_BITS;
    // 整理狀態:整理狀態,所有任務已終止,workerCount為零,線程將運行terminate()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 終止狀態:terminate()方法執行完成
    private static final int TERMINATED =  3 << COUNT_BITS;
 
    // 表示線程是否允許或停止
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 線程的有效數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    ......後面的源碼暫時省略
}

execute:執行

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果運行中的線程數小於核心線程數,執行addWorker(command, true)創建新的核心Thread執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 1. 已經滿足:運行中的線程數大於核心線程數,但是小於最大線程數
    // 2. 需要滿足:線程池在運行狀態
    // 3. 需要滿足:添加到工作隊列中成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果線程不在運行狀態,就從工作隊列中移除command
        // 並且執行拒絕策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 線程池處於運行狀態,但是沒有線程,則addWorker(null, false)
        // 至於這裡為什麼要傳入一個null,因為在最外層的if條件中我們已經將Runnable添加到工作隊列中瞭
        // 而且在runWorker()源碼中也可以得到答案,如果傳入的Runnable為空,就會去工作隊列中取task。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 執行addWorker()創建新的非核心線程Thread執行任務
    // addWorker() 失敗,執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

從上面源碼中可以看出,execute()一個新的任務,主要有以下這幾種情況:

1、核心線程未滿,直接新建核心線程並執行任務;
2、核心線程滿瞭,工作隊列未滿,將任務添加到工作隊列中;
3、核心線程和工作隊列都滿,但是最大線程數未達到,新建線程並執行任務;
4、上面條件都不滿足,那麼就執行拒絕策略。

更形象的可以看下方流程圖:

添加任務的流程圖

addWorker(Runnable , boolean):添加Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 標記外循環,比如在內循環中break retry就直接跳出外循環
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // 直接返回false有以下3種情況:
        // 1. 線程池狀態為STOP、TIDYING、TERMINATED
        // 2. 線程池狀態不是running狀態,並且firstTask不為空
        // 3. 線程池狀態不是running狀態,並且工作隊列為空
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
 
        for (; ; ) {
            int wc = workerCountOf(c);
            // 如果添加的是核心線程,但是運行的線程數大於等於核心線程數,那麼就不添加瞭,直接返回
            // 如果添加的是非核心線程,但是運行的線程數大於等於最大線程數,那麼也不添加,直接返回
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加workerCount的值 +1
            if (compareAndIncrementWorkerCount(c))
                // 跳出外循環
                break retry;
            c = ctl.get();  // 重新檢查線程池狀態
            if (runStateOf(c) != rs)
                continue retry;
            // 重新檢查的狀態和之前不合,再次從外循環進入
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 線程池重入鎖
            final ReentrantLock mainLock = this.mainLock;
            // 獲得鎖
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 線程池在運行狀態或者是線程池關閉同時Runnable也為空
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 想Worker中添加新的Worker
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 釋放鎖
                mainLock.unlock();
            }
            // 如果添加成功,啟動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker() 主要就是在滿足種種條件(上述源碼中解釋瞭)後,新建一個Worker對象,並添加到HashSet<Worker> workers中去,最後調用新建Worker對象的Thread變量的start()方法。

Worker類

Worker是一個繼承瞭AQS並實現瞭Runnable的內部類,我們重點看看它的run()方法,因為上面addWorker()中,t.start()觸發的就是它的run()方法:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
 
    /**
     * Thread this worker is running in.  Null if factory fails.
     */
    final Thread thread;
    /**
     * Initial task to run.  Possibly null.
     */
    Runnable firstTask;
    /**
     * Per-thread task counter
     */
    volatile long completedTasks;
 
    /**
     * Creates with given first task and thread from ThreadFactory.
     *
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 這邊是把Runnable傳給瞭Thread,也就是說Thread.run()就是執行瞭下面的run()方法
        this.thread = getThreadFactory().newThread(this);
    }
 
    /**
     * Delegates main run loop to outer runWorker
     */
    public void run() {
        runWorker(this);
    }
}

run()方法實際調用瞭runWorker(Worker)方法

runWorker(Worker)方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 釋放鎖,允許中斷
        boolean completedAbruptly = true;
        try {
            // 1. worker中的task不為空
            // 2. 如果worker的task為空,那麼取WorkerQueue的task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 這是一個空方法,可由子類實現
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行task
                        task.run();
                    } 
                    .... 省略
                    // 這是一個空方法,可由子類實現
                    finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask():

```java
private Runnable getTask() {
    // 進入死循環
    for (; ; ) {
        try {
            // 為true的條件:
            // allowCoreThreadTimeOut=true: 核心線程需根據keepAliveTime超時等待
            // 核心線程數已滿
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果timed為true,執行BlockQueue.poll(),這個操作在取不到task的時候會等待keepAliveTime,然後返回null
            // 如果timed為false,執行BlockQueue.take(),這個操作在隊列為空的時候一直阻塞
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }
}
```

線程池的源碼按照上述的幾個方法(execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask())的順序來分析,你就可以很清晰的將運作過程瞭解清楚,同事構造方法和幾個重要的參數一定要懂,不然對於後面的源碼分析很受阻礙,相信大傢通過這篇文章可以加深對線程池的理解。

以上就是本文的全部內容,希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。

推薦閱讀: