Spring5新特性之Reactive響應式編程

1 什麼是響應式編程

一句話總結:響應式編程是一種編程范式,通用和專註於數據流和變化的,並且是異步的。

維基百科原文:

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s), and that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the change involved with data flow.

翻譯:

在計算機領域,響應式編程是一個專註於數據流和變化傳遞的**異步編程范式。**這意味著可以使用編程語言很容易地表示靜態(例如數組)或動態(例如事件發射器)數據流,並且在關聯的執行模型中,存在著可推斷的依賴關系,這個關系的存在有利於自動傳播與數據流有關的更改。

舉例:

例如,在命令式編程環境中, a:=b+c 表示將表達式的結果賦給a ,而之後改變b 或c 的值不會影響 。但在響應式編程中,a的值會隨著b或c 的更新而更新。電子表格程序就是響應式編程的一個例子。單元格可以包含字面值或類似"=B1+C1"的公式,而包含公式的單元格的值會依據其他單元格的值的變化而變化 。

響應式編程最初是為瞭簡化交互式用戶界面的創建和實時系統動畫的繪制而提出來的一種方法,但它本質上是一種通用的編程范式。

2 回顧Reactor

2.1 什麼是Reactor

還是維基百科:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

翻譯:

反應器(reactor)設計模式是一種事件處理模式,用於處理由一個或多個輸入並發交付給服務處理程序的服務請求。然後,服務處理程序將傳入的請求解復用,並將它們同步地分派給相關的請求處理程序。

2.2 為什麼是Reactor

Reactor來源於網絡IO中同步非阻塞的I/O多路復用機制的模式。

  • 堵塞、非堵塞的區別是在於第一階段,即數據準備階段。無論是堵塞還是非堵塞,都是用應用主動找內核要數據,而read數據的過程是‘堵塞’的,直到數據讀取完。
  • 同步、異步的區別在於第二階段,若由請求者主動的去獲取數據,則為同步操作,需要說明的是:read/write操作也是‘堵塞’的,直到數據讀取完。

若數據的read都由kernel內核完成瞭(在內核read數據的過程中,應用進程依舊可以執行其他的任務),這就是異步操作。

換句話說,

  • BIO裡用戶最關心“我要讀”,
  • NIO裡用戶最關心"我可以讀瞭",
  • 在AIO模型裡用戶更需要關註的是“讀完瞭”。

NIO一個重要的特點是:socket主要的讀、寫、註冊和接收函數,在等待就緒階段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。
NIO是一種同步非阻塞的I/O模型,也是I/O多路復用的基礎。

Reactor模式基本結構:

  • Handle:本質上表示一種資源(比如說文件描述符,或是針對網絡編程中的socket描述符),是由操作系統提供的;該資源用於表示一個個的事件,事件既可以來自於外部,也可以來自於內部,Handle是事件產生的發源地。
  • Synchronous Event Demultiplexer(同步事件分離器):它本身是一個系統調用,用於等待事件的發生(事件可能是一個,也可能是多個)。調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生為止。對於Linux來說,同步事件分離器指的就是常用的I/O多路復用機制,比如說select、poll、epoll等。在Java NIO領域中,同步事件分離器對應的組件就是Selector;對應的阻塞方法就是select方法。
  • Event Handler(事件處理器):本身由多個回調方法構成,這些回調方法構成瞭與應用相關的對於某個事件的反饋機制。在Java NIO領域中並沒有提供事件處理器機制讓我們調用或去進行回調,是由我們自己編寫代碼完成的。Netty相比於Java NIO來說,在事件處理器這個角色上進行瞭一個升級,它為我們開發者提供瞭大量的回調方法,供我們在特定事件產生時實現相應的回調方法進行業務邏輯的處理,即,ChannelHandler。ChannelHandler中的方法對應的都是一個個事件的回調。
  • Concrete Event Handler(具體事件處理器):是事件處理器的實現。它本身實現瞭事件處理器所提供的各種回調方法,從而實現瞭特定於業務的邏輯。它本質上就是我們所編寫的一個個的處理器實現。
  • Initiation Dispatcher(初始分發器):實際上就是Reactor角色。它本身定義瞭一些規范,這些規范用於控制事件的調度方式,同時又提供瞭應用進行事件處理器的註冊、刪除等設施。它本身是整個事件處理器的核心所在,Initiation Dispatcher會通過Synchronous Event Demultiplexer來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分離出每一個事件,然後調用事件處理器,最後調用相關的回調方法來處理這些事件。Netty中ChannelHandler裡的一個個回調方法都是由bossGroup或workGroup中的某個EventLoop來調用的。

2.3 Reactor模式的經典實現—Netty

Netty的線程模式就是一個實現瞭Reactor模式的經典模式。

結構對應:

NioEventLoop ———— Initiation Dispatcher
Synchronous EventDemultiplexer ———— Selector
Evnet Handler ———— ChannelHandler
ConcreteEventHandler ———— 具體的ChannelHandler的實現

模式對應:

Netty服務端使用瞭“多Reactor線程模式”
mainReactor ———— bossGroup(NioEventLoopGroup) 中的某個NioEventLoop
subReactor ———— workerGroup(NioEventLoopGroup) 中的某個NioEventLoop
acceptor ———— ServerBootstrapAcceptor
ThreadPool ———— 用戶自定義線程池

流程:

① 當服務器程序啟動時,會配置ChannelPipelineChannelPipeline中是一個ChannelHandler鏈,所有的事件發生時都會觸發Channelhandler中的某個方法,這個事件會在ChannelPipeline中的ChannelHandler鏈裡傳播。然後,從bossGroup事件循環池中獲取一個NioEventLoop來現實服務端程序綁定本地端口的操作,將對應的ServerSocketChannel註冊到該NioEventLoop中的Selector上,並註冊ACCEPT事件為ServerSocketChannel所感興趣的事件。
② NioEventLoop事件循環啟動,此時開始監聽客戶端的連接請求。
③ 當有客戶端向服務器端發起連接請求時,NioEventLoop的事件循環監聽到該ACCEPT事件,Netty底層會接收這個連接,通過accept()方法得到與這個客戶端的連接(SocketChannel),然後觸發ChannelRead事件(即,ChannelHandler中的channelRead方法會得到回調),該事件會在ChannelPipeline中的ChannelHandler鏈中執行、傳播。
ServerBootstrapAcceptor的readChannel方法會該SocketChannel(客戶端的連接)註冊到workerGroup(NioEventLoopGroup) 中的某個NioEventLoop的Selector上,並註冊READ事件為SocketChannel所感興趣的事件。啟動SocketChannel所在NioEventLoop的事件循環,接下來就可以開始客戶端和服務器端的通信瞭。

3 Spring5中多Reactive的支持

3.1 Spring Webflux

3.1.1 依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3.1.2 Controller代碼

@RestController
public class HelloController {

    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("Hello Spring Webflux");
    }
    
}

3.1.3 測試

C:\Users\xxxx\Desktop\sb-reactive>curl http://localhost:8080/hello
Hello Spring Webflux

3.1.4 Spring MVC和Spring WebFlux模式上的不同

3.2 Spring Data Reactive Respositories

3.2.1 依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

3.2.2 配置

public class RedisReactiveConfig {

    @Bean
    public ReactiveRedisConnectionFactory connectionFactory() {
        return new LettuceConnectionFactory("127.0.0.1", 6379);
    }

    @Bean
    public ReactiveStringRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) {
        return new ReactiveStringRedisTemplate(factory);
    }

}

3.3.3 測試

@SpringBootTest
class SbReactiveApplicationTests {

    @Autowired
    private ReactiveStringRedisTemplate reactiveRedisTemplate;

    @Test
    void contextLoads() {
        reactiveRedisTemplate
                .opsForValue().set("1", "zs")
                .subscribe(b -> System.out.println("success"),
                        e -> System.out.println("error"));
    }

}

4 如何理解Reactive響應式編程?

概念有很多,但是它相較我們的一般請求處理到底有什麼更好的價值體現?

解釋:

Reactive Programming 作為觀察者模式(Observer) 的延伸,不同於傳統的命令編程方式( Imperative programming)同步拉取數據的方式,如迭代器模式(Iterator) 。而是采用數據發佈者同步或異步地推送到數據流(Data Streams)的方案。當該數據流(Data Steams)訂閱者監聽到傳播變化時,立即作出響應動作。在實現層面上,Reactive Programming 可結合函數式編程簡化面向對象語言語法的臃腫性,屏蔽並發實現的復雜細節,提供數據流的有序操作,從而達到提升代碼的可讀性,以及減少 Bugs 出現的目的。同時,Reactive Programming 結合背壓(Backpressure)的技術解決發佈端生成數據的速率高於訂閱端消費的問題。

到此這篇關於Spring5新特性之Reactive響應式編程的文章就介紹到這瞭,更多相關Reactive響應式編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: