Springboot 如何使用@Async整合線程池

Springboot @Async整合線程池

開篇咱們先來聊聊線程池這個概念,或者說為什麼要使用線程池;簡言之,充分利用cpu資源,提高程序執行時間,但是相反,線程池異常提示、主線程和子線程事務問題也是顯而易見的。

那麼@Async這個註解又是什麼做用呢?其實就是標識方法為異步任務的一個註解,默認會自己維護一個線程池(存在弊端),利用子線程去執行任務;那麼如果把這兩者結合的話,線程池+Async又會有什麼效果呢!

循序漸進

提到線程池,可以采用Executors提供四種線程池下,使用某些特性的場景下,還是不錯的(簡便省事),當然此篇文章就不進行描述,我們一塊來看看自定義線程池的配置,具體參數自行查閱,需要註意“核心線程數”、“最大線程數”、“線程隊列數值”配置:

自定義線程池

接下來便要將Async和線程池一塊使用瞭,使用之前請先到springboot啟動類上加上@EnableAsync註解,具體使用方式如下:

此時便是開始在主線程裡面通過多線程來使用異步任務,不過此時也需要分情況考慮,這裡便提前和大傢一塊說瞭。

1:異步任務是否和主線程有關聯,簡言之,就是兩者之間是否相互不影響?

2:如果不影響的話,主線程和異步任務的子線程直接使用便可以;

3:如果影響的話,主線程是要等待子線程執行完的結果的,此時便可以考慮加鎖、或者使用一些提供好的並發類,比如CountDownLatch、CyclicBarrier,兩者的區別請自行查閱,結合項目中的需求,我這裡使用的前者CountDownLatch 。

如果沒有其它業務需求的話,那麼此時到這裡便可以正常使用瞭,但是如果碰到子線程中有一些“自定義的提示信息”,或者是“自定義的異常信息”,如果單純的考慮在主線程中通過Try…catch,或者SpringMVC中全局異常攔截**@RestControllerAdvice**能夠進行處理,那就大錯特錯瞭,根本不可能實現的;那麼此時我們便需要這樣使用,才可以讓主線程中捕獲信息,然後返回給接口調用方-前端。

但是你以為這樣就結束瞭麼,相對於上面的操作,我們可以將子線程是否需要返回提示信息分為以下兩種情況:

1:需要返回,子線程拋異常後,通過主線程返回提示信息,那麼采用上面圖片的處理方式

2:不需要返回,子線程拋異常後,要在某些地方進行日志記錄等等,此時可以換一種方式定義線程池,具體方式如下,本人親自測試過。

子線程異常處理

難道這篇博客的介紹就此結束瞭,休想,強行灌輸一波,主線程和多線程中的子線程事務問題,具體情況如下:

1:主線程異常、子線程未執行,此情況不會有任何影響

2:主線程先執行,子線程異常,此時會有影響;

2.1:主線程中加入@Transactional(rollbackFor = Exception.class)註解,子線程不加事務註解,此時如果都正常的話,那麼主線程和子線程在同一事務裡面,一塊提交;反之子線程如果不正常的話,那麼主線程和子線程都不會進行提交;

2.2:主線程加入事務註解,子線程中也加入事務註解@Transactional(propagation = Propagation.REQUIRES_NEW),如果都正常的話,子線程提交後主線程才會提交;反之主線程和子線程都不會進行提交。

問題思考

其實在上面設計過程中,偏於實踐操作的部分多一些,當然如果會一些理論知識會更好理解的;在項目中使用多線程技術後,發現多線程其實也並沒有那麼難理解,更多的都是在定義方面,這點也很好解決,反復理解和思考就行瞭;

個人認為,利用一些鎖的方式來解決主線程和子線程協同的問題會更帥一些 !

SpringBoot線程池的創建、@Async配置步驟及註意事項

最近在做訂單模塊,用戶購買服務類產品之後,需要進行預約,預約成功之後分別給商傢和用戶發送提醒短信。考慮發短信耗時的情況所以我想用異步的方法去執行,於是就在網上看見瞭Spring的@Async瞭。

但是遇到瞭許多問題,使得@Async無效,也一直沒有找到很好的文章去詳細的說明@Async的正確及錯誤的使用方法及需要註意的地方,這裡簡單整理瞭一下遇見的問題,Sring是以配置文件的形式來開啟@Async,而SpringBoot則是以註解的方式開啟。

我們可以使用springBoot默認的線程池,不過一般我們會自定義線程池(因為比較靈活),配置方式有:

1、使用 xml 文件配置的方式

2、使用Java代碼結合@Configuration進行配置(推薦使用)

下面分別實現兩種配置方式

第一步、配置@Async

一、springBoot啟動類的配置:

在Spring Boot的主程序中配置@EnableAsync,如下所示:

@ServletComponentScan
@SpringBootApplication
@EnableAsync
public class ClubApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(ClubApiApplication.class, args);
    }
}

二、Spring XML的配置方式:

1.applicationContext.xml同目錄下創建文件threadPool.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
 
    <!-- 開啟異步,並引入線程池 -->
    <task:annotation-driven executor="threadPool" />
 
    <!-- 定義線程池 -->
    <bean id="threadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心線程數,默認為1 -->
        <property name="corePoolSize" value="10" />
 
        <!-- 最大線程數,默認為Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="50" />
 
        <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE -->
        <property name="queueCapacity" value="100" />
 
        <!-- 線程池維護線程所允許的空閑時間,默認為60s -->
        <property name="keepAliveSeconds" value="30" />
 
        <!-- 完成任務自動關閉 , 默認為false-->
        <property name="waitForTasksToCompleteOnShutdown" value="true" />
 
        <!-- 核心線程超時退出,默認為false -->
        <property name="allowCoreThreadTimeOut" value="true" />
 
        <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前隻支持AbortPolicy、CallerRunsPolicy;默認為後者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
            <!-- CallerRunsPolicy:主線程直接執行該任務,執行完之後嘗試添加下一個任務到線程池中,可以有效降低向線程池內添加任務的速度 -->
            <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
            <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>
</beans>

2.然後在applicationContext.xml中引入threadPool.xml:<import resource=”threadPool.xml” />

<!--如果不使用自定義線程池,可以直接使用下面這段標簽-->
<!-- 
<task:executor id="WhifExecutor" pool-size="10"/> 
-->
<import resource="threadPool.xml" />
<task:annotation-driven executor="WhifExecutor" />

第二步:創建兩個異步方法的類,如下所示

第一個類(這裡模擬取消訂單後發短信,有兩個發送短信的方法):

@Service
public class TranTest2Service {
    // 發送提醒短信 1
    @Async
    public void sendMessage1() throws InterruptedException {
        System.out.println("發送短信方法---- 1   執行開始");
        Thread.sleep(5000); // 模擬耗時
        System.out.println("發送短信方法---- 1   執行結束");
    }
 
    // 發送提醒短信 2
    @Async
    public void sendMessage2() throws InterruptedException {
        System.out.println("發送短信方法---- 2   執行開始");
        Thread.sleep(2000); // 模擬耗時
        System.out.println("發送短信方法---- 2   執行結束");
    }
}

第二個類。調用發短信的方法 (異步方法不能與被調用的異步方法在同一個類中,否則無效):

@Service
public class OrderTaskServic {
    @Autowired
    private TranTest2Service tranTest2Service;
 
    // 訂單處理任務
    public void orderTask() throws InterruptedException {
        this.cancelOrder(); // 取消訂單
        tranTest2Service.sendMessage1(); // 發短信的方法   1
        tranTest2Service.sendMessage2(); // 發短信的方法  2
    }
 
    // 取消訂單
    public void cancelOrder() throws InterruptedException {
        System.out.println("取消訂單的方法執行------開始");
        System.out.println("取消訂單的方法執行------結束 ");
    }
}

經過測試得到如下結果:

1.沒有使用@Async

2.使用瞭@Async

可以看出,沒有使用@Async方式實現的發送短信是同步執行的,意思就是說第一條發送之後再發送第二條,第二條發送成功之後再給用戶提示,這樣顯然會影響用戶體驗,再看使用瞭@Async實現的,在執行第一個發送短信方法之後馬上開啟另一個線程執行第二個方法,顯然這樣我們的處理速度回快很多。

使用Java代碼結合@Configuration註解的配置方式(推薦使用)

1. 新建一個配置類

package com.boot.common.conf;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
/**
 * 線程池配置
 * @author zhh
 *
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
 
/** 
 *   默認情況下,在創建瞭線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,
 *    當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
 *  當隊列滿瞭,就繼續創建線程,當線程數量大於等於maxPoolSize後,開始使用拒絕策略拒絕 
 */
 
    /** 核心線程數(默認線程數) */
    private static final int corePoolSize = 20;
    /** 最大線程數 */
    private static final int maxPoolSize = 100;
    /** 允許線程空閑時間(單位:默認為秒) */
    private static final int keepAliveTime = 10;
    /** 緩沖隊列大小 */
    private static final int queueCapacity = 200;
    /** 線程池名前綴 */
    private static final String threadNamePrefix = "Async-Service-";
 
    @Bean("taskExecutor") // bean的名稱,默認為首字母小寫的方法名
    public ThreadPoolTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);   
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);
 
        // 線程池對拒絕任務的處理策略
        // CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}

2.創建兩個異步方法的類(和之前的類類似僅僅是方法上註解不一樣),如下所示:

第一個類(這裡模擬取消訂單後發短信,有兩個發送短信的方法):

package com.boot.test1.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
 
@Service
public class TranTest2Service {
    Logger log = LoggerFactory.getLogger(TranTest2Service.class);
 
    // 發送提醒短信 1
        @PostConstruct // 加上該註解項目啟動時就執行一次該方法
    @Async("taskExecutor")
    public void sendMessage1() throws InterruptedException {
        log.info("發送短信方法---- 1   執行開始");
        Thread.sleep(5000); // 模擬耗時
        log.info("發送短信方法---- 1   執行結束");
    }
 
    // 發送提醒短信 2
        @PostConstruct // 加上該註解項目啟動時就執行一次該方法
    @Async("taskExecutor")
    public void sendMessage2() throws InterruptedException {
 
        log.info("發送短信方法---- 2   執行開始");
        Thread.sleep(2000); // 模擬耗時
        log.info("發送短信方法---- 2   執行結束");
    }
}

代碼中的 @Async(“taskExecutor”) 對應我們自定義線程池中的 @Bean(“taskExecutor”) ,表示使用我們自定義的線程池。

第二個類。調用發短信的方法 (異步方法不能與被調用的異步方法在同一個類中,否則無效):

@Service
public class OrderTaskServic {
    @Autowired
    private TranTest2Service tranTest2Service;
    // 訂單處理任務
    public void orderTask() throws InterruptedException {
 
        this.cancelOrder(); // 取消訂單
        tranTest2Service.sendMessage1(); // 發短信的方法   1
        tranTest2Service.sendMessage2(); // 發短信的方法  2
    }
 
    // 取消訂單
    public void cancelOrder() throws InterruptedException {
        System.out.println("取消訂單的方法執行------開始");
        System.out.println("取消訂單的方法執行------結束 ");
    }
}

運行截圖:

註意看,截圖中的 [nio-8090-exec-1] 是Tomcat的線程名稱

[Async-Service-1]、[Async-Service-2]表示線程1和線程2 ,是我們自定義的線程池裡面的線程名稱,我們在配置類裡面定義的線程池前綴:

private static final String threadNamePrefix = "Async-Service-"; // 線程池名前綴,說明我們自定義的線程池被使用瞭

註意事項

如下方式會使@Async失效

  • 異步方法使用static修飾
  • 異步類沒有使用@Component註解(或其他註解)導致spring無法掃描到異步類
  • 異步方法不能與被調用的異步方法在同一個類中
  • 類中需要使用@Autowired或@Resource等註解自動註入,不能自己手動new對象
  • 如果使用SpringBoot框架必須在啟動類中增加@EnableAsync註解

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: