Java並發線程池實例分析講解

一.為什麼要用線程池

先來看個簡單的例子

1.直接new Thread的情況:

   public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        final List<Integer> list = new ArrayList<>();
        final Random random = new Random();
        for (int i = 0; i < 100000; i++) {
            Thread thread = new Thread() {
                @Override
                public void run() {
                    list.add(random.nextInt());
                }
            };
            thread.start();
            thread.join();
        }
        System.out.println("執行時間:" + (System.currentTimeMillis() - start));
        System.out.println("執行大小:" + list.size());
    }

執行時間:6437

執行大小:100000

2.使用線程池時

  public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        final List<Integer> list = new ArrayList<>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100000; i++) {
            executorService.execute(()->{
                list.add(random.nextInt());
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("執行時間:" + (System.currentTimeMillis() - start));
        System.out.println("執行大小:" + list.size());
    }

執行時間:82

執行大小:100000

從執行時間可以看出來,使用線程池的效率要遠遠超過直接new Thread。

二.線程池的好處

  • 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

三.原理解析

四.4種線程池

1.newCachedThreadPool

  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

特點:newCachedThreadPool會創建一個可緩存線程池,如果當前線程池的長度超過瞭處理的需要時,可以靈活的回收空閑的線程,當需要增加時,它可以靈活的添加新的線程,而不會對線程池的長度作任何限制。

因為其最大線程數是Integer.MAX_VALUE,若新建的線程數多瞭,會超過機器的可用內存而OOM,但是因為其不是無界隊列,所以在OOM之前一般會CPU 100%。

2.newFixedThreadPool

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

該方法會創建一個固定長度的線程池,控制最大並發數,超出的線程會在隊列中等待,因為線程的數量是固定的,但是阻塞隊列是無界的,如果請求數較多時,會造成阻塞隊列越來越長,超出可用內存 進而OOM,所以要根據系統資源設置線程池的大小。Runtime.getRuntime().availableProcessors()

3.newSingleThreadExecutor

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

會創建一個單一的線程,前一個任務執行完畢才會執行下一個線程,FIFO,保證順序執行。但是高並發下不太適用

4.newScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

創建一個固定長度的線程池,而且支持定時的以及周期性的任務執行,所有任務都是串行執行的,同一時間隻能有一個任務在執行,前一個任務的延遲或異常都將會影響到之後的任務。

阿裡規范中不推薦使用以上線程池,推薦使用自定義的線程池,當然如果你的項目中的數量級比較小的話那到沒什麼影響。

自定義線程池:

 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());

執行優先級 : 核心線程>非核心線程>隊列

提交優先級 : 核心線程>隊列>非核心線程

五.線程池處理流程

流程圖:

六.源碼分析

流程圖

ThreadPoolExecutor的execute方法

public void execute(Runnable command) {
	if (command == null)
            throw new NullPointerException();
 	int c = ctl.get();
 	//1.判斷線程數是否小於核心線程數,如果是則使用入參任務通過addWorker方法創建一個新的線程,如果能完成新線程創建execute方法結束,成功提交任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2.在第一步沒有完成任務提交;狀態為運行並且能成功加入任務到工作隊列後,再進行一次check,如果狀態在任務加入隊列後變為瞭非運行(有可能是在執行到這裡線程池shtdown瞭),非運行狀態下當然是需要reject;
    // offer和add方法差不多,add方法就是調用的offer,隻不過比offer多拋出一個異常 throw new IllegalStateException("Queue full")
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
            //3.判斷當前工作線程池數是否為0,如果是創建一個null任務,任務在堵塞隊列存在瞭就會從隊列中取出這樣做的意義是保證線程池在running狀態必須有一個任務在執行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //4.如果不能加入任務到工作隊列,將嘗試使用任務新增一個線程,如果失敗,則是線程池已經shutdown或者線程池已經達到飽和狀態,所以reject.拒絕策略不僅僅是在飽和狀態下使用,在線程池進入到關閉階段同樣需要使用到;
    else if (!addWorker(command, false))
        reject(command);
 	}
}

再進入到addWork方法

private boolean addWorker(Runnable firstTask, boolean core) {
		// goto寫法 重試
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                   //線程狀態非運行並且非shutdown狀態任務為空,隊列非空就不能新增線程瞭
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //當前線程達到瞭最大閾值 就不再新增線程瞭
                    return false;
                if (compareAndIncrementWorkerCount(c))
                	//ctl+1工作線程池數量+1如果成功 就跳出死循環
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                //進來的狀態和此時的狀態發生改變重頭開始重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//內部類封裝瞭線程和任務 通過threadfactory創建線程
            //毎一個worker就是一個線程數
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //重新獲取線程狀態
                    int rs = runStateOf(ctl.get());
					// 狀態小於shutdown 就是running狀態 或者 為shutdown並且firstTask為空是從隊列中處理      任務那就可以放到集合中
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                       // 線程還沒start就是alive就直接異常
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                        // 記錄最大線程數
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //失敗回退從wokers移除w線程數減1嘗試結束線程池
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        //正在運行woker線程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        //傳入的任務
        Runnable firstTask;
        /** Per-thread task counter */
        //完成的任務數監控用
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //禁止線程中斷
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

再來看runworker方法

final void runWorker(Worker w) {
		//獲取當前線程
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts 把state從‐1改為0意思是可以允許中斷
        boolean completedAbruptly = true;
        try {
        	//task不為空或者阻塞隊列中拿到瞭任務
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果當前線程池狀態等於stop就中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                  //這設置為空等下次循環就會從隊列裡面獲取 
                    task = null;
                    //完成任務數+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

獲取任務的方法

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//獲取線程池運行狀態
            // Check if queue empty only if necessary.
            //shutdown或者為空那就工作線程‐1同時返回為null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
			//重新獲取工作線程數
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            // timed是標志超時銷毀 核心線程池也是可以銷毀的
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

runWorker中的processWorkerExit

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

ThreadPoolExecutor內部有實現4個拒絕策略:(1)、

  • CallerRunsPolicy,由調用execute方法提交任務的線程來執行這個任務;
  • AbortPolicy,拋出異常RejectedExecutionException拒絕提交任務;
  • DiscardPolicy,直接拋棄任務,不做任何處理;
  • DiscardOldestPolicy,去除任務隊列中的第一個任務(最舊的),重新提交

ScheduledThreadPoolExecutor

  • schedule:延遲多長時間之後隻執行一次;
  • scheduledAtFixedRate固定:延遲指定時間後執行一次,之後按照固定的時長周期執行;
  • scheduledWithFixedDelay非固定:延遲指定時間後執行一次,之後按照:上一次任務執行時長+周期的時長的時間去周期執行;
   private void delayedExecute(RunnableScheduledFuture<?> task) {
   		//如果線程池不是RUNNING狀態,則使用拒絕策略把提交任務拒絕掉
        if (isShutdown())
            reject(task);
        else {
        //與ThreadPoolExecutor不同,這裡直接把任務加入延遲隊列
            super.getQueue().add(task);
            //如果當前狀態無法執行任務,則取消
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
            //和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;
            //增加Worker,確保提交的任務能夠被執行
                ensurePrestart();
        }
    }

add方法裡其實是調用瞭offer方法

public boolean add(Runnable e) {
            return offer(e);
        }
public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                //容量擴增50%
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                //插入堆尾
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                //如果新加入的元素成為瞭堆頂,則原先的leader就無效瞭
                    leader = null;
               //由於原先leader已經無效被設置為null瞭,這裡隨便喚醒一個線程(未必是原先的leader)來取走堆頂任務
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

siftup方法:主要是對隊列進行排序

 private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
            //獲取父節點
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                //如果key節點的執行時間大於父節點的執行時間,不需要再排序瞭
                if (key.compareTo(e) >= 0)
                    break;
                    //如果key.compareTo(e)<0,說明key節點的執行時間小於父節點的執行時間,需要把父節點移到後面
                queue[k] = e;
                setIndex(e, k);
                //設置索引為k
                k = parent;
            }
            //key設置為排序後的位置中
            queue[k] = key;
            setIndex(key, k);
        }

run方法:

public void run() {
			//是否周期性,就是判斷period是否為0
            boolean periodic = isPeriodic();
            //檢查任務是否可以被執行
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            //如果非周期性任務直接調用run運行即可
            else if (!periodic)
                ScheduledFutureTask.super.run();
            //如果成功runAndRest,則設置下次運行時間並調用reExecutePeriodic
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
            //需要重新將任務(outerTask)放到工作隊列中。此方法源碼會在後文介紹ScheduledThreadPoolExecutor本身API時提及
                reExecutePeriodic(outerTask);
            }
        }
		private void setNextRunTime() {
            long p = period;
            //fixed‐rate模式,時間設置為上一次時間+p,這裡的時間隻是可以被執行的最小時間,不代表到點就要執行
            if (p > 0)
                time += p;
            else
            //fixed‐delay模式,計算下一次任務可以被執行的時間, 差不多就是當前時間+delay值
                time = triggerTime(-p);
        }
        long triggerTime(long delay) {
        //如果delay<Long.Max_VALUE/2,則下次執行時間為當前時間+delay,否則為瞭避免隊列中出現由於溢出導致的排序紊亂,需要調用overflowFree來修正一下delay
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
    /**
    * 主要就是有這麼一種情況:
    * 工作隊列中維護任務順序是基於compareTo的,在compareTo中比較兩個任務的順序會用time相減,負數則說明優先級高,那麼就有可能出現一個delay為正數,減去另一個為負數的delay,結果上溢為負數,則會導致compareTo產生錯誤的結果.
    * 為瞭特殊處理這種情況,首先判斷一下隊首的delay是不是負數,如果是正數不用管瞭,怎麼減都不會溢出。
    * 否則可以拿當前delay減去隊首的delay來比較看,如果不出現上溢,則整個隊列都ok,排序不會亂。
    * 不然就把當前delay值給調整為Long.MAX_VALUE+隊首delay
    /
   private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }

到此這篇關於Java並發線程池實例分析講解的文章就介紹到這瞭,更多相關Java並發線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: