Java自定義線程池的實現示例

一、Java語言本身也是多線程,回顧Java創建線程方式如下:

1、繼承Thread類,(Thread類實現Runnable接口),來個類圖加深印象。

2、實現Runnable接口實現無返回值、實現run()方法,啥時候run,黑話瞭。

3、實現Callable接口重寫call()+FutureTask獲取.

public class CustomThread {
    public static void main(String[] args) {
        // 自定義線程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Custom Run");
                System.out.println(Thread.currentThread().getName());
            }
        },"custom-thread-1").start();
    }
}

4、基於線程池集中管理創建線程系列周期.【本篇文章重點介紹】

二、JDK線程池工具類.

1、Executors工具類,是JDK中Doug Lea大佬實現供開發者使用。

隨著JDK版本迭代逐漸加入瞭基於工作竊取算法的線程池瞭,阿裡編碼規范也推薦開發者自定義線程池,禁止生產直接使用Executos線程池工具類,因此很有可能造成OOM異常。同時在某些類型的線程池裡面,使用無界隊列還會導致maxinumPoolSize、keepAliveTime、handler等參數失效。因此目前在大廠的開發規范中會強調禁止使用Executors來創建線程池。這裡說道阻塞隊列。LinkedBlockingQueue。

2、自定義線程池工具類基於ThreadPoolExecutor實現,那個JDK封裝的線程池工具類也是基於這個ThreadPoolExecutor實現的。

public class ConstomThreadPool extends ThreadPoolExecutor{
    /**
     *
     * @param corePoolSize 核心線程池
     * @param maximumPoolSize 線程池最大數量
     * @param keepAliveTime 線程存活時間
     * @param unit TimeUnit
     * @param workQueue 工作隊列,自定義大小
     * @param poolName 線程工廠自定義線程名稱
     */
    public ConstomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        setThreadFactory(new CustomThreadFactory(poolName, false));
    }
}

 自定義線程工廠類,這樣線程命名有開發者控制實現瞭,這樣參數可以做到可配置化,生產環境可以供不同業務模塊使用,如果系統配置值不生效,就給一個默認值,更加滿足業務需要.

/**
 * 自定義線程工廠
 */
public class CustomThreadFactory implements ThreadFactory {
    /**
     * 線程前綴,采用AtomicInteger實現線程編號線程安全自增
     */
    private final AtomicInteger atomicInteger = new AtomicInteger(1);
    /**
     * 線程命名前綴
     */
    private final String namePrefix;
    /**
     * 線程工廠創建的線程是否是守護線程
     */
    private final boolean isDaemon;
 
    public CustomThreadFactory(String prefix, boolean daemin) {
        if (StringUtils.isNoneBlank(prefix)) {
            this.namePrefix = prefix;
        } else {
            this.namePrefix = "thread_pool";
        }
        // 是否是守護線程
        isDaemon = daemin;
    }
 
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement());
        thread.setDaemon(isDaemon);
        // 設置線程優先級
        if (thread.getPriority() != Thread.NORM_PRIORITY) {
            thread.setPriority(Thread.NORM_PRIORITY);
        }
        return thread;
    }
}

 這裡Spring框架提供的自定義線程池工廠類,當然瞭一些開源包也會提供這樣的輪子,這個比較簡單瞭.

@SuppressWarnings("serial")
public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory {
 
	/**
	 * Create a new CustomizableThreadFactory with default thread name prefix.
	 */
	public CustomizableThreadFactory() {
		super();
	}
 
	/**
	 * Create a new CustomizableThreadFactory with the given thread name prefix.
	 * @param threadNamePrefix the prefix to use for the names of newly created threads
	 */
	public CustomizableThreadFactory(String threadNamePrefix) {
		super(threadNamePrefix);
	}
 
 
	@Override
	public Thread newThread(Runnable runnable) {
		return createThread(runnable);
	}
 
}

 3、SpringBoot框架提供的自定義線程池,基於異步註解@Async名稱和一些業務自定義配置項,很好的實現瞭業務間線程池的隔離。

@Configuration
public class ThreadPoolConfig {
    /**
     * 
     * @return ThreadPoolTaskExecutor
     */
    @Bean("serviceTaskA")
    public ThreadPoolTaskExecutor serviceTaskA() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("service-a");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
 
    /**
     * 
     * @return ThreadPoolTaskExecutor
     */
    @Bean("serviceTaskB")
    public ThreadPoolTaskExecutor serviceTaskB() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("service-b");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

整體來看是Spring框架對JDK的線程池做瞭封裝,公開發者使用,畢竟框架嘛,肯定是把方便留給開發者。

4、並發流線程池。

        List<String> list = new ArrayList<>(4);
        list.add("A");
        list.add("B");
        list.add("C");
        list.add("D");
        list.parallelStream().forEach(string -> {
            string = string + "paralleStream";
            System.out.println(Thread.currentThread().getName()+":-> "+string);
        });

運行實例:

說明:並發流默認使用系統公共的線程池ForkJoinWorkerThread,供整個程序使用。

 類圖如下,基於分治法,雙端竊取算法實現的一種線程池。

 ForkJoin實現的瞭自己的線程工廠命名。

 也可以自定義並發流線程,然後提交任務,一般並發流適用於短暫耗時業務,避免拖垮整個線程池業務.

5、實現一個基於系統公用線程池工具類,運行這個系統中的異步業務.

public final class CustomExecutors  {
    /**
     * 核心線程數大小
     */
    private static final int CORE_POOL_SIZE=5;
    /**
     * 核心線程池大小
     */
    private static final int MAX_POOL_SIZE=10;
    /**
     * 線程存活時間
     */
    private static final int KEEP_ALIVE_TIME=60;
    /**
     * 工作隊列大小
     */
    private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100);
    /**
     * 自定義線程池名前綴
     */
    private static final String POOL_PREFIX_NAME="Custom-Common-Pool";
 
    private CustomExecutors(){
        //throw new XXXXException("un support create pool!");
    }
 
    private static ConstomThreadPool constomThreadPool;
 
    /**
     * 靜態塊初始化隻執行一次,不關閉,整個系統公用一個線程池
     */
    static {
        constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME);
    }
 
    /**
     *  單例模式獲取線程池
     * @return ExecutorService
     */
    private static ExecutorService getInstance(){
        return constomThreadPool;
    }
 
    private static Future<?> submit(Runnable task){
       return constomThreadPool.submit(task);
    }
 
    private static <T> Future<T> submit(Runnable task, T result){
        return constomThreadPool.submit(task,result);
    }
 
    private static <T> Future<T> submit(Callable<T> task){
        return constomThreadPool.submit(task);
    }
 
    private static void execute(Runnable task){
        constomThreadPool.execute(task);
    }
}

三、業界知名自定義線程池擴展使用.

1、org.apache.tomcat.util.threads;【Tomcat線程池】

 2、XXL-JOB分佈式任務調度框架的快慢線程池,線程池任務隔離.

public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
 
 
    // ---------------------- trigger pool ----------------------
 
    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;
 
    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
 
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }
 
 
    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }
 
 
    // job timeout count
    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
 
 
    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {
 
        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }
 
        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {
 
                long start = System.currentTimeMillis();
 
                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
 
                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }
 
                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
 
                }
 
            }
        });
    }
 
 
 
    // ---------------------- helper ----------------------
 
    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
 
    public static void toStart() {
        helper.start();
    }
    public static void toStop() {
        helper.stop();
    }
 
    /**
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     * 			>=0: use this param
     * 			<0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }
 
}

①、定義兩個線程池,一個是fastTriggerPool,另一個是slowTriggerPool。
②、定義一個容器ConcurrentMap,存放每個任務的執行慢次數,60秒後自動清空該容器。
③、在線程的run()方法中計算每個任務的耗時,如果大於500ms,則任務的慢執行次數+1。

 3、基於線程池動態監控動態線程池 

引用圖片,線程池常見問題

 還有比較多啦,例如ES的基於JDK的線程池,Dubbo中等。

到此這篇關於Java自定義線程池的實現示例的文章就介紹到這瞭,更多相關Java自定義線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: