java異步編程的7種實現方式小結

最近有很多小夥伴給我留言,能不能總結下異步編程,今天就和大傢簡單聊聊這個話題。

早期的系統是同步的,容易理解,我們來看個例子

同步編程

當用戶創建一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那麼整體的RT就會比較長。

於是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,於是有瞭異步編程雛形。

異步編程是讓程序並發運行的一種手段。它允許多個事件同時發生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執行流程,程序可以繼續運行。

核心思路:采用多線程優化性能,將串行操作變成並行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統的整體性能,顯著降低時延。

接下來,我們來講下異步有哪些編程實現方式

一、線程 Thread

直接繼承 Thread類 是創建異步線程最簡單的方式。

首先,創建Thread子類,普通類或匿名內部類方式;然後創建子類實例;最後通過start()方法啟動線程。

public class AsyncThread extends Thread{
    @Override
    public void run() {
        System.out.println("當前線程名稱:" + this.getName() + ", 執行線程名稱:" + Thread.currentThread().getName() + "-hello");
    }
}
public static void main(String[] args) {
 
  // 模擬業務流程
  // .......
  
    // 創建異步線程 
    AsyncThread asyncThread = new AsyncThread();
 
    // 啟動異步線程
    asyncThread.start();
}
 

當然如果每次都創建一個 Thread線程,頻繁的創建、銷毀,浪費系統資源。我們可以采用線程池

@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
    return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
            new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
            (r, executor) -> log.error("defaultExecutor pool is full! "));
}

將業務邏輯封裝到 Runnable 或 Callable 中,交由 線程池 來執行

二、Future

上述方式雖然達到瞭多線程並行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。

Java 從1.5版本開始,提供瞭 Callable 和 Future,可以在任務執行完畢之後得到任務執行結果。

當然也提供瞭其他功能,如:取消任務、查詢任務是否完成等

Future類位於java.util.concurrent包下,接口定義:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法描述:

  • cancel():取消任務,如果取消任務成功返回true,如果取消任務失敗則返回false
  • isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
  • isDone():表示任務是否已經完成,如果完成,返回true
  • get():獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
  • get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null

代碼示例:

public class CallableAndFuture {
 
    public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
 
 
    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "異步處理,Callable 返回結果";
        }
    }
 
    public static void main(String[] args) {
        Future<String> future = executorService.submit(new MyCallable());
        try {
            System.out.println(future.get());
        } catch (Exception e) {
            // nodo
        } finally {
            executorService.shutdown();
        }
    }
}

Future 表示一個可能還沒有完成的異步任務的結果,通過 get 方法獲取執行結果,該方法會阻塞直到任務返回結果。

三、FutureTask

FutureTask 實現瞭 RunnableFuture 接口,則 RunnableFuture 接口繼承瞭 Runnable 接口和 Future 接口,所以可以將 FutureTask 對象作為任務提交給 ThreadPoolExecutor 去執行,也可以直接被 Thread 執行;又因為實現瞭 Future 接口,所以也能用來獲得任務的執行結果。

FutureTask 構造函數:

public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)

FutureTask 常用來封裝 Callable 和 Runnable,可以作為一個任務提交到線程池中執行。除瞭作為一個獨立的類之外,也提供瞭一些功能性函數供我們創建自定義 task 類使用。

FutureTask 線程安全由CAS來保證。

ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務,再交給線程池執行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    System.out.println("子線程開始計算:");
    Integer sum = 0;
    for (int i = 1; i <= 100; i++)
        sum += i;
    return sum;
});
 
// 線程池執行任務, 運行結果在 futureTask 對象裡面
executor.submit(futureTask);
 
try {
    System.out.println("task運行結果計算的總和為:" + futureTask.get());
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();

Callable 和 Future 的區別:Callable 用於產生結果,Future 用於獲取結果

如果是對多個任務多次自由串行、或並行組合,涉及多個線程之間同步阻塞獲取結果,Future 代碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。

四、異步框架 CompletableFuture

Future 類通過 get() 方法阻塞等待獲取異步執行的運行結果,性能比較差。

JDK1.8 中,Java 提供瞭 CompletableFuture 類,它是基於異步函數式編程。相對阻塞式等待返回結果,CompletableFuture 可以通過回調的方式來處理計算結果,實現瞭異步非阻塞,性能更優。

優點

  • 異步任務結束時,會自動回調某個對象的方法
  • 異步任務出錯時,會自動回調某個對象的方法
  • 主線程設置好回調後,不再關心異步任務的執行

泡茶示例:

(內容摘自:極客時間的《Java 並發編程實戰》)

//任務1:洗水壺->燒開水
CompletableFuture<Void> f1 =
        CompletableFuture.runAsync(() -> {
            System.out.println("T1:洗水壺...");
            sleep(1, TimeUnit.SECONDS);
 
            System.out.println("T1:燒開水...");
            sleep(15, TimeUnit.SECONDS);
        });
 
//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<String> f2 =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶壺...");
            sleep(1, TimeUnit.SECONDS);
 
            System.out.println("T2:洗茶杯...");
            sleep(2, TimeUnit.SECONDS);
 
            System.out.println("T2:拿茶葉...");
            sleep(1, TimeUnit.SECONDS);
            return "龍井";
        });
 
//任務3:任務1和任務2完成後執行:泡茶
CompletableFuture<String> f3 =
        f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶葉:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
 
//等待任務3執行結果
System.out.println(f3.join());
 
}

CompletableFuture 提供瞭非常豐富的API,大約有50種處理串行,並行,組合以及處理錯誤的方法。

五、 SpringBoot 註解 @Async

除瞭硬編碼的異步編程處理方式,SpringBoot 框架還提供瞭 註解式 解決方案,以 方法體 為邊界,方法體內部的代碼邏輯全部按異步方式執行。

首先,使用 @EnableAsync 啟用異步註解

@SpringBootApplication
@EnableAsync
public class StartApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}

自定義線程池:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {
 
    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
 
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在異步處理的方法上添加註解 @Async ,當對 execute 方法 調用時,通過自定義的線程池 defaultThreadPoolExecutor 異步化執行  execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {
 
    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num);
        return true;
    }
 
}

用 @Async 註解標記的方法,稱為異步方法。在spring boot應用中使用 @Async 很簡單:

  • 調用異步方法類上或者啟動類加上註解 @EnableAsync
  • 在需要被異步調用的方法外加上 @Async
  • 所使用的 @Async 註解方法的類對象應該是Spring容器管理的bean對象;

六、Spring ApplicationEvent 事件

事件機制在一些大型項目中被經常使用,Spring 專門提供瞭一套事件機制的接口,滿足瞭架構原則上的解耦。

ApplicationContext 通過 ApplicationEvent 類和 ApplicationListener 接口進行事件處理。如果將實現 ApplicationListener 接口的 bean 註入到上下文中,則每次使用 ApplicationContext 發佈 ApplicationEvent 時,都會通知該 bean。本質上,這是標準的觀察者設計模式

ApplicationEvent 是由 Spring 提供的所有 Event 類的基類

首先,自定義業務事件子類,繼承自 ApplicationEvent,通過泛型註入業務模型參數類。相當於 MQ 的消息體。

public class OrderEvent extends AbstractGenericEvent<OrderModel> {
    public OrderEvent(OrderModel source) {
        super(source);
    }
}

然後,編寫事件監聽器。ApplicationListener 接口是由 Spring 提供的事件訂閱者必須實現的接口,我們需要定義一個子類,繼承 ApplicationListener。相當於 MQ 的消費端

@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {
 
        System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource()));
 
    }
}

最後,發佈事件,把某個事件告訴所有與這個事件相關的監聽器。相當於 MQ 的生產端。

OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 發佈Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));

加個餐:

[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]線程:http-nio-8090-exec-1,發佈事件 3

上面是跑瞭個demo的運行結果,我們發現無論生產端還是消費端,使用瞭同一個線程 http-nio-8090-exec-1,Spring 框架的事件機制默認是同步阻塞的。隻是在代碼規范方面做瞭解耦,有較好的擴展性,但底層還是采用同步調用方式。

那麼問題來瞭,如果想實現異步調用,如何處理?

我們需要手動創建一個 SimpleApplicationEventMulticaster,並設置 TaskExecutor,此時所有的消費事件采用異步線程執行。

@Component
public class SpringConfiguration {
 
    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }
 
}
 

我們看下改造後的運行結果:

[生產端]線程:http-nio-8090-exec-1,發佈事件 1
[生產端]線程:http-nio-8090-exec-1,發佈事件 2
[生產端]線程:http-nio-8090-exec-1,發佈事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}

SimpleApplicationEventMulticaster 這個我們自己實例化的 Bean 與系統默認的加載順序如何?會不會有沖突?

查瞭下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster 對象註入到容器中。

七、消息隊列

異步架構是互聯網系統中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。

消息隊列異步架構的主要角色包括消息生產者、消息隊列和消息消費者。

消息生產者就是主應用程序,生產者將調用請求封裝成消息發送給消息隊列。

消息隊列的職責就是緩沖消息,等待消費者消費。根據消費方式又分為點對點模式發佈訂閱模式兩種。

消息消費者,用來從消息隊列中拉取、消費消息,完成業務邏輯處理。

當然市面上消息隊列框架非常多,常見的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

不同的消息隊列的功能特性會略有不同,但整體架構類似,這裡就不展開瞭。

我們隻需要記住一個關鍵點,借助消息隊列這個中間件可以高效的實現異步編程。

到此這篇關於java異步編程的7種實現方式小結的文章就介紹到這瞭,更多相關java異步編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: