基於SpringBoot多線程@Async的使用體驗

多線程@Async的使用體驗

場景

導入:可以將大批量的數據insert操作采用多線程的方式並行執行

第三方服務的接口調用:由於存在個別第三方服務調用比較耗時的場景,此時就可以與自身服務的邏輯並行執行

簡而言之:接口中部份業務邏輯可以通過並行的方式來優化接口性能

1.線程池配置

@Configuration
@EnableAsync
public class TaskPoolConfig {

    @Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心線程數(CPU核心數+1)
        executor.setCorePoolSize(10);
        //最大線程數(2*CPU核心數+1)
        executor.setMaxPoolSize(20);
        //緩沖隊列數
        executor.setQueueCapacity(200);
        //允許線程空閑時間(單位:默認為秒)
        executor.setKeepAliveSeconds(60);
        //線程池名前綴
        executor.setThreadNamePrefix("sub-thread-");
        // 增加 TaskDecorator 屬性的配置
        executor.setTaskDecorator(new ContextDecorator());
        // 線程池對拒絕任務的處理策略:CallerRunsPolicy:不在新線程中執行任務,而是由調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

2.子父線程之間共享一個Request的配置方案

1.實現TaskDecorator接口

/**
 * 子線程裝飾器
 *
 * @author Da Shuai
 * @date 2021-06-10 18:28:17
 */
public class SubThreadTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

2.之前的線程池配置加如下代碼使其生效

// 增加 TaskDecorator 屬性的配置
executor.setTaskDecorator(new ContextDecorator());

3.阻塞主線程,等待所有子線程執行完畢後繼續執行主線程

1.CountDownLatch

思路:

  • 實例化CountDownLatch對象,同時傳入x(線程數量:這個數量必須等於子線程數量)進行構造
  • 每個子線程執行完畢後會調用countDown()方法
  • 子線程邏輯後方調用await()方法

這樣線程計數器為0之前,主線程就一直處於pending狀態

主線程邏輯

new CountDownLatch(X)
latch.await()
@Override
@Transactional
public void importExcel(File file) {
    CountDownLatch latch = new CountDownLatch(3);
    for (int i = 0; i < 3; i++) {
        VoteDO voteDO = new VoteDO();
        voteDO.setTitle(i + "");
        asyncManager.asyncSaveVote(voteDO);
    }
    //System.out.println(1/0);
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

子線程邏輯

latch.countDown()
@Override
@Async
public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) {
    log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName());
    try {
        Thread.sleep(10000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName());
    log.info("當前線程為 {},保存開始", Thread.currentThread().getName());
    voteDO.setDesc(Thread.currentThread().getName());
    voteDao.insert(voteDO);
    latch.countDown();
    log.info("當前線程為 {},保存結束", Thread.currentThread().getName());
}

日志

2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.676 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Starting…
2021-06-11 16:31:08.894 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Start completed.
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存開始
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.172 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.178 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.187 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存結束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存結束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存結束
2021-06-11 16:31:19.226 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回內容—————-
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回內容—————-

2.Future

思路:

1.子線程邏輯返回Future對象

2.主線程邏輯循環判斷每個子線程返回的Future對象isDone()是否為true

主線程邏輯

循環判斷future.isDone()是否為true

@Override
@Transactional
public void importExcel(File file) {
 List<Future> futureList = new ArrayList<>();
 for (int i = 0; i < 3; i++) {
     VoteDO voteDO = new VoteDO();
     voteDO.setTitle(i + "");
     Future future = asyncManager.asyncSaveVote(voteDO);
     futureList.add(future);
 }
 //檢查所有子線程是否均執行完畢
 while (true) {
     boolean isAllDone = true;
     for (Future future : futureList) {
         if (null == future || !future.isDone()) {
             isAllDone = false;
         }
     }
     if (isAllDone) {
         log.info("所有子線程執行完畢");
         break;
     }
 }
}

子線程邏輯

返回Future對象

@Override
public Future asyncSaveVote(VoteDO voteDO) {
    log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName());
    try {
        Thread.sleep(10000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName());
    log.info("當前線程為 {},保存開始", Thread.currentThread().getName());
    voteDO.setDesc(Thread.currentThread().getName());
    voteDao.insert(voteDO);
    log.info("當前線程為 {},保存結束", Thread.currentThread().getName());
    //返回需要用AsyncResult類
    return new AsyncResult<>(true);
}

日志

2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存開始
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.988 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.989 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存結束
2021-06-11 16:42:38.993 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.993 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存結束
2021-06-11 16:42:39.004 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:39.005 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存結束
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子線程執行完畢
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回內容—————-
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回內容—————-

4.多線程共用一個事務

暫時無解決方案,這是弊端

異步調用@Async問題

1.使用背景

在項目中,當訪問其他人的接口較慢或者做耗時任務時,不想程序一直卡在耗時任務上,想程序能夠並行執行,我們可以使用多線程來並行的處理任務,也可以使用spring提供的異步處理方式@Async。

2.異步處理方式

調用之後,不返回任何數據。

調用之後,返回數據,通過Future來獲取返回數據

3.@Async不返回數據

使用@EnableAsync啟用異步註解

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig{
}

在異步處理的方法dealNoReturnTask上添加註解@Async

@Component
@Slf4j
public class AsyncTask {

    @Async
    public void dealNoReturnTask(){
        log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
    }
}

Test測試類:

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@Slf4j
public class AsyncTest {
    @Autowired
    private AsyncTask asyncTask;

    @Test
    public void testDealNoReturnTask(){
        asyncTask.dealNoReturnTask();
        try {
            log.info("begin to deal other Task!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

日志打印結果為:

begin to deal other Task!
AsyncExecutorThread-1 deal No Return Task start
AsyncExecutorThread-1 deal No Return Task end at 1499751227034

從日志中我們可以看出,方法dealNoReturnTask()是異步執行完成的。

dealNoReturnTask()設置sleep 3s是為瞭模擬耗時任務

testDealNoReturnTask()設置sleep 10s是為瞭確認異步是否執行完成

4.@Async返回數據

異步調用返回數據,Future表示在未來某個點獲取執行結果,返回數據類型可以自定義

    @Async
    public Future<String> dealHaveReturnTask() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("thread", Thread.currentThread().getName());
        jsonObject.put("time", System.currentTimeMillis());
        return new AsyncResult<String>(jsonObject.toJSONString());
    }

測試類用isCancelled判斷異步任務是否取消,isDone判斷任務是否執行結束

    @Test
    public void testDealHaveReturnTask() throws Exception {

        Future<String> future = asyncTask.dealHaveReturnTask();
        log.info("begin to deal other Task!");
        while (true) {
            if(future.isCancelled()){
                log.info("deal async task is Cancelled");
                break;
            }
            if (future.isDone() ) {
                log.info("deal async task is Done");
                log.info("return result is " + future.get());
                break;
            }
            log.info("wait async task to end ...");
            Thread.sleep(1000);
        }
    }

日志打印如下,我們可以看出任務一直在等待異步任務執行完畢,用future.get()來獲取異步任務的返回結果

begin to deal other Task!
wait async task to end …
wait async task to end …
wait async task to end …
wait async task to end …
deal async task is Done
return result is {“thread”:”AsyncExecutorThread-1″,”time”:1499752617330}

5.異常處理

我們可以實現AsyncConfigurer接口,也可以繼承AsyncConfigurerSupport類來實現

在方法getAsyncExecutor()中創建線程池的時候,必須使用 executor.initialize(),

不然在調用時會報線程池未初始化的異常。

如果使用threadPoolTaskExecutor()來定義bean,則不需要初始化

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

//    @Bean
//    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//        executor.setCorePoolSize(10);
//        executor.setMaxPoolSize(100);
//        executor.setQueueCapacity(100);
//        return executor;
//    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsyncExecutorThread-");
        executor.initialize(); //如果不初始化,導致找到不到執行器
        return executor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

異步異常處理類:

@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));

        if (ex instanceof AsyncException) {
            AsyncException asyncException = (AsyncException) ex;
            log.info("asyncException:{}",asyncException.getErrorMessage());
        }

        log.info("Exception :");
        ex.printStackTrace();
    }
}

異步處理異常類:

@Data
@AllArgsConstructor
public class AsyncException extends Exception {
    private int code;
    private String errorMessage;
}

在無返回值的異步調用中,異步處理拋出異常,AsyncExceptionHandler的handleUncaughtException()會捕獲指定異常,原有任務還會繼續運行,直到結束。

在有返回值的異步調用中,異步處理拋出異常,會直接拋出異常,異步任務結束,原有處理結束執行。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: