一篇文章徹底搞懂jdk8線程池

這可能是最簡短的線程池分析文章瞭。

頂層設計,定義執行接口

Interface Executor(){
    void execute(Runnable command);

}

ExecutorService,定義控制接口

interface ExecutorService extends Executor{
    
}

圖片

抽象實現ExecutorService中的大部分方法

abstract class AbstractExecutorService implements ExecutorService{    //此處把ExecutorService中的提交方法都實現瞭
}

圖片

我們看下提交中的核心

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // ① 
            //核心線程數沒有滿就繼續添加核心線程
            if (addWorker(command, true)) // ②
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // ③
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))// ④
                reject(command); //⑦
            else if (workerCountOf(recheck) == 0) // ⑤
                //如果worker為0,則添加一個非核心worker,所以線程池裡至少有一個線程
                addWorker(null, false);// ⑥
        }
        //隊列滿瞭以後,添加非核心線程
        else if (!addWorker(command, false))// ⑧
            reject(command);//⑦
    }

圖片

這裡就會有幾道常見的面試題

1,什麼時候用核心線程,什麼時候啟用非核心線程?

添加任務時優先使用核心線程,核心線程滿瞭以後,任務放入隊列中。隻要隊列不填滿,就一直使用核心線程執行任務(代碼①②)。

當隊列滿瞭以後開始使用增加非核心線程來執行隊列中的任務(代碼⑧)。

2,0個核心線程,2個非核心線程,隊列100,添加99個任務是否會執行?

會執行,添加隊列成功後,如果worker的數量為0,會添加非核心線程執行任務(見代碼⑤⑥)

3,隊列滿瞭會怎麼樣?

隊列滿瞭,會優先啟用非核心線程執行任務,如果非核心線程也滿瞭,那就執行拒絕策略。

4,submit 和execute的區別是?

submit將執行任務包裝成瞭RunnableFuture,最終返回瞭Future,executor 方法執行無返回值。

addworker實現

ThreadPoolExecutor extends AbstractExecutorService{
    //保存所有的執行線程(worker)
    HashSet<Worker> workers = new HashSet<Worker>();
    //存放待執行的任務,這塊具體由指定的隊列實現

    BlockingQueue<Runnable> workQueue;
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
    }
    //添加執行worker
    private boolean addWorker(Runnable firstTask, boolean core) {
        //這裡每次都會基礎校驗和cas校驗,防止並發無法創建線程,
        retry:
        for(;;){
            for(;;){
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        try{
            //創建一個worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            try{
                //加鎖校驗,添加到workers集合中
                workers.add(w);
            }
            //添加成功,將對應的線程啟動,執行任務
            t.start();
        }finally{
             //失敗執行進行釋放資源
            addWorkerFailed(Worker w) 
        }
  
       
    }
    //Worker 是對任務和線程的封裝
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        //線程啟動後會循環執行任務
        public void run() {
            runWorker(this);
        }

    }

    //循環執行
    final void runWorker(Worker w) {
        try{
            while (task != null || (task = getTask()) != null) {
                //執行前的可擴展點
                beforeExecute(wt, task);
                try{
                     //執行任務
                    task.run();
                }finally{
                    //執行後的可擴展點,這塊也把異常給吃瞭
                    afterExecute(task, thrown);
                }
            }
            //這裡會對執行的任務進行統計
        }finally{
             //異常或者是循環退出都會走這裡
             processWorkerExit(w, completedAbruptly);
        }
    }
    //獲取執行任務,此處決定runWorker的狀態
    private Runnable getTask() {
        //worker的淘汰策略:允許超時或者工作線程>核心線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //滿足淘汰策略且...,就返回null,交由processWorkerExit去處理線程
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
        // 滿足淘汰策略,就等一定的時間poll(),不滿足,就一直等待take()
        Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
    }
    //處理任務退出(循環獲取不到任務的時候)
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //異常退出的,不能調整線程數的
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        
        //不管成功或失敗,都執行以下邏輯
        //1,計數,2,減去一個線程

        completedTaskCount += w.completedTasks;
        //將線程移除,並不關心是否非核心
        workers.remove(w);
        //如果是還是運行狀態

        if (!completedAbruptly) {
            //正常終止的,處理邏輯
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //核心線程為0 ,最小值也是1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //總線程數大於min就不再添加
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }        //異常退出一定還會添加worker,正常退出一般不會再添加線程,除非核心線程數為0
        addWorker(null, false);
    }
    
}

圖片

這裡涉及到幾個點:

1,任務異常以後雖然有throw異常,但是外面有好幾個finally代碼;

2,在finally中,進行瞭任務的統計以及worker移除;

3,如果還有等待處理的任務,最少添加一個worker(不管核心線程數是否為0)

這裡會引申出來幾個面試題:

1, 線程池中核心線程數如何設置?

cpu密集型:一般為核心線程數+1,盡可能減少cpu的並行;

IO密集型:可以設置核心線程數稍微多些,將IO等待期間的空閑cpu充分利用起來。

2,線程池使用隊列的意義?

a)線程的資源是有限的,且線程的創建成本比較高;

b)  要保證cpu資源的合理利用(不能直接給cpu提一堆任務,cpu處理不過來,大傢都慢瞭)

c) 利用瞭削峰填谷的思想(保證任務執行的可用性);

d) 隊列過大也會把內存撐爆。

3,為什麼要用阻塞隊列?而不是非阻塞隊列?

a) 利用阻塞的特性,在沒有任務時阻塞一定的時間,防止資源被釋放(getTask和processWorkExit);

b) 阻塞隊列在阻塞時,CPU狀態是wait,等有任務時,會被喚醒,不會占用太多的資源;

線程池有兩個地方:

1,在execute方法中(提交任務時),隻要工作線程為0,就至少添加一個Worker;

2,在processWorkerExit中(正常或異常結束時),隻要有待處理的任務,就會增加Worker

所以正常情況下線程池一定會保證所有任務的執行。

我們在看下ThreadPoolExecutor中以下幾個方法

    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

確保瞭核心線程數必須是滿的,這些方法特別是在批處理的時候,或者動態調整核心線程數的大小時很有用。

我們再看下Executors中常見的創建線程池的方法:

一、newFixedThreadPool 與newSingleThreadExecutor

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
     public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

特點:

1,核心線程數和最大線程數大小一樣(唯一不同的是,一個是1,一個是自定義);

2,隊列用的是LinkedBlockingQueue(長度是Integer.Max_VALUE)

當任務的生產速度大於消費速度後,很容易將系統內存撐爆。

二、 newCachedThreadPool 和

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

特點:最大線程數為Integer.MAX_VALUE

當任務提交過多時,線程創建過多容易導致無法創建

三、 newWorkStealingPool

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

這個主要是並行度,默認為cpu的核數。

四、newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

封裝起來的要麼最大線程數不可控,要麼隊列長度不可控,所以阿裡規約裡也不建議使用Executors方法創建線程池。

ps:

生產上使用線程池,最好是將關鍵任務和非關鍵任務分開設立線程池,非關鍵業務影響關鍵業務的執行。

總結

到此這篇關於jdk8線程池的文章就介紹到這瞭,更多相關jdk8線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: