java異步編程的7種實現方式小結
最近有很多小夥伴給我留言,能不能總結下異步編程,今天就和大傢簡單聊聊這個話題。
早期的系統是同步的,容易理解,我們來看個例子
同步編程
當用戶創建一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那麼整體的RT就會比較長。
於是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,於是有瞭異步編程
雛形。
異步編程是讓程序並發運行的一種手段。它允許多個事件同時發生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執行流程,程序可以繼續運行。
核心思路:采用多線程優化性能,將串行操作變成並行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統的整體性能,顯著降低時延。
接下來,我們來講下異步有哪些編程實現方式
一、線程 Thread
直接繼承 Thread類
是創建異步線程最簡單的方式。
首先,創建Thread子類,普通類或匿名內部類方式;然後創建子類實例;最後通過start()方法啟動線程。
public class AsyncThread extends Thread{ @Override public void run() { System.out.println("當前線程名稱:" + this.getName() + ", 執行線程名稱:" + Thread.currentThread().getName() + "-hello"); } }
public static void main(String[] args) { // 模擬業務流程 // ....... // 創建異步線程 AsyncThread asyncThread = new AsyncThread(); // 啟動異步線程 asyncThread.start(); }
當然如果每次都創建一個 Thread線程
,頻繁的創建、銷毀,浪費系統資源。我們可以采用線程池
@Bean(name = "executorService") public ExecutorService downloadExecutorService() { return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(), (r, executor) -> log.error("defaultExecutor pool is full! ")); }
將業務邏輯封裝到 Runnable
或 Callable
中,交由 線程池
來執行
二、Future
上述方式雖然達到瞭多線程並行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。
Java 從1.5版本開始,提供瞭 Callable
和 Future
,可以在任務執行完畢之後得到任務執行結果。
當然也提供瞭其他功能,如:取消任務、查詢任務是否完成等
Future類位於java.util.concurrent包下,接口定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
方法描述:
- cancel():取消任務,如果取消任務成功返回true,如果取消任務失敗則返回false
- isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
- isDone():表示任務是否已經完成,如果完成,返回true
- get():獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
- get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null
代碼示例:
public class CallableAndFuture { public static ExecutorService executorService = new ThreadPoolExecutor(4, 40, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); static class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "異步處理,Callable 返回結果"; } } public static void main(String[] args) { Future<String> future = executorService.submit(new MyCallable()); try { System.out.println(future.get()); } catch (Exception e) { // nodo } finally { executorService.shutdown(); } } }
Future 表示一個可能還沒有完成的異步任務的結果,通過 get
方法獲取執行結果,該方法會阻塞直到任務返回結果。
三、FutureTask
FutureTask
實現瞭 RunnableFuture
接口,則 RunnableFuture
接口繼承瞭 Runnable
接口和 Future
接口,所以可以將 FutureTask
對象作為任務提交給 ThreadPoolExecutor
去執行,也可以直接被 Thread
執行;又因為實現瞭 Future
接口,所以也能用來獲得任務的執行結果。
FutureTask 構造函數:
public FutureTask(Callable<V> callable) public FutureTask(Runnable runnable, V result)
FutureTask 常用來封裝 Callable
和 Runnable
,可以作為一個任務提交到線程池中執行。除瞭作為一個獨立的類之外,也提供瞭一些功能性函數供我們創建自定義 task 類使用。
FutureTask 線程安全由CAS來保證。
ExecutorService executor = Executors.newCachedThreadPool(); // FutureTask包裝callbale任務,再交給線程池執行 FutureTask<Integer> futureTask = new FutureTask<>(() -> { System.out.println("子線程開始計算:"); Integer sum = 0; for (int i = 1; i <= 100; i++) sum += i; return sum; }); // 線程池執行任務, 運行結果在 futureTask 對象裡面 executor.submit(futureTask); try { System.out.println("task運行結果計算的總和為:" + futureTask.get()); } catch (Exception e) { e.printStackTrace(); } executor.shutdown();
Callable 和 Future 的區別:Callable 用於產生結果,Future 用於獲取結果
如果是對多個任務多次自由串行、或並行組合,涉及多個線程之間同步阻塞獲取結果,Future 代碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。
四、異步框架 CompletableFuture
Future 類通過 get()
方法阻塞等待獲取異步執行的運行結果,性能比較差。
JDK1.8 中,Java 提供瞭 CompletableFuture
類,它是基於異步函數式編程。相對阻塞式等待返回結果,CompletableFuture
可以通過回調的方式來處理計算結果,實現瞭異步非阻塞,性能更優。
優點:
- 異步任務結束時,會自動回調某個對象的方法
- 異步任務出錯時,會自動回調某個對象的方法
- 主線程設置好回調後,不再關心異步任務的執行
泡茶示例:
(內容摘自:極客時間的《Java 並發編程實戰》)
//任務1:洗水壺->燒開水 CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { System.out.println("T1:洗水壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1:燒開水..."); sleep(15, TimeUnit.SECONDS); }); //任務2:洗茶壺->洗茶杯->拿茶葉 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2:洗茶壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2:洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2:拿茶葉..."); sleep(1, TimeUnit.SECONDS); return "龍井"; }); //任務3:任務1和任務2完成後執行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> { System.out.println("T1:拿到茶葉:" + tf); System.out.println("T1:泡茶..."); return "上茶:" + tf; }); //等待任務3執行結果 System.out.println(f3.join()); }
CompletableFuture 提供瞭非常豐富的API,大約有50種處理串行,並行,組合以及處理錯誤的方法。
五、 SpringBoot 註解 @Async
除瞭硬編碼的異步編程處理方式,SpringBoot 框架還提供瞭 註解式
解決方案,以 方法體
為邊界,方法體內部的代碼邏輯全部按異步方式執行。
首先,使用 @EnableAsync
啟用異步註解
@SpringBootApplication @EnableAsync public class StartApplication { public static void main(String[] args) { SpringApplication.run(StartApplication.class, args); } }
自定義線程池:
@Configuration @Slf4j public class ThreadPoolConfiguration { @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown") public ThreadPoolExecutor systemCheckPoolExecutorService() { return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(), (r, executor) -> log.error("system pool is full! ")); } }
在異步處理的方法上添加註解 @Async
,當對 execute 方法
調用時,通過自定義的線程池 defaultThreadPoolExecutor
異步化執行 execute 方法
@Service public class AsyncServiceImpl implements AsyncService { @Async("defaultThreadPoolExecutor") public Boolean execute(Integer num) { System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num); return true; } }
用 @Async 註解標記的方法,稱為異步方法。在spring boot應用中使用 @Async 很簡單:
- 調用異步方法類上或者啟動類加上註解 @EnableAsync
- 在需要被異步調用的方法外加上 @Async
- 所使用的 @Async 註解方法的類對象應該是Spring容器管理的bean對象;
六、Spring ApplicationEvent 事件
事件機制在一些大型項目中被經常使用,Spring 專門提供瞭一套事件機制的接口,滿足瞭架構原則上的解耦。
ApplicationContext
通過 ApplicationEvent
類和 ApplicationListener
接口進行事件處理。如果將實現 ApplicationListener
接口的 bean 註入到上下文中,則每次使用 ApplicationContext
發佈 ApplicationEvent
時,都會通知該 bean。本質上,這是標準的觀察者設計模式
。
ApplicationEvent 是由 Spring 提供的所有 Event 類的基類
首先,自定義業務事件子類,繼承自 ApplicationEvent
,通過泛型註入業務模型參數類。相當於 MQ 的消息體。
public class OrderEvent extends AbstractGenericEvent<OrderModel> { public OrderEvent(OrderModel source) { super(source); } }
然後,編寫事件監聽器。ApplicationListener
接口是由 Spring 提供的事件訂閱者必須實現的接口,我們需要定義一個子類,繼承 ApplicationListener
。相當於 MQ 的消費端
@Component public class OrderEventListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(OrderEvent event) { System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource())); } }
最後,發佈事件,把某個事件告訴所有與這個事件相關的監聽器。相當於 MQ 的生產端。
OrderModel orderModel = new OrderModel(); orderModel.setOrderId((long) i); orderModel.setBuyerName("Tom-" + i); orderModel.setSellerName("judy-" + i); orderModel.setAmount(100L); // 發佈Spring事件通知 SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));
加個餐:
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
上面是跑瞭個demo的運行結果,我們發現無論生產端還是消費端,使用瞭同一個線程 http-nio-8090-exec-1
,Spring 框架的事件機制默認是同步阻塞的。隻是在代碼規范方面做瞭解耦,有較好的擴展性,但底層還是采用同步調用方式。
那麼問題來瞭,如果想實現異步調用,如何處理?
我們需要手動創建一個 SimpleApplicationEventMulticaster
,並設置 TaskExecutor
,此時所有的消費事件采用異步線程執行。
@Component public class SpringConfiguration { @Bean public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) { SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster(); simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor); return simpleApplicationEventMulticaster; } }
我們看下改造後的運行結果:
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
SimpleApplicationEventMulticaster
這個我們自己實例化的 Bean 與系統默認的加載順序如何?會不會有沖突?
查瞭下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster
方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster
對象註入到容器中。
七、消息隊列
異步架構是互聯網系統中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。
消息隊列異步架構的主要角色包括消息生產者、消息隊列和消息消費者。
消息生產者就是主應用程序,生產者將調用請求封裝成消息發送給消息隊列。
消息隊列的職責就是緩沖消息,等待消費者消費。根據消費方式又分為點對點模式
和發佈訂閱模式
兩種。
消息消費者,用來從消息隊列中拉取、消費消息,完成業務邏輯處理。
當然市面上消息隊列框架非常多,常見的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等
不同的消息隊列的功能特性會略有不同,但整體架構類似,這裡就不展開瞭。
我們隻需要記住一個關鍵點,借助消息隊列這個中間件可以高效的實現異步編程。
到此這篇關於java異步編程的7種實現方式小結的文章就介紹到這瞭,更多相關java異步編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Java並發教程之Callable和Future接口詳解
- Java使用Runnable和Callable實現多線程的區別詳解
- Java多線程 Callable、Future 和FutureTask
- 一文搞懂Java創建線程的五種方法
- Java多線程之FutureTask的介紹及使用