簡單聊一聊Java線程池ThreadPoolExecutor

簡介

ThreadPoolExecutor是一個實現ExecutorService接口的線程池,ExecutorService是主要用來處理多線程任務的一個接口,通常比較簡單是用法是由Executors工廠類去創建。

線程池主要解決瞭兩個不同的問題:

  • 在執行大量異步任務時,為瞭能夠提高性能,通常會減少每個任務的調用開銷。
  • 提供瞭一系列多線程任務的管理方法,便於多任務執行時合理分配資源以及一些異常情況的處理。每個ThreadPoolExecutor還維護一些基本統計信息。例如:已完成任務的數量,當前獲得線程數等。

參數說明

ThreadPoolExecutor提供瞭幾個核心參數,方便開發人員根據具體場景合理分配線程資源。

  • corePoolSize:核心線程數,在線程池創建時就已初始化好的n個核心線程,即使線程空閑著也會一直保留在線程池中不被銷毀,除非調用線程池方法設置瞭java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(true)(允許核心線程超時銷毀)。
  • maximumPoolSize:線程池允許存在最大線程數。
  • keepAliveTime:當線程數大於核心線程數時,多餘的線程在執行任務結束後等待新任務的最大等待時間。
  • unitTimeUnit類型,是keepAliveTime多餘線程最大空餘時間單位。
  • workQueue:必須指定一個阻塞隊列,在線程池執行execute方法時新進來的任務在執行前都會保留到此隊列裡進入等待。
  • threadFactory:創建線程的工廠,默認采用Executors.defaultThreadFactory()創建線程。
  • handler:拒絕策略,當最大線程數已占滿,且隊列已滿,此時線程池將觸發拒絕策略,對新進來的任務做拒絕處理,具體的處理方案在後面詳細分析(默認使用java.util.concurrent.ThreadPoolExecutor.AbortPolicy直接拋出異常拒絕處理)。

註:maximumPoolSize如果大於corePoolSize,則多出的部分線程數隻有在阻塞隊列workQueue占滿時才會創建核心線程之外的線程去執行任務,如果我們設置的阻塞隊列為無界隊列(默認大小為Integer.MAX_VALUE),則隊列永遠無法占滿,就不會去創建額外的線程進行工作,一般情況如果任務數足夠,那麼也是在隊列大小還沒達到Integer.MAX_VALUE時就已經出現內存溢出瞭。Executors線程池工廠中的newFixedThreadPool()、newSingleThreadExecutor()方法就是使用瞭無界隊列LinkedBlockingQueue,防止內存溢出在日常開發過程中一般是不建議直接去使用Executors去創建線程池。

如何創建線程池

上面我們提到的可以使用Executors工廠直接創建線程池,但是Executors提供的創建線程池都是不可控的,我們還是得按自己的業務做好分析自定義一個線程池。

以下是線程池創建的一個案例:

@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Value("${threadPool.corePoolSize:8}")
    private int corePoolSize;

    @Value("${threadPool.maximumPoolSize:16}")
    private int maximumPoolSize;

    @Value("${threadPool.keepAliveTime:60}")
    private int keepAliveTime;

    @Value("${threadPool.queueSize:99999}")
    private int queueSize;

    @Bean
    public ThreadPoolExecutor testExecutor() {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(queueSize);
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, 
                TimeUnit.SECONDS, queue, getThreadFactory(), getRejectedExecutionHandler());
    }

    /**
     * 自定義線程池創建線程工廠,用於線程池創建線程的工廠
     * @return
     */
    private ThreadFactory getThreadFactory() {
        return new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                log.info("===> Create new thread ...");
                return new Thread(r);
            }
        };
    }

    /**
     * 自定義拒絕策略,繼續往隊列裡添加任務進入等待
     * @return
     */
    private RejectedExecutionHandler getRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 繼續往隊列裡添加任務,這裡隻是一個案例,這種方式並不友好,會拋出隊列已滿的異常
                log.info("===> Handler runnable ......");
                executor.getQueue().add(r);
            }
        };
    }
}

application.properties配置文件

threadPool:
  corePoolSize: 8
  maximumPoolSize: 16
  keepAliveTime: 60
  # 為方便測試這裡我們配置隊列數小一點
  queueSize: 99

由以上的線程池配置,我們寫一個demo測試一下:

截取部分運行日志:

  • 紅框我們可以看到執行到瞭線程創建工廠部分代碼塊
  • 藍色框日志我們可以看到largestPoolSize=11,這是由於我們配置的maximumPoolSize=16 > corePoolSize=8,我們demo執行的是110個任務並發,隊列大小是99,由此分析得出(需要額外出創建線程數 = 並發任務總數110 – 核心線程數8 – 隊列大小99 = 3),所以線程池在隊列已滿時會多創建3個線程用於執行任務,在達到keepAliveTime配置的最大空閑時間後這3個線程即會自動銷毀。

註:可能有的同學會想線程池使用後需要銷毀嗎?在這裡補充一下,如果我們是作為局部變量創建出來的線程池(如:在執行的方法內使用Executors.newFixedThreadPool(10)創建臨時的線程池),這種情況我們用完就必須將它立即銷毀,否則主線程就會一直處於運行狀態。如果是全局配置的線程池,那麼就是為整個系統中諸多業務提供使用的,這種就不需要對線程池做銷毀,因為一旦銷毀瞭其他的任務就無法繼續使用該線程池執行任務。

  • 銷毀線程池主要有兩種方式:
    • shutdown():此方法對線程池做銷毀,線程池會優先將剩餘未完成的任務執行完才會執行銷毀任務。
    • shutdownNow():此方法會對線程池做立即銷毀,無論線程池中的任務是否執行完成。

拒絕策略

通常我們在配置好有限隊列大小後,就會有可能出現隊列占滿的情況,這時候我們的拒絕策略就會起到作用,接下來我們就來分析一下RejectedExecutionHandler接口具體有哪一些實現方式:

  • AbortPolicy:線程池的默認拒絕策略,在JDK提供的ThreadPoolExecutor線程池中有一個默認線程池變量private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();作為默認拒絕策略,查看如下圖源碼可知它就是直接拋出RejectedExecutionException異常,並且會直接丟棄當前任務,如果擔心異常影響後續任務執行開發人員需自行捕獲異常處理

  • CallerRunsPolicy:隻要在當線程池未被銷毀的情況下,不丟棄任務直接使用主線程(調用線程池執行的線程)執行該任務。因為該策略是由主線程直接執行任務的,所以不建議在並發度高的情況下使用,建議在並發度較低且任務不允許失敗的情況下才使用此策略

  • DiscardPolicy:直接丟棄當前任務,不做任何處理。直接丟棄任務的情況下,開發人員也無法排查到哪些任務被丟棄掉,一般不建議使用,除非是無關緊要的任務即使丟棄也無所謂的。

  • DiscardOldestPolicy:在線程池未被銷毀的情況下,丟棄最早進入隊列的一個任務(即最久未執行的任務),然後再重新將此任務加入線程池,在此策略下需註意被丟棄的任務的重要性,如果任務不重要可直接丟棄。

  • 自定義策略:在以上JDK提供的四種默認拒絕策略之外,我們還可以通過自定義的方式來處理被拒絕的任務。如果擔心任務被拒絕或者被丟棄造成不可預估的問題,在時效性沒有太大要求的情況下我們可以先將任務內容轉換成數據入庫做好日志記錄,後續可以使用定時任務或者通過MQ消息延遲處理。由以上的線程池配置Demo中的拒絕策略改造偽代碼如下:
private RejectedExecutionHandler getRejectedExecutionHandler() {
    return new RejectedExecutionHandler() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 偽代碼
            log.info("===> 可根據任務的重要性區分對待,將任務做轉換入庫延遲處理 ......");
        }
    };
}

總結

線程池是為瞭充分利用CPU資源,在合理分批使用的情況下能夠極大的提高我們程序的性能,以上的參數配置僅作為參考,並沒有一個標準的依據,隻能在實際開發過程中開發人員自行多做一些測試來判斷參數如何配置更加合理。

在拒絕策略配置方面,如果被拒絕的任務相對緊急且重要不可丟棄的情況下,此類任務可獨立做一個線程池處理保證任務不丟失,程序隻能慢慢優化變得越來越好,不可能有完美的程序即保證高性能又保證安全可靠。

到此這篇關於Java線程池ThreadPoolExecutor的文章就介紹到這瞭,更多相關Java線程池ThreadPoolExecutor內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: