基於SpringBoot多線程@Async的使用體驗
多線程@Async的使用體驗
場景
導入:可以將大批量的數據insert操作采用多線程的方式並行執行
第三方服務的接口調用:由於存在個別第三方服務調用比較耗時的場景,此時就可以與自身服務的邏輯並行執行
簡而言之:接口中部份業務邏輯可以通過並行的方式來優化接口性能
1.線程池配置
@Configuration @EnableAsync public class TaskPoolConfig { @Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名 public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線程數(CPU核心數+1) executor.setCorePoolSize(10); //最大線程數(2*CPU核心數+1) executor.setMaxPoolSize(20); //緩沖隊列數 executor.setQueueCapacity(200); //允許線程空閑時間(單位:默認為秒) executor.setKeepAliveSeconds(60); //線程池名前綴 executor.setThreadNamePrefix("sub-thread-"); // 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator()); // 線程池對拒絕任務的處理策略:CallerRunsPolicy:不在新線程中執行任務,而是由調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
2.子父線程之間共享一個Request的配置方案
1.實現TaskDecorator接口
/** * 子線程裝飾器 * * @author Da Shuai * @date 2021-06-10 18:28:17 */ public class SubThreadTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } }
2.之前的線程池配置加如下代碼使其生效
// 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator());
3.阻塞主線程,等待所有子線程執行完畢後繼續執行主線程
1.CountDownLatch
思路:
- 實例化CountDownLatch對象,同時傳入x(線程數量:這個數量必須等於子線程數量)進行構造
- 每個子線程執行完畢後會調用countDown()方法
- 子線程邏輯後方調用await()方法
這樣線程計數器為0之前,主線程就一直處於pending狀態
主線程邏輯
new CountDownLatch(X) latch.await()
@Override @Transactional public void importExcel(File file) { CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); asyncManager.asyncSaveVote(voteDO); } //System.out.println(1/0); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } }
子線程邏輯
latch.countDown()
@Override @Async public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) { log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName()); log.info("當前線程為 {},保存開始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); latch.countDown(); log.info("當前線程為 {},保存結束", Thread.currentThread().getName()); }
日志
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:31:08.653 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
2021-06-11 16:31:08.655 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.676 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Starting…
2021-06-11 16:31:08.894 INFO 27516 — [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 – Start completed.
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存開始
2021-06-11 16:31:18.921 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存開始
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.172 DEBUG 27516 — [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.178 DEBUG 27516 — [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.187 DEBUG 27516 — [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存結束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存結束
2021-06-11 16:31:19.224 INFO 27516 — [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存結束
2021-06-11 16:31:19.226 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回內容—————-
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:31:19.328 INFO 27516 — [nio-8080-exec-1] com.zhdj.config.LogAspect : ————–返回內容—————-
2.Future
思路:
1.子線程邏輯返回Future對象
2.主線程邏輯循環判斷每個子線程返回的Future對象isDone()是否為true
主線程邏輯
循環判斷future.isDone()是否為true
@Override @Transactional public void importExcel(File file) { List<Future> futureList = new ArrayList<>(); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); Future future = asyncManager.asyncSaveVote(voteDO); futureList.add(future); } //檢查所有子線程是否均執行完畢 while (true) { boolean isAllDone = true; for (Future future : futureList) { if (null == future || !future.isDone()) { isAllDone = false; } } if (isAllDone) { log.info("所有子線程執行完畢"); break; } } }
子線程邏輯
返回Future對象
@Override public Future asyncSaveVote(VoteDO voteDO) { log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName()); log.info("當前線程為 {},保存開始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); log.info("當前線程為 {},保存結束", Thread.currentThread().getName()); //返回需要用AsyncResult類 return new AsyncResult<>(true); }
日志
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:42:28.974 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
2021-06-11 16:42:28.975 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存開始
2021-06-11 16:42:38.980 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存開始
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.988 DEBUG 20492 — [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.989 INFO 20492 — [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存結束
2021-06-11 16:42:38.993 DEBUG 20492 — [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.993 INFO 20492 — [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存結束
2021-06-11 16:42:39.004 DEBUG 20492 — [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:39.005 INFO 20492 — [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存結束
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子線程執行完畢
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回內容—————-
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:42:39.005 INFO 20492 — [nio-8080-exec-2] com.zhdj.config.LogAspect : ————–返回內容—————-
4.多線程共用一個事務
暫時無解決方案,這是弊端
異步調用@Async問題
1.使用背景
在項目中,當訪問其他人的接口較慢或者做耗時任務時,不想程序一直卡在耗時任務上,想程序能夠並行執行,我們可以使用多線程來並行的處理任務,也可以使用spring提供的異步處理方式@Async。
2.異步處理方式
調用之後,不返回任何數據。
調用之後,返回數據,通過Future來獲取返回數據
3.@Async不返回數據
使用@EnableAsync啟用異步註解
@Configuration @EnableAsync @Slf4j public class AsyncConfig{ }
在異步處理的方法dealNoReturnTask上添加註解@Async
@Component @Slf4j public class AsyncTask { @Async public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis()); } }
Test測試類:
@SpringBootTest(classes = SpringbootApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @Slf4j public class AsyncTest { @Autowired private AsyncTask asyncTask; @Test public void testDealNoReturnTask(){ asyncTask.dealNoReturnTask(); try { log.info("begin to deal other Task!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
日志打印結果為:
begin to deal other Task!
AsyncExecutorThread-1 deal No Return Task start
AsyncExecutorThread-1 deal No Return Task end at 1499751227034
從日志中我們可以看出,方法dealNoReturnTask()是異步執行完成的。
dealNoReturnTask()設置sleep 3s是為瞭模擬耗時任務
testDealNoReturnTask()設置sleep 10s是為瞭確認異步是否執行完成
4.@Async返回數據
異步調用返回數據,Future表示在未來某個點獲取執行結果,返回數據類型可以自定義
@Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }
測試類用isCancelled判斷異步任務是否取消,isDone判斷任務是否執行結束
@Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }
日志打印如下,我們可以看出任務一直在等待異步任務執行完畢,用future.get()來獲取異步任務的返回結果
begin to deal other Task!
wait async task to end …
wait async task to end …
wait async task to end …
wait async task to end …
deal async task is Done
return result is {“thread”:”AsyncExecutorThread-1″,”time”:1499752617330}
5.異常處理
我們可以實現AsyncConfigurer接口,也可以繼承AsyncConfigurerSupport類來實現
在方法getAsyncExecutor()中創建線程池的時候,必須使用 executor.initialize(),
不然在調用時會報線程池未初始化的異常。
如果使用threadPoolTaskExecutor()來定義bean,則不需要初始化
@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //如果不初始化,導致找到不到執行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
異步異常處理類:
@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } }
異步處理異常類:
@Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }
在無返回值的異步調用中,異步處理拋出異常,AsyncExceptionHandler的handleUncaughtException()會捕獲指定異常,原有任務還會繼續運行,直到結束。
在有返回值的異步調用中,異步處理拋出異常,會直接拋出異常,異步任務結束,原有處理結束執行。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- Java Spring註解之@Async的基本用法和示例
- @Async異步線程池以及線程的命名方式
- Java線程的三種創建方式
- Java如何固定大小的線程池
- springboot @Async 註解如何實現方法異步