java項目中的多線程實踐記錄
項目開發中對於一些數據的處理需要用到多線程,比如文件的批量上傳,數據庫的分批寫入,大文件的分段下載等。 通常會使用spring自帶的線程池處理,做到對線程的定制化處理和更好的可控,建議使用自定義的線程池。 主要涉及到的幾個點:
1. 自定義線程工廠(ThreadFactoryBuilder),主要用於線程的命名,方便追蹤
2. 自定義的線程池(ThreadPoolExecutorUtils),可以按功能優化配置參數
3. 一個抽象的多線程任務處理接口(OperationThreadService)和通用實現(OperationThread)
4. 統一的調度實現(MultiThreadOperationUtils)
核心思想:分治歸並,每個線程計算出自己的結果,最後統一匯總。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; /** * description: 自定義實現的線程池,遵循alibaba編程規范,使用ThreadPoolExecutor創建線程池使用 * 設置更有描述意義的線程名稱,默認的ThreadFactory,它給線程起名字大概規律就是pool-m-thread-n,如pool-1-thread-1。 * 當分析一個thread dump時,很難知道線程的目的,需要有描述意義的線程名稱來分析追蹤問題 * 設置線程是否是守護線程,默認的ThreadFactory總是提交非守護線程 * 設置線程優先級,默認ThreadFactory總是提交的一般優先級線程 * <p> * CustomThreadFactoryBuilder類實現瞭一種優雅的Builder Mechanism方式去得到一個自定義ThreadFactory實例。 * ThreadFactory接口中有一個接受Runnable類型參數的方法newThread(Runnable r), * 業務的factory邏輯就應該寫在這個方法中,去配置線程名稱、優先級、守護線程狀態等屬性。 * 原文鏈接:https://blog.csdn.net/zombres/article/details/80497515 * * @author Hlingoes * @date 2019/12/22 0:45 */ public class ThreadFactoryBuilder { private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class); private String nameFormat = null; private boolean daemon = false; private int priority = Thread.NORM_PRIORITY; public ThreadFactoryBuilder setNameFormat(String nameFormat) { if (nameFormat == null) { throw new NullPointerException(); } this.nameFormat = nameFormat; return this; } public ThreadFactoryBuilder setDaemon(boolean daemon) { this.daemon = daemon; return this; } public ThreadFactoryBuilder setPriority(int priority) { if (priority < Thread.MIN_PRIORITY) { throw new IllegalArgumentException(String.format( "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY)); } if (priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException(String.format( "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY)); } this.priority = priority; return this; } public ThreadFactory build() { return build(this); } private static ThreadFactory build(ThreadFactoryBuilder builder) { final String nameFormat = builder.nameFormat; final Boolean daemon = builder.daemon; final Integer priority = builder.priority; final AtomicLong count = new AtomicLong(0); return (Runnable runnable) -> { Thread thread = new Thread(runnable); if (nameFormat != null) { thread.setName(String.format(nameFormat, count.getAndIncrement())); } if (daemon != null) { thread.setDaemon(daemon); } thread.setPriority(priority); thread.setUncaughtExceptionHandler((t, e) -> { String threadName = t.getName(); logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e); }); return thread; }; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; /** * description: 創建通用的線程池 * <p> * corePoolSize:線程池中核心線程數量 * maximumPoolSize:線程池同時允許存在的最大線程數量 * 內部處理邏輯如下: * 當線程池中工作線程數小於corePoolSize,創建新的工作線程來執行該任務,不管線程池中是否存在空閑線程。 * 如果線程池中工作線程數達到corePoolSize,新任務嘗試放入隊列,入隊成功的任務將等待工作線程空閑時調度。 * 1. 如果隊列滿並且線程數小於maximumPoolSize,創建新的線程執行該任務(註意:隊列中的任務繼續排序)。 * 2. 如果隊列滿且線程數超過maximumPoolSize,拒絕該任務 * <p> * keepAliveTime * 當線程池中工作線程數大於corePoolSize,並且線程空閑時間超過keepAliveTime,則這些線程將被終止。 * 同樣,可以將這種策略應用到核心線程,通過調用allowCoreThreadTimeout來實現。 * <p> * BlockingQueue * 任務等待隊列,用於緩存暫時無法執行的任務。分為如下三種堵塞隊列: * 1. 直接遞交,如SynchronousQueue,該策略直接將任務直接交給工作線程。如果當前沒有空閑工作線程,創建新線程。 * 這種策略最好是配合unbounded線程數來使用,從而避免任務被拒絕。但當任務生產速度大於消費速度,將導致線程數不斷的增加。 * 2. 無界隊列,如LinkedBlockingQueue,當工作的線程數達到核心線程數時,新的任務被放在隊列上。 * 因此,永遠不會有大於corePoolSize的線程被創建,maximumPoolSize參數失效。 * 這種策略比較適合所有的任務都不相互依賴,獨立執行。 * 但是當任務處理速度小於任務進入速度的時候會引起隊列的無限膨脹。 * 3. 有界隊列,如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue處理邏輯處理。 * 隊列長度和maximumPoolSize兩個值會相互影響: * 長隊列 + 小maximumPoolSize。會減少CPU的使用、操作系統資源、上下文切換的消耗,但是會降低吞吐量, * 如果任務被頻繁的阻塞如IO線程,系統其實可以調度更多的線程。 * 短隊列 + 大maximumPoolSize。CPU更忙,但會增加線程調度的消耗. * 總結一下,IO密集型可以考慮多些線程來平衡CPU的使用,CPU密集型可以考慮少些線程減少線程調度的消耗 * * @author Hlingoes * @citation https://blog.csdn.net/wanghao112956/article/details/99292107 * @citation https://www.jianshu.com/p/896b8e18501b * @date 2020/2/26 0:46 */ public class ThreadPoolExecutorUtils { private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class); public static int defaultCoreSize = Runtime.getRuntime().availableProcessors(); private static int pollWaitingTime = 60; private static int defaultQueueSize = 10 * 1000; private static int defaultMaxSize = 4 * defaultCoreSize; private static String threadName = "custom-pool"; /** * description: 創建線程池 * * @param waitingTime * @param coreSize * @param maxPoolSize * @param queueSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/4/12 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) { pollWaitingTime = waitingTime; defaultCoreSize = coreSize; defaultMaxSize = maxPoolSize; defaultQueueSize = queueSize; return getExecutorPool(); } /** * description: 創建線程池 * * @param waitingTime * @param queueSize * @param maxPoolSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) { pollWaitingTime = waitingTime; defaultQueueSize = queueSize; defaultMaxSize = maxPoolSize; return getExecutorPool(); } /** * description: 創建線程池 * * @param waitingTime * @param queueSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) { pollWaitingTime = waitingTime; defaultQueueSize = queueSize; return getExecutorPool(); } /** * description: 創建線程池 * * @param waitingTime * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime) { pollWaitingTime = waitingTime; return getExecutorPool(); } /** * description: 創建線程池 * * @param * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/6/6 */ public static ThreadPoolExecutor getExecutorPool() { return getExecutorPool(threadName); } /** * description: 創建線程池 * * @param * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(String threadName) { ThreadFactory factory = new ThreadFactoryBuilder() .setNameFormat(threadName + "-%d") .build(); BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize, defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory, (r, executor) -> { /** * 自定義的拒絕策略 * 當提交給線程池的某一個新任務無法直接被線程池中“核心線程”直接處理, * 又無法加入等待隊列,也無法創建新的線程執行; * 又或者線程池已經調用shutdown()方法停止瞭工作; * 又或者線程池不是處於正常的工作狀態; * 這時候ThreadPoolExecutor線程池會拒絕處理這個任務 */ if (!executor.isShutdown()) { logger.warn("ThreadPoolExecutor is over working, please check the thread tasks! "); } }) { /** * description: 針對提交給線程池的任務可能會拋出異常這一問題, * 可自行實現線程池的afterExecute方法,或者實現Thread的UncaughtExceptionHandler接口 * ThreadFactoryBuilder中已經實現瞭UncaughtExceptionHandler接口,這裡是為瞭進一步兼容 * * @param r * @param t * @return void * @author Hlingoes 2020/5/27 */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; future.get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { logger.error("customThreadPool error msg: {}", t.getMessage(), t); } } }; /** * 備選方法,事先知道會有很多任務會提交給這個線程池,可以在初始化的時候完成核心線程的創建,提高系統性能 * 一個線程池創建出來之後,在沒有給它提交任何任務之前,這個線程池中的線程數為0 * 一個個去創建新線程開銷太大,影響系統性能 * 可以在創建線程池的時候就將所有的核心線程全部一次性創建完畢,系統起來之後就可以直接使用 */ poolExecutor.prestartAllCoreThreads(); return poolExecutor; } /** * description: 所有任務執行完之後,釋放線程池資源 * * @param pool * @return void * @author Hlingoes 2020/3/20 */ public static void closeAfterComplete(ThreadPoolExecutor pool) { /** * 當線程池調用該方法時,線程池的狀態則立刻變成SHUTDOWN狀態。 * 此時,則不能再往線程池中添加任何任務,否則將會拋出RejectedExecutionException異常。 * 但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。 * 唯一的影響就是不能再提交任務瞭,正則執行的任務即使在阻塞著也不會結束,在排隊的任務也不會取消。 */ pool.shutdown(); try { /** * awaitTermination方法可以設定線程池在關閉之前的最大超時時間, * 如果在超時時間結束之前線程池能夠正常關閉,這個方法會返回true,否則,一旦超時,就會返回false。 * 通常來說不可能無限制地等待下去,因此需要預估一個合理的超時時間,然後使用這個方法 */ if (!pool.awaitTermination(pollWaitingTime, TimeUnit.SECONDS)) { /** * 如果awaitTermination方法返回false,又希望盡可能在線程池關閉之後再做其他資源回收工作, * 可以考慮再調用一下shutdownNow方法, * 此時隊列中所有尚未被處理的任務都會被丟棄,同時會設置線程池中每個線程的中斷標志位。 * shutdownNow並不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務能夠正確響應中斷。 * 到瞭這一步,可以考慮繼續調用awaitTermination方法,或者直接放棄,去做接下來要做的事情。 */ pool.shutdownNow(); } } catch (InterruptedException e) { logger.error("ThreadPool overtime: {}", e.getMessage()); //(重新)丟棄所有尚未被處理的任務,同時會設置線程池中每個線程的中斷標志位 pool.shutdownNow(); // 保持中斷狀態 Thread.currentThread().interrupt(); } } }
import java.util.Arrays; /** * description: 分段參數 * * @author Hlingoes * @date 2020/5/22 23:50 */ public class PartitionElements { /** * 當前的分段任務索引 */ private long index; /** * 批量處理的任務個數 */ private long batchCounts; /** * 任務的分段個數 */ private long partitions; /** * 任務總數 */ private long totalCounts; private Object[] args; private Object data; public PartitionElements() { } public PartitionElements(long batchCounts, long totalCounts, Object[] args) { this.batchCounts = batchCounts; this.totalCounts = totalCounts; this.partitions = aquirePartitions(totalCounts, batchCounts); this.args = args; } public PartitionElements(long index, PartitionElements elements) { this.index = index; this.batchCounts = elements.getBatchCounts(); this.partitions = elements.getPartitions(); this.totalCounts = elements.getTotalCounts(); this.args = elements.getArgs(); } /** * description: 根據任務總量和單次任務處理量,計算任務個數 * * @param totalCounts * @param batchCounts * @return long partitions * @author Hlingoes 2020/5/23 */ public long aquirePartitions(long totalCounts, long batchCounts) { long partitions = totalCounts / batchCounts; if (totalCounts % batchCounts != 0) { partitions = partitions + 1; } // 兼容任務總數total = 1 的情況 if (partitions == 0) { partitions = 1; } return partitions; } public long getIndex() { return index; } public void setIndex(long index) { this.index = index; } public long getBatchCounts() { return batchCounts; } public void setBatchCounts(long batchCounts) { this.batchCounts = batchCounts; } public long getPartitions() { return partitions; } public void setPartitions(long partitions) { this.partitions = partitions; } public long getTotalCounts() { return totalCounts; } public void setTotalCounts(long totalCounts) { this.totalCounts = totalCounts; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } @Override public String toString() { return "PartitionElements{" + "index=" + index + ", batchCounts=" + batchCounts + ", partitions=" + partitions + ", totalCounts=" + totalCounts + ", args=" + Arrays.toString(args) + '}'; } }
import cn.henry.study.common.bo.PartitionElements; /** * description: 業務分治歸並處理接口 * * @author Hlingoes 2020/5/22 */ public interface OperationThreadService { /** * description: 任務總量 * * @param args * @return long * @throws Exception * @author Hlingoes 2020/5/22 */ long count(Object[] args) throws Exception; /** * description: 在多線程分治任務之前的預處理方法,返回業務數據 * * @param args * @return Object * @throws Exception * @author Hlingoes 2020/5/23 */ Object prepare(Object[] args) throws Exception; /** * description: 多線程的任務邏輯 * * @param elements * @return java.lang.Object * @throws Exception * @author Hlingoes 2020/5/24 */ Object invoke(PartitionElements elements) throws Exception; /** * description: 多線程單個任務結束後的歸並方法 * * @param elements * @param object * @return void * @throws Exception * @author Hlingoes 2020/5/23 */ void post(PartitionElements elements, Object object) throws Exception; /** * description: 歸並結果之後的尾處理 * * @param object * @return java.lang.Object * @throws Exception * @author Hlingoes 2020/5/24 */ Object finished(Object object) throws Exception; }
import cn.henry.study.common.bo.PartitionElements; import cn.henry.study.common.service.OperationThreadService; import cn.henry.study.common.thread.OperationThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * description: 多線程業務分治歸並處理 * * @author Hlingoes * @date 2020/5/22 0:42 */ public class MultiThreadOperationUtils { private static Logger logger = LoggerFactory.getLogger(MultiThreadOperationUtils.class); /** * description: 開啟多線程執行任務,按順序歸並處理任務結果 * 按照默認線程數,計算批量任務數 * * @param service * @param args * @return void * @author Hlingoes 2020/5/23 */ public static Object batchExecute(OperationThreadService service, Object[] args) throws Exception { long totalCounts = service.count(args); long batchCounts = totalCounts / ThreadPoolExecutorUtils.defaultCoreSize; // 兼容任務少於核心線程數的情況 if (batchCounts == 0) { batchCounts = 1L; } PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args); return batchExecute(service, elements); } /** * description: 開啟多線程執行任務,按順序歸並處理任務結果 * 給定每頁顯示條目個數 * * @param service * @param batchCounts * @param args * @return void * @author Hlingoes 2020/5/23 */ public static Object batchExecute(OperationThreadService service, long batchCounts, Object[] args) throws Exception { long totalCounts = service.count(args); PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args); return batchExecute(service, elements); } /** * description: 開啟多線程執行分治任務,按順序歸並處理任務結果 * * @param service * @param elements * @return void * @author Hlingoes 2020/5/23 */ private static Object batchExecute(OperationThreadService service, PartitionElements elements) throws Exception { ThreadPoolExecutor executor = ThreadPoolExecutorUtils.getExecutorPool(); // 在多線程分治任務之前的預處理方法,返回業務數據 final Object obj = service.prepare(elements.getArgs()); // 預防list和map的resize,初始化給定容量,可提高性能 ArrayList<Future<PartitionElements>> futures = new ArrayList<>((int) elements.getPartitions()); OperationThread opThread = null; Future<PartitionElements> future = null; // 添加線程任務 for (int i = 0; i < elements.getPartitions(); i++) { // 劃定任務分佈 opThread = new OperationThread(new PartitionElements(i + 1, elements), service); future = executor.submit(opThread); futures.add(future); } // 關閉線程池 executor.shutdown(); // 阻塞線程,同步處理數據 futures.forEach(f -> { try { // 線程單個任務結束後的歸並方法 service.post(f.get(), obj); } catch (Exception e) { logger.error("post routine fail", e); } }); return service.finished(obj); } }
import cn.henry.study.common.bo.PartitionElements; import cn.henry.study.common.service.OperationThreadService; import cn.henry.study.common.utils.MultiThreadOperationUtils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; /** * description: 多線程的測試用例 * * @author Hlingoes * @date 2020/6/12 20:52 */ public class MultiThreadServiceTest implements OperationThreadService { private static Logger logger = LoggerFactory.getLogger(MultiThreadServiceTest.class); @Override public long count(Object[] args) throws Exception { return 100L; } @Override public Object prepare(Object[] args) throws Exception { return "success"; } @Override public Object invoke(PartitionElements elements) throws Exception { List<Object> list = new ArrayList<>((int) elements.getBatchCounts()); for (int i = 0; i < elements.getIndex(); i++) { list.add("test_" + i); } return list; } @Override public void post(PartitionElements elements, Object object) throws Exception { String insertSql = "insert into test (id) values "; StringBuilder sb = new StringBuilder(); List<Object> datas = (List<Object>) elements.getData(); for (int i = 0; i < datas.size(); i++) { if ((i + 1) % 5 == 0 || (i + 1) == datas.size()) { sb.append("('" + datas.get(i) + "')"); logger.info("{}: 測試insert sql: {}", elements, insertSql + sb.toString()); sb = new StringBuilder(); } else { sb.append("('" + datas.get(i) + "'),"); } } } @Override public Object finished(Object object) throws Exception { return object; } @Test public void testBatchExecute() { try { Object object = MultiThreadOperationUtils.batchExecute(new MultiThreadServiceTest(), 10, new Object[]{"test"}); logger.info("測試完成: {}", object.toString()); } catch (Exception e) { e.printStackTrace(); } } }
總結:這是一個抽象之後的多線程業務流程處理方式,已在生產環境使用,多線程的重點在業務分割和思想上,有清晰的責任劃分。
到此這篇關於java項目中的多線程實踐的文章就介紹到這瞭,更多相關java多線程實踐內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Java並發編程之Executor接口的使用
- 詳解Java ThreadPoolExecutor的拒絕策略
- 打印Java程序的線程棧信息方式
- 徹底搞懂Java多線程(三)
- Java redisTemplate阻塞式處理消息隊列