SpringBoot 如何實現異步編程

首先我們來看看在Spring中為什麼要使用異步編程,它能解決什麼問題?

為什麼要用異步框架,它解決什麼問題?

在SpringBoot的日常開發中,一般都是同步調用的。但實際中有很多場景非常適合使用異步來處理,如:註冊新用戶,送100個積分;或下單成功,發送push消息等等。
就拿註冊新用戶這個用例來說,為什麼要異步處理?

  • 第一個原因:容錯性、健壯性,如果送積分出現異常,不能因為送積分而導致用戶註冊失敗;
  • 因為用戶註冊是主要功能,送積分是次要功能,即使送積分異常也要提示用戶註冊成功,然後後面在針對積分異常做補償處理。
  • 第二個原因:提升性能,例如註冊用戶花瞭20毫秒,送積分花費50毫秒,如果用同步的話,總耗時70毫秒,用異步的話,無需等待積分,故耗時20毫秒。

故,異步能解決2個問題,性能和容錯性。

SpringBoot如何實現異步調用?

對於異步方法調用,從Spring3開始提供瞭@Async註解,我們隻需要在方法上標註此註解,此方法即可實現異步調用。
當然,我們還需要一個配置類,通過Enable模塊驅動註解@EnableAsync 來開啟異步功能。

實現異步調用

第一步:新建配置類,開啟@Async功能支持
使用@EnableAsync來開啟異步任務支持,@EnableAsync註解可以直接放在SpringBoot啟動類上,也可以單獨放在其他配置類上。我們這裡選擇使用單獨的配置類SyncConfiguration。

@Configuration
@EnableAsync
public class AsyncConfiguration {

}

第二步:在方法上標記異步調用

增加一個Component類,用來進行業務處理,同時添加@Async註解,代表該方法為異步處理。

@Component
@Slf4j
public class AsyncTask {

    @SneakyThrows
    @Async
    public void doTask1() {
        long t1 = System.currentTimeMillis();
        Thread.sleep(2000);
        long t2 = System.currentTimeMillis();
        log.info("task1 cost {} ms" , t2-t1);
    }

    @SneakyThrows
    @Async
    public void doTask2() {
        long t1 = System.currentTimeMillis();
        Thread.sleep(3000);
        long t2 = System.currentTimeMillis();
        log.info("task2 cost {} ms" , t2-t1);
    }
}

第三步:在Controller中進行異步方法調用

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
    @Autowired
    private AsyncTask asyncTask;

    @RequestMapping("/task")
    public void task() throws InterruptedException {
        long t1 = System.currentTimeMillis();
        asyncTask.doTask1();
        asyncTask.doTask2();
        Thread.sleep(1000);
        long t2 = System.currentTimeMillis();
        log.info("main cost {} ms", t2-t1);
    }
}

通過訪問http://localhost:8080/async/task查看控制臺日志:

2021-11-25 15:48:37 [http-nio-8080-exec-8] INFO  com.jianzh5.blog.async.AsyncController:26 – main cost 1009 ms
2021-11-25 15:48:38 [task-1] INFO  com.jianzh5.blog.async.AsyncTask:22 – task1 cost 2005 ms
2021-11-25 15:48:39 [task-2] INFO  com.jianzh5.blog.async.AsyncTask:31 – task2 cost 3005 ms

通過日志可以看到:主線程不需要等待異步方法執行完成,減少響應時間,提高接口性能。
通過上面三步我們就可以在SpringBoot中歡樂的使用異步方法來提高我們接口性能瞭,是不是很簡單?
不過,如果真實項目中你真這樣寫瞭,肯定會被老鳥們無情嘲諷,就這?

因為上面的代碼忽略瞭一個最大的問題,就是給@Async異步框架自定義線程池。

為什麼要給@Async自定義線程池?

使用@Async註解,在默認情況下用的是SimpleAsyncTaskExecutor線程池,該線程池不是真正意義上的線程池。

使用此線程池無法實現線程重用,每次調用都會新建一條線程。若系統中不斷的創建線程,最終會導致系統占用內存過高,引發OutOfMemoryError錯誤,關鍵代碼如下:

public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
  //判斷是否開啟限流,默認為否
  if (this.isThrottleActive() && startTimeout > 0L) {
    //執行前置操作,進行限流
    this.concurrencyThrottle.beforeAccess();
    this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
  } else {
    //未限流的情況,執行線程任務
    this.doExecute(taskToUse);
  }

}

protected void doExecute(Runnable task) {
  //不斷創建線程
  Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
  thread.start();
}

//創建線程
public Thread createThread(Runnable runnable) {
  //指定線程名,task-1,task-2...
  Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
  thread.setPriority(this.getThreadPriority());
  thread.setDaemon(this.isDaemon());
  return thread;
}

我們也可以直接通過上面的控制臺日志觀察,每次打印的線程名都是[task-1]、[task-2]、[task-3]、[task-4]…..遞增的。
正因如此,所以我們在使用Spring中的@Async異步框架時一定要自定義線程池,替代默認的SimpleAsyncTaskExecutor。

Spring提供瞭多種線程池:

  • SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程。
  • SyncTaskExecutor:這個類沒有實現異步調用,隻是一個同步操作。隻適用於不需要多線程的地
  • ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類
  • ThreadPoolTaskScheduler:可以使用cron表達式
  • ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝

為@Async實現一個自定義線程池

@Configuration
@EnableAsync
public class SyncConfiguration {
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心線程數
        taskExecutor.setCorePoolSize(10);
        //線程池維護線程的最大數量,隻有在緩沖隊列滿瞭之後才會申請超過核心線程數的線程
        taskExecutor.setMaxPoolSize(100);
        //緩存隊列
        taskExecutor.setQueueCapacity(50);
        //許的空閑時間,當超過瞭核心線程出之外的線程在空閑時間到達之後會被銷毀
        taskExecutor.setKeepAliveSeconds(200);
        //異步方法內部線程名稱
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略
         * 通常有以下四種策略:
         * ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
         * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
         * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

自定義線程池以後我們就可以大膽的使用@Async提供的異步處理能力瞭。

多個線程池處理

在現實的互聯網項目開發中,針對高並發的請求,一般的做法是高並發接口單獨線程池隔離處理。
假設現在2個高並發接口: 一個是修改用戶信息接口,刷新用戶redis緩存; 一個是下訂單接口,發送app push信息。往往會根據接口特征定義兩個線程池,這時候我們在使用@Async時就需要通過指定線程池名稱進行區分。

為@Async指定線程池名字

@SneakyThrows
@Async("asyncPoolTaskExecutor")
public void doTask1() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(2000);
  long t2 = System.currentTimeMillis();
  log.info("task1 cost {} ms" , t2-t1);
}

當系統存在多個線程池時,我們也可以配置一個默認線程池,對於非默認的異步任務再通過@Async(“otherTaskExecutor”)來指定線程池名稱。

配置默認線程池

可以修改配置類讓其實現AsyncConfigurer,並重寫getAsyncExecutor()方法,指定默認線程池:

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心線程數
        taskExecutor.setCorePoolSize(2);
        //線程池維護線程的最大數量,隻有在緩沖隊列滿瞭之後才會申請超過核心線程數的線程
        taskExecutor.setMaxPoolSize(10);
        //緩存隊列
        taskExecutor.setQueueCapacity(50);
        //許的空閑時間,當超過瞭核心線程出之外的線程在空閑時間到達之後會被銷毀
        taskExecutor.setKeepAliveSeconds(200);
        //異步方法內部線程名稱
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略
         * 通常有以下四種策略:
         * ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
         * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
         * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * 指定默認線程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) ->
            log.error("線程池執行任務發送未知錯誤,執行方法:{}",method.getName(),ex);
    }
}

如下,doTask1()方法使用默認使用線程池asyncPoolTaskExecutor,doTask2()使用線程池otherTaskExecutor,非常靈活。

@Async
public void doTask1() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(2000);
  long t2 = System.currentTimeMillis();
  log.info("task1 cost {} ms" , t2-t1);
}

@SneakyThrows
@Async("otherTaskExecutor")
public void doTask2() {
  long t1 = System.currentTimeMillis();
  Thread.sleep(3000);
  long t2 = System.currentTimeMillis();
  log.info("task2 cost {} ms" , t2-t1);
}

小結

@Async異步方法在日常開發中經常會用到,大傢好好掌握,爭取早日成為老鳥!!!

到此這篇關於SpringBoot 如何實現異步編程 的文章就介紹到這瞭,更多相關SpringBoot  異步編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: