Java 自定義線程池和線程總數控制操作

1 概述

池化是常見的思想,線程池是非常典型的池化的實現,《Java並發編程實戰》也大篇幅去講解瞭Java中的線程池。本文實現一個簡單的線程池。

2 核心類

【1】接口定義

public interface IThreadPool<Job extends Runnable> {
 /**
 * 關閉線程池
 */
 public void shutAlldown();
 
 /**
 * 執行任務
 * 
 * @param job 任務
 */
 public void execute(Job job);
 
 /**
 * 添加工作者
 * 
 * @param addNum 添加數
 */
 public void addWorkers(int addNum);
 
 /**
 * 減少工作者
 * 
 * @param reduceNum 減少數目
 */
 public void reduceWorkers(int reduceNum);
}

【2】實現類

線程池的核心是維護瞭1個任務列表和1個工作者列表。

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List; 
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> { 
 // 默認線程數
 private static int DEAFAULT_SIZE = 5;
 // 最大線程數
 private static int MAX_SIZE = 10; 
 // 任務列表
 private LinkedList<Job> tasks = new LinkedList<Job>();
 // 工作線程列表
 private List<Worker> workers = Collections
  .synchronizedList(new ArrayList<Worker>()); 
 /**
 * 默認構造函數
 */
 public XYThreadPool() {
 initWokers(DEAFAULT_SIZE);
 } 
 /**
 * 執行線程數
 * 
 * @param threadNums 線程數
 */
 public XYThreadPool(int workerNum) {
 workerNum = workerNum <= 0 ? DEAFAULT_SIZE
  : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
 initWokers(workerNum);
 } 
 /**
 * 初始化線程池
 * 
 * @param threadNums 線程數
 */
 public void initWokers(int threadNums) {
 for (int i = 0; i < threadNums; i++) {
  Worker worker = new Worker();
  worker.start();
  workers.add(worker);
 }
 // 添加關閉鉤子
 Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
  shutAlldown();
  }
 });
 } 
 @Override
 public void shutAlldown() {
 for (Worker worker : workers) {
  worker.shutdown();
 }
 } 
 @Override
 public void execute(Job job) {
 synchronized (tasks) {
  // 提交任務就是將任務對象加入任務隊列,等待工作線程去處理
  tasks.addLast(job);
  tasks.notifyAll();
 }
 } 
 @Override
 public void addWorkers(int addNum) {
 // 新線程數必須大於零,並且線程總數不能大於最大線程數
 if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
  initWokers(addNum);
 } else {
  System.out.println("addNum too large");
 }
 } 
 @Override
 public void reduceWorkers(int reduceNum) {
 if ((workers.size() - reduceNum <= 0))
  System.out.println("thread num too small");
 else {
  // 暫停指定數量的工作者
  int count = 0;
  while (count != reduceNum) {
  for (Worker w : workers) {
   w.shutdown();
   count++;
  }
  }
 }
 } 
 /**
 * 工作線程
 */
 class Worker extends Thread { 
 private volatile boolean flag = true; 
 @Override
 public void run() {
  while (flag) {
  Job job = null;
  // 加鎖(若隻有一個woker可不必加鎖,那就是所謂的單線程的線程池,線程安全)
  synchronized (tasks) {
   // 任務隊列為空
   while (tasks.isEmpty()) {
   try {
    // 阻塞,放棄對象鎖,等待被notify喚醒
    tasks.wait();
    System.out.println("block when tasks is empty");
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   }
   // 不為空取出任務
   job = tasks.removeFirst();
   System.out.println("get job:" + job + ",do biz");
   job.run();
  }
  }
 } 
 public void shutdown() {
  flag = false;
 }
 }
}

(1) 當調用wait()方法時線程會放棄對象鎖,進入等待此對象的等待鎖定池,隻有針對此對象調用notify()方法後本線程才進入對象鎖定池準備

(2) Object的方法:void notify(): 喚醒一個正在等待該對象的線程。void notifyAll(): 喚醒所有正在等待該對象的線程。

notifyAll使所有原來在該對象上等待被notify的線程統統退出wait狀態,變成等待該對象上的鎖,一旦該對象被解鎖,它們會去競爭。

notify隻是選擇一個wait狀態線程進行通知,並使它獲得該對象上的鎖,但不驚動其它同樣在等待被該對象notify的線程們,當第一個線程運行完畢以後釋放對象上的鎖,此時如果該對象沒有再次使用notify語句,即便該對象已經空閑,其他wait狀態等待的線程由於沒有得到該對象的通知,繼續處在wait狀態,直到這個對象發出一個notify或notifyAll,它們等待的是被notify或notifyAll,而不是鎖。

3 無需控制線程總數

每調用一次就會創建一個擁有10個線程工作者的線程池。

public class TestService1 {
 public static void main(String[] args) {
 // 啟動10個線程
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====1 test====");
  }
 }); 
 }
} 
public class TestService2 {
 public static void main(String[] args) {
 // 啟動10個線程
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====2 test====");
  }
 });
 }
}

4 控制線程總數

在項目中所有的線程調用,一般都共用1個固定工作者數大小的線程池。

import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool; 
/**
 * 統一線程池管理類 
 */
@Component
public class XYThreadManager { 
 private XYThreadPool<Runnable> executorPool; 
 @PostConstruct
 public void init() {
 executorPool = new XYThreadPool<Runnable>(10);
 } 
 public XYThreadPool<Runnable> getExecutorPool() {
 return executorPool;
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService3")
public class TestService3 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====3 test====");
  }
 });
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService4")
public class TestService4 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====4 test====");
  }
 });
 }
} 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext; 
public class TestMain { 
 @SuppressWarnings("resource")
 public static void main(String[] args) {
 ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml"); 
 TestService3 t3 = (TestService3) atc.getBean("testService3");
 t3.test(); 
 TestService4 t4 = (TestService4) atc.getBean("testService4");
 t4.test();
 } 
}

補充:論如何優雅的自定義ThreadPoolExecutor線程池

前言

線程池想必大傢也都用過,JDK的Executors 也自帶一些線程池。但是不知道大傢有沒有想過,如何才是最優雅的方式去使用過線程池嗎? 生產環境要怎麼去配置自己的線程池才是合理的呢?

今天周末,剛好有時間來總結一下自己所認為的’優雅’, 如有問題歡迎大傢指正。

線程池使用規則

要使用好線程池,那麼一定要遵循幾個規則:

線程個數大小的設置

線程池相關參數配置

利用Hook嵌入你的行為

線程池的關閉

線程池配置相關

線程池大小的設置

這其實是一個面試的考點,很多面試官會問你線程池coreSize 的大小來考察你對於線程池的理解。

首先針對於這個問題,我們必須要明確我們的需求是計算密集型還是IO密集型,隻有瞭解瞭這一點,我們才能更好的去設置線程池的數量進行限制。

1、計算密集型:

顧名思義就是應用需要非常多的CPU計算資源,在多核CPU時代,我們要讓每一個CPU核心都參與計算,將CPU的性能充分利用起來,這樣才算是沒有浪費服務器配置,如果在非常好的服務器配置上還運行著單線程程序那將是多麼重大的浪費。對於計算密集型的應用,完全是靠CPU的核數來工作,所以為瞭讓它的優勢完全發揮出來,避免過多的線程上下文切換,比較理想方案是:

線程數 = CPU核數+1,也可以設置成CPU核數*2,但還要看JDK的版本以及CPU配置(服務器的CPU有超線程)。

一般設置CPU * 2即可。

2、IO密集型

我們現在做的開發大部分都是WEB應用,涉及到大量的網絡傳輸,不僅如此,與數據庫,與緩存間的交互也涉及到IO,一旦發生IO,線程就會處於等待狀態,當IO結束,數據準備好後,線程才會繼續執行。因此從這裡可以發現,對於IO密集型的應用,我們可以多設置一些線程池中線程的數量,這樣就能讓在等待IO的這段時間內,線程可以去做其它事,提高並發處理效率。那麼這個線程池的數據量是不是可以隨便設置呢?當然不是的,請一定要記得,線程上下文切換是有代價的。目前總結瞭一套公式,對於IO密集型應用:

線程數 = CPU核心數/(1-阻塞系數) 這個阻塞系數一般為0.8~0.9之間,也可以取0.8或者0.9。

套用公式,對於雙核CPU來說,它比較理想的線程數就是20,當然這都不是絕對的,需要根據實際情況以及實際業務來調整:final int poolSize = (int)(cpuCore/(1-0.9))

針對於阻塞系數,《Programming Concurrency on the JVM Mastering》即《Java 虛擬機並發編程》中有提到一句話:

對於阻塞系數,我們可以先試著猜測,抑或采用一些細嫩分析工具或java.lang.management API 來確定線程花在系統/IO操作上的時間與CPU密集任務所耗的時間比值。

線程池相關參數配置

說到這一點,我們隻需要謹記一點,一定不要選擇沒有上限限制的配置項。

這也是為什麼不建議使用Executors 中創建線程的方法。

比如,Executors.newCachedThreadPool的設置與無界隊列的設置因為某些不可預期的情況,線程池會出現系統異常,導致線程暴增的情況或者任務隊列不斷膨脹,內存耗盡導致系統崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規范的首要原則! 小心無大錯,千萬別過度自信!

可以看下Executors中四個創建線程池的方法:

//使用無界隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
  }
 
//線程池數量是無限的
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
  }

其他的就不再列舉瞭,大傢可以自行查閱源碼。

第二,合理設置線程數量、和線程空閑回收時間,根據具體的任務執行周期和時間去設定,避免頻繁的回收和創建,雖然我們使用線程池的目的是為瞭提升系統性能和吞吐量,但是也要考慮下系統的穩定性,不然出現不可預期問題會很麻煩!

第三,根據實際場景,選擇適用於自己的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!盡量采用自定義的拒絕策略去進行兜底!

第四,線程池拒絕策略,自定義拒絕策略可以實現RejectedExecutionHandler接口。

JDK自帶的拒絕策略如下:

AbortPolicy:直接拋出異常阻止系統正常工作。

CallerRunsPolicy:隻要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。

DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。

DiscardPolicy:丟棄無法處理的任務,不給予任何處理。

利用Hook

利用Hook,留下線程池執行軌跡:

ThreadPoolExecutor提供瞭protected類型可以被覆蓋的鉤子方法,允許用戶在任務執行之前會執行之後做一些事情。我們可以通過它來實現比如初始化ThreadLocal、收集統計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務被執行完的時候讓用戶插入邏輯,如rerminated 。

如果hook方法執行失敗,則內部的工作線程的執行將會失敗或被中斷。

我們可以使用beforeExecute和afterExecute來記錄線程之前前和後的一些運行情況,也可以直接把運行完成後的狀態記錄到ELK等日志系統。

關閉線程池

內容當線程池不在被引用並且工作線程數為0的時候,線程池將被終止。我們也可以調用shutdown來手動終止線程池。如果我們忘記調用shutdown,為瞭讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的!

當然,穩妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調用線程池的關閉方法!

線程池使用實例

線程池核心代碼:

public class AsyncProcessQueue { 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 /**
 * Task 包裝類<br>
 * 此類型的意義是記錄可能會被 Executor 吃掉的異常<br>
 */
 public static class TaskWrapper implements Runnable {
 private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class); 
 private final Runnable gift; 
 public TaskWrapper(final Runnable target) {
  this.gift = target;
 } 
 @Override
 public void run() {
 
  // 捕獲異常,避免在 Executor 裡面被吞掉瞭
  if (gift != null) {
 
  try {
   gift.run();
  } catch (Exception e) {
   _LOGGER.error("Wrapped target execute exception.", e);
  }
  }
 }
 } 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 /**
 * 執行指定的任務
 * 
 * @param task
 * @return
 */
 public static boolean execute(final Runnable task) {
 return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
public class AsyncProcessor {
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);
 
 /**
 * 默認最大並發數<br>
 */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
 
 /**
 * 線程池名稱格式
 */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
 
 /**
 * 線程工廠名稱
 */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
  .daemon(true).build();
 
 /**
 * 默認隊列大小
 */
 private static final int DEFAULT_SIZE = 500;
 
 /**
 * 默認線程存活時間
 */
 private static final long DEFAULT_KEEP_ALIVE = 60L;
 
 /**NewEntryServiceImpl.java:689
 * Executor
 */
 private static ExecutorService executor;
 
 /**
 * 執行隊列
 */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); 
 static {
 // 創建 Executor
 // 此處默認最大值改為處理器數量的 4 倍
 try {
  executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
   TimeUnit.SECONDS, executeQueue, FACTORY); 
  // 關閉事件的掛鉤
  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
  @Override
  public void run() {
   AsyncProcessor.LOGGER.info("AsyncProcessor shutting down."); 
   executor.shutdown(); 
   try { 
   // 等待1秒執行關閉
   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
    AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
    executor.shutdownNow();
   }
   } catch (InterruptedException e) {
   AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
   executor.shutdownNow();
   } 
   AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
  }
  }));
 } catch (Exception e) {
  LOGGER.error("AsyncProcessor init error.", e);
  throw new ExceptionInInitializerError(e);
 }
 } 
 /**
 * 此類型無法實例化
 */
 private AsyncProcessor() {
 } 
 /**
 * 執行任務,不管是否成功<br>
 * 其實也就是包裝以後的 {@link Executer} 方法
 * 
 * @param task
 * @return
 */
 public static boolean executeTask(Runnable task) { 
 try {
  executor.execute(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  return false;
 } 
 return true;
 } 
 /**
 * 提交任務,並可以在稍後獲取其執行情況<br>
 * 當提交失敗時,會拋出 {@link }
 * 
 * @param task
 * @return
 */
 public static <T> Future<T> submitTask(Callable<T> task) { 
 try {
  return executor.submit(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
 }
 }
}

使用方式:

AsyncProcessQueue.execute(new Runnable() {
     @Override
     public void run() {
        //do something
    }
});

可以根據自己的使用場景靈活變更,我這裡並沒有用到beforeExecute和afterExecute以及拒絕策略。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。

推薦閱讀: