springboot線程池監控的簡單實現
背景
- 在我們實際項目開發中,常常會為不同的優先級的任務設置相對應的線程池。
- 一般我們只關注線程池的相關參數,如核心線程數、最大線程數等,卻容易忽略對線程池實際運行情況的監控。
- 綜上所述:線程池如果像黑盒一樣運行,對系統是不利的。本文提供了一種簡單獲取線程池運行狀態的方式,可以將詳情打印到日志,或者對接到Prometheus上進行展示。
- 相信有不少博主給出了動態修改線程池的方式,但是由於生產環境禁止這樣做,因此本文只提供監控的功能。
- 本代碼應用項目架構為springboot。
代碼
代碼類結構
- ThreadPoolMonitor:線程池擴展類
- ThreadPoolUtil:線程池工具類
- ThreadPoolDetailInfo:bean類
- ExecutorThreadPoolManager:線程池實現類
- ThreadPoolController:線程池測試方法
線程池擴展類
- 此類主要重寫了ThreadPoolExecutor類中的shutdown/shutdownNow/beforeExecute/afterExecute,用於對每個任務進行執行前後的攔截,計算出每個任務的運行時間。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

/**
 * A {@link ThreadPoolExecutor} that additionally records how long its tasks run
 * and logs a short summary when the pool is shut down.
 *
 * <p>Timing is captured in the {@code beforeExecute}/{@code afterExecute} hooks.
 * Both hooks are invoked by the worker thread that runs the task, so the start
 * timestamp is kept in a {@link ThreadLocal} and the running total in a
 * {@link LongAdder}; no state is shared between workers without synchronization.
 *
 * @author kantlin
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /** Start timestamp ({@link System#nanoTime()}) of the task currently running on this worker. */
    private final ThreadLocal<Long> taskStartNanos = new ThreadLocal<>();

    /** Sum of the run time of all finished tasks, in nanoseconds. */
    private final LongAdder totalRunNanos = new LongAdder();

    /** Name used to identify this pool in log output. */
    private final String poolName;

    /**
     * Creates a monitored pool. All arguments except {@code poolName} are passed
     * unchanged to the {@link ThreadPoolExecutor} constructor.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   idle time after which excess threads terminate
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     * @param threadFactory   factory used to create worker threads
     * @param poolName        name printed in the shutdown log messages
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.poolName = poolName;
    }

    /** Logs the completed/active/queued task counts, then starts an orderly shutdown. */
    @Override
    public void shutdown() {
        LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     * Logs the completed/active/queued task counts, then shuts the pool down immediately.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /** Remembers the start time of the task about to run on the calling worker thread. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // Take the timestamp first, then chain to super last, as the
        // ThreadPoolExecutor documentation recommends for nested overrides.
        this.taskStartNanos.set(System.nanoTime());
        super.beforeExecute(t, r);
    }

    /** Adds the elapsed time of the task that just finished on this worker to the running total. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Long startedAt = this.taskStartNanos.get();
        // Always clear the slot so a stale timestamp is never attributed to a later task.
        this.taskStartNanos.remove();
        if (startedAt != null) {
            this.totalRunNanos.add(System.nanoTime() - startedAt);
        }
    }

    /**
     * Returns the accumulated execution time of all tasks that have finished so far.
     *
     * @return total run time in milliseconds
     */
    public long getTotalDiff() {
        return TimeUnit.NANOSECONDS.toMillis(this.totalRunNanos.sum());
    }
}
線程工具類
import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** * @ClassName ThreadPoolUtil * @authors kantlin * @Date 2021/12/16 17:45 **/ @Component public class ThreadPoolUtil { private final HashMap<String, ThreadPoolMonitor> threadPoolExecutorHashMap = new HashMap(); public ThreadPoolUtil() { } public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,String poolName) { ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, poolName); this.threadPoolExecutorHashMap.put(poolName, threadPoolExecutor); return threadPoolExecutor; } public HashMap<String, ThreadPoolMonitor> getThreadPoolExecutorHashMap() { return this.threadPoolExecutorHashMap; }
線程bean類
import lombok.Data; @Data public class ThreadPoolDetailInfo { //線程池名字 private String threadPoolName; //當前線程池大小 private Integer poolSize; //線程池核心線程數量 private Integer corePoolSize; //線程池生命周期中最大線程數量 private Integer largestPoolSize; //線程池中允許的最大線程數 private Integer maximumPoolSize; //線程池完成的任務數目 private long completedTaskCount; //線程池中當前活躍個數 private Integer active; //線程池完成的任務個數 private long task; //線程最大空閑時間 private long keepAliveTime; //當前活躍線程的占比 private int activePercent; //任務隊列容量(阻塞隊列) private Integer queueCapacity; //當前隊列中任務的數量 private Integer queueSize; //線程池中任務平均執行時長 private long avgExecuteTime; public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, int activePercent, Integer queueCapacity, Integer queueSize, long avgExecuteTime) { this.threadPoolName = threadPoolName; this.poolSize = poolSize; this.corePoolSize = corePoolSize; this.largestPoolSize = largestPoolSize; this.maximumPoolSize = maximumPoolSize; this.completedTaskCount = completedTaskCount; this.active = active; this.task = task; this.keepAliveTime = keepAliveTime; this.activePercent = activePercent; this.queueCapacity = queueCapacity; this.queueSize = queueSize; this.avgExecuteTime = avgExecuteTime; } }
線程池實現類
- 在我的項目中,將線程池依次劃分為high、normal、low、single四種線程池類型。不同優先級的任務將會被submit到不同的線程池中執行。
- 在業務中有判斷線程池中queue的長度來決定是否投遞任務,由於沒有相應的拒絕策略,所以隊列不設置長度。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.*.newThread.ThreadPoolUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Owns the application's thread pools (normal, high priority, low priority and
 * single-threaded). All pools are created through {@link ThreadPoolUtil} so that
 * they are registered for monitoring.
 *
 * <p>Intended usage per the original author: low priority for collection-type
 * jobs, normal for regular business work, high priority for API calls triggered
 * manually by users.
 *
 * @author kantlin
 */
@Component
public class ExecutorThreadPoolManager {

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    @Value("${thread_pool_normal_level_thread_max_num}")
    private Integer normalLevelThreadPoolThreadMaxNum;

    @Value("${thread_pool_normal_level_core_thread_num}")
    private Integer normalLevelThreadPoolCoreThreadNum;

    @Value("${thread_pool_low_level_thread_max_num}")
    private Integer lowLevelThreadPoolThreadMaxNum;

    @Value("${thread_pool_low_level_core_thread_num}")
    private Integer lowLevelThreadPoolCoreThreadNum;

    private ThreadPoolExecutor normalThreadPoolExecutor;
    private ThreadPoolExecutor highPriorityExecutor;
    private ThreadPoolExecutor lowPriorityExecutor;
    private ThreadPoolExecutor singleThreadPoolExecutor;

    /**
     * Builds the four pools once the configuration values have been injected.
     *
     * <p>Every pool uses an unbounded {@link LinkedBlockingQueue}. With an
     * unbounded queue a {@link ThreadPoolExecutor} never grows beyond its core
     * size, so the configured maximum sizes only need to be &gt;= the core sizes.
     */
    @PostConstruct
    public void initExecutor() {
        normalThreadPoolExecutor = createPool(
                normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum,
                "normal_task_thread_%d", "normal_level_thread_pool");
        // NOTE(review): the high-priority pool is sized with the normal-level
        // settings; there is no dedicated high-level property. Confirm this is intended.
        highPriorityExecutor = createPool(
                normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum,
                "high_priority_level_task_thread_%d", "high_level_thread_pool");
        lowPriorityExecutor = createPool(
                lowLevelThreadPoolCoreThreadNum, lowLevelThreadPoolThreadMaxNum,
                "low_priority_level_task_thread_%d", "low_level_thread_pool");
        singleThreadPoolExecutor = createPool(
                1, 1, "single_task_thread_%d", "single_level_thread_pool");
    }

    /**
     * Starts an orderly shutdown of every pool when the bean is destroyed, so
     * that idle core threads do not outlive the application context. Tasks that
     * were already submitted are still executed.
     */
    @PreDestroy
    public void shutdownExecutors() {
        ThreadPoolExecutor[] pools = {
                normalThreadPoolExecutor, highPriorityExecutor, lowPriorityExecutor, singleThreadPoolExecutor
        };
        for (ThreadPoolExecutor pool : pools) {
            if (pool != null) {
                pool.shutdown();
            }
        }
    }

    /** Creates and registers one pool with an unbounded queue and zero keep-alive time. */
    private ThreadPoolExecutor createPool(int corePoolSize, int maximumPoolSize,
                                          String threadNameFormat, String poolName) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
        return threadPoolUtil.creatThreadPool(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), threadFactory, poolName);
    }

    /** @return the pool for regular business tasks */
    public ThreadPoolExecutor getNormalThreadPoolExecutor() {
        return normalThreadPoolExecutor;
    }

    /** @return the pool for high-priority tasks */
    public ThreadPoolExecutor getHighPriorityExecutor() {
        return highPriorityExecutor;
    }

    /** @return the pool for low-priority tasks */
    public ThreadPoolExecutor getLowPriorityExecutor() {
        return lowPriorityExecutor;
    }

    /** @return the single-threaded pool */
    public ThreadPoolExecutor getSingleThreadPoolExecutor() {
        return singleThreadPoolExecutor;
    }
}
線程池監控接口類
import com.alibaba.fastjson.JSONObject;
import com.*.newThread.ThreadPoolDetailInfo;
import com.*.newThread.ThreadPoolMonitor;
import com.*.newThread.ThreadPoolUtil;
import com.*.thread.ExecutorThreadPoolManager;
import io.swagger.annotations.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * REST endpoints (and a scheduled log dump) exposing the state of every thread
 * pool registered in {@link ThreadPoolUtil}.
 *
 * @author kantlin
 */
@Api(description = "線程池監控接口")
@RestController
@RequestMapping(value = "api/threadpool")
public class ThreadPoolController {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolController.class);

    @Autowired
    private ExecutorThreadPoolManager threadPool;

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    /**
     * Lists the names of all registered thread pools.
     *
     * @return pool names, empty when no pool is registered
     */
    // Public rather than private: if this bean is ever wrapped in a class-based
    // proxy, a private handler would run on the proxy instance, whose injected
    // fields are not populated.
    @GetMapping(value = "/getThreadPools")
    public List<String> getThreadPools() {
        return new ArrayList<>(this.threadPoolUtil.getThreadPoolExecutorHashMap().keySet());
    }

    /**
     * Collects a snapshot of every registered pool and writes it to the log as JSON.
     * Exposed over HTTP and also run on the configured cron schedule; the return
     * value is only used by the HTTP endpoint.
     *
     * @return one snapshot per registered pool
     */
    @GetMapping(value = "/getThreadPoolListInfo")
    @Scheduled(cron = "${thread.poll.status.cron}")
    public List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
        List<ThreadPoolDetailInfo> detailInfoList = new ArrayList<>();
        for (Map.Entry<String, ThreadPoolMonitor> entry
                : this.threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) {
            detailInfoList.add(this.threadPoolInfo(entry.getValue(), entry.getKey()));
        }
        LOGGER.info("Execute details of cuurent thread poll:{}", JSONObject.toJSONString(detailInfoList));
        return detailInfoList;
    }

    /**
     * Builds the snapshot of a single pool.
     *
     * @param threadPool     pool to inspect
     * @param threadPoolName name to report for the pool
     * @return snapshot of the pool's current counters
     */
    private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool, String threadPoolName) {
        // Read each volatile counter once so the derived values match the reported ones.
        int activeCount = threadPool.getActiveCount();
        int maximumPoolSize = threadPool.getMaximumPoolSize();
        long completedTaskCount = threadPool.getCompletedTaskCount();

        // Whole-number percentage, rounded half up. maximumPoolSize is always > 0
        // for a ThreadPoolExecutor, so the division is safe.
        int activePercent = (int) Math.round(activeCount * 100.0 / maximumPoolSize);

        // Run time is only accumulated for finished tasks, so average over the
        // completed count rather than the total (which includes queued tasks).
        long avgExecuteTime = completedTaskCount == 0L
                ? 0L
                : threadPool.getTotalDiff() / completedTaskCount;

        // The queue capacity is not derived from the pool; it is reported as 0.
        int queueCapacity = 0;

        return new ThreadPoolDetailInfo(
                threadPoolName,
                threadPool.getPoolSize(),
                threadPool.getCorePoolSize(),
                threadPool.getLargestPoolSize(),
                maximumPoolSize,
                completedTaskCount,
                activeCount,
                threadPool.getTaskCount(),
                threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS),
                activePercent,
                queueCapacity,
                threadPool.getQueue().size(),
                avgExecuteTime);
    }
}
運行結果
- 上面controller中的方法除了可以通過接口進行暴露外,還設置了定時任務,定期將結果打印到日志中,方便對系統狀態進行排查。
[ { "active": 0, "activePercent": 0, "avgExecuteTime": 0, "completedTaskCount": 0, "corePoolSize": 20, "keepAliveTime": 0, "largestPoolSize": 0, "maximumPoolSize": 20, "poolSize": 0, "queueCapacity": 0, "queueSize": 0, "task": 0, "threadPoolName": "high_level_thread_pool" }, { "active": 0, "activePercent": 0, "avgExecuteTime": 0, "completedTaskCount": 0, "corePoolSize": 33, "keepAliveTime": 0, "largestPoolSize": 0, "maximumPoolSize": 33, "poolSize": 0, "queueCapacity": 0, "queueSize": 0, "task": 0, "threadPoolName": "low_level_thread_pool" }, { "active": 0, "activePercent": 0, "avgExecuteTime": 371, "completedTaskCount": 14, "corePoolSize": 20, "keepAliveTime": 0, "largestPoolSize": 14, "maximumPoolSize": 20, "poolSize": 14, "queueCapacity": 0, "queueSize": 0, "task": 14, "threadPoolName": "normal_level_thread_pool" }, { "active": 0, "activePercent": 0, "avgExecuteTime": 0, "completedTaskCount": 0, "corePoolSize": 1, "keepAliveTime": 0, "largestPoolSize": 0, "maximumPoolSize": 1, "poolSize": 0, "queueCapacity": 0, "queueSize": 0, "task": 0, "threadPoolName": "single_level_thread_pool" } ]
到此這篇關於springboot線程池監控的簡單實現的文章就介紹到這了,更多相關springboot線程池監控內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持WalkonNet!
推薦閱讀:
- 簡單聊一聊Java線程池ThreadPoolExecutor
- 淺談Java ThreadPoolExecutor的使用
- Java並發編程面試之線程池
- 解決@Autowired注入空指針問題(利用Bean的生命周期)
- 淺談為什麼阿里巴巴要禁用Executors創建線程池