Java自定義線程池的實現示例
一、Java語言本身也是多線程,回顧Java創建線程方式如下:
1、繼承Thread類,(Thread類實現Runnable接口),來個類圖加深印象。
2、實現Runnable接口實現無返回值、實現run()方法,啥時候run,黑話瞭。
3、實現Callable接口重寫call()+FutureTask獲取.
public class CustomThread { public static void main(String[] args) { // 自定義線程 new Thread(new Runnable() { @Override public void run() { System.out.println("Custom Run"); System.out.println(Thread.currentThread().getName()); } },"custom-thread-1").start(); } }
4、基於線程池集中管理創建線程系列周期.【本篇文章重點介紹】
二、JDK線程池工具類.
1、Executors工具類,是JDK中Doug Lea大佬實現供開發者使用。
隨著JDK版本迭代逐漸加入瞭基於工作竊取算法的線程池瞭,阿裡編碼規范也推薦開發者自定義線程池,禁止生產直接使用Executos線程池工具類,因此很有可能造成OOM異常。同時在某些類型的線程池裡面,使用無界隊列還會導致maxinumPoolSize、keepAliveTime、handler等參數失效。因此目前在大廠的開發規范中會強調禁止使用Executors來創建線程池。這裡說道阻塞隊列。LinkedBlockingQueue。
2、自定義線程池工具類基於ThreadPoolExecutor實現,那個JDK封裝的線程池工具類也是基於這個ThreadPoolExecutor實現的。
public class ConstomThreadPool extends ThreadPoolExecutor{ /** * * @param corePoolSize 核心線程池 * @param maximumPoolSize 線程池最大數量 * @param keepAliveTime 線程存活時間 * @param unit TimeUnit * @param workQueue 工作隊列,自定義大小 * @param poolName 線程工廠自定義線程名稱 */ public ConstomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); setThreadFactory(new CustomThreadFactory(poolName, false)); } }
自定義線程工廠類,這樣線程命名有開發者控制實現瞭,這樣參數可以做到可配置化,生產環境可以供不同業務模塊使用,如果系統配置值不生效,就給一個默認值,更加滿足業務需要.
/** * 自定義線程工廠 */ public class CustomThreadFactory implements ThreadFactory { /** * 線程前綴,采用AtomicInteger實現線程編號線程安全自增 */ private final AtomicInteger atomicInteger = new AtomicInteger(1); /** * 線程命名前綴 */ private final String namePrefix; /** * 線程工廠創建的線程是否是守護線程 */ private final boolean isDaemon; public CustomThreadFactory(String prefix, boolean daemin) { if (StringUtils.isNoneBlank(prefix)) { this.namePrefix = prefix; } else { this.namePrefix = "thread_pool"; } // 是否是守護線程 isDaemon = daemin; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement()); thread.setDaemon(isDaemon); // 設置線程優先級 if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
這裡Spring框架提供的自定義線程池工廠類,當然瞭一些開源包也會提供這樣的輪子,這個比較簡單瞭.
@SuppressWarnings("serial") public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory { /** * Create a new CustomizableThreadFactory with default thread name prefix. */ public CustomizableThreadFactory() { super(); } /** * Create a new CustomizableThreadFactory with the given thread name prefix. * @param threadNamePrefix the prefix to use for the names of newly created threads */ public CustomizableThreadFactory(String threadNamePrefix) { super(threadNamePrefix); } @Override public Thread newThread(Runnable runnable) { return createThread(runnable); } }
3、SpringBoot框架提供的自定義線程池,基於異步註解@Async名稱和一些業務自定義配置項,很好的實現瞭業務間線程池的隔離。
@Configuration public class ThreadPoolConfig { /** * * @return ThreadPoolTaskExecutor */ @Bean("serviceTaskA") public ThreadPoolTaskExecutor serviceTaskA() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(10); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("service-a"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } /** * * @return ThreadPoolTaskExecutor */ @Bean("serviceTaskB") public ThreadPoolTaskExecutor serviceTaskB() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(10); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("service-b"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
整體來看是Spring框架對JDK的線程池做瞭封裝,公開發者使用,畢竟框架嘛,肯定是把方便留給開發者。
4、並發流線程池。
List<String> list = new ArrayList<>(4); list.add("A"); list.add("B"); list.add("C"); list.add("D"); list.parallelStream().forEach(string -> { string = string + "paralleStream"; System.out.println(Thread.currentThread().getName()+":-> "+string); });
運行實例:
說明:並發流默認使用系統公共的線程池ForkJoinWorkerThread,供整個程序使用。
類圖如下,基於分治法,雙端竊取算法實現的一種線程池。
ForkJoin實現的瞭自己的線程工廠命名。
也可以自定義並發流線程,然後提交任務,一般並發流適用於短暫耗時業務,避免拖垮整個線程池業務.
5、實現一個基於系統公用線程池工具類,運行這個系統中的異步業務.
public final class CustomExecutors { /** * 核心線程數大小 */ private static final int CORE_POOL_SIZE=5; /** * 核心線程池大小 */ private static final int MAX_POOL_SIZE=10; /** * 線程存活時間 */ private static final int KEEP_ALIVE_TIME=60; /** * 工作隊列大小 */ private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100); /** * 自定義線程池名前綴 */ private static final String POOL_PREFIX_NAME="Custom-Common-Pool"; private CustomExecutors(){ //throw new XXXXException("un support create pool!"); } private static ConstomThreadPool constomThreadPool; /** * 靜態塊初始化隻執行一次,不關閉,整個系統公用一個線程池 */ static { constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME); } /** * 單例模式獲取線程池 * @return ExecutorService */ private static ExecutorService getInstance(){ return constomThreadPool; } private static Future<?> submit(Runnable task){ return constomThreadPool.submit(task); } private static <T> Future<T> submit(Runnable task, T result){ return constomThreadPool.submit(task,result); } private static <T> Future<T> submit(Callable<T> task){ return constomThreadPool.submit(task); } private static void execute(Runnable task){ constomThreadPool.execute(task); } }
三、業界知名自定義線程池擴展使用.
1、org.apache.tomcat.util.threads;【Tomcat線程池】
2、XXL-JOB分佈式任務調度框架的快慢線程池,線程池任務隔離.
public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool ---------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); } public void stop() { //triggerPool.shutdown(); fastTriggerPool.shutdownNow(); slowTriggerPool.shutdownNow(); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } // job timeout count private volatile long minTim = System.currentTimeMillis()/60000; // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); /** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // do trigger XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { // ob-timeout threshold 500ms AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); } // ---------------------- helper ---------------------- private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); public static void toStart() { helper.start(); } public static void toStop() { helper.stop(); } /** * @param jobId * @param triggerType * @param failRetryCount * >=0: use this param * <0: use param from job info config * @param executorShardingParam * @param executorParam * null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } }
①、定義兩個線程池,一個是fastTriggerPool,另一個是slowTriggerPool。
②、定義一個容器ConcurrentMap,存放每個任務的執行慢次數,60秒後自動清空該容器。
③、在線程的run()方法中計算每個任務的耗時,如果大於500ms,則任務的慢執行次數+1。
3、基於線程池動態監控動態線程池
引用圖片,線程池常見問題
還有比較多啦,例如ES的基於JDK的線程池,Dubbo中等。
到此這篇關於Java自定義線程池的實現示例的文章就介紹到這瞭,更多相關Java自定義線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Java如何固定大小的線程池
- SpringBoot實現線程池
- Spring Boot使用線程池處理上萬條數據插入功能
- Spring多線程的使用以及問題詳解
- 基於ThreadPoolTaskExecutor的使用說明