詳解領域驅動設計之事件驅動與CQRS

一、前言:從物流詳情開始

大傢對物流跟蹤都不陌生,它詳細記錄瞭在什麼時間發生瞭什麼,並且數據作為重要憑證是不可變的。我理解其背後的價值有這麼幾個方面:業務方可以管控每個子過程、知道目前所處的環節;另一方面,當需要追溯時候僅僅通過每一步的記錄就可以回放整個歷史過程。

我在之前的文章中提出過“軟件項目也是人類社會生產關系的范疇,隻不過我們所創造的勞動成果看不見摸不著而已”。所以我們可以借鑒物流跟蹤的思路來開發軟件項目,把復雜過程拆解為一個個步驟、子過程、狀態,這和我們事件劃分是一致的,這就是事件驅動的典型案例。

二、領域事件

領域事件(Domain Events)是領域驅動設計(Domain Driven Design,DDD)中的一個概念,用於捕獲我們所建模的領域中所發生過的事情。

領域事件本身也作為通用語言(Ubiquitous Language)的一部分成為包括領域專傢在內的所有項目成員的交流用語。

比如在前述的跨境物流例子中,貨品達到保稅倉以後需要分派工作人員進行分揀分包,那麼“貨品已到達保稅倉”便是一個領域事件。

首先,從業務邏輯來說該事件關系到整個流程的成功或者失敗;同時又將觸發後續子流程;而對於業務方來說,該事件也是一個標志性的裡程碑,代表自己的貨品就快配送到自己手中。

所以通常來說,一個領域事件具有以下幾個特征:較高的業務價值,有助於形成完整的業務閉環,將導致進一步的業務操作。這裡還要強調一點,領域事件具有明確的邊界。

比如:如果你建模的是餐廳的結賬系統,那麼此時的“客戶已到達”便不是你關心的重點,因為你不可能在客戶到達時就立即向對方要錢,而“客戶已下單”才是對結賬系統有用的事件。

2.1、建模領域事件

在建模領域事件時,我們應該根據限界上下文中的通用語言來命名事件及屬性。如果事件由聚合上的命令操作產生,那麼我們通常根據該操作方法的名字來命名領域事件。

對於上面的例子“貨品已到達保稅倉”,我們將發佈與之對應的領域事件

GoodsArrivedBondedWarehouseEvent(當然在明確的界限上下文中也可以去掉聚合的名字,直接建模為ArrivedBondedWarehouseEvent,這都是命名方面的習慣)。

事件的名字表明瞭聚合上的命令方法在執行成功之後所發生的事情,換句話說待定項以及不確定的狀態是不能作為領域事件的。

一個行之有效的方法是畫出當前業務的狀態流轉圖,包含前置操作以及引起的狀態變更,這裡表達的是已經變更完成的狀態所以我們不用過去時態表示,比如刪除或者取消,即代表已經刪除或者已經取消。

然後對於其中的節點進行事件建模。如下圖是文件雲端存儲的業務,我們分別對預上傳、上傳完成確認、刪除等環節建模“過去時”事件,PreUploadedEvent、ConfirmUploadedEvent、RemovedEvent。

2.2、領域事件代碼解讀

package domain.event;

import java.util.Date;
import java.util.UUID;

public class DomainEvent {

    /**
     * 領域事件還包含瞭唯一ID,
     * 但是該ID並不是實體(Entity)層面的ID概念,
     * 而是主要用於事件追溯和日志。
     * 如果是數據庫存儲,該字段通常為唯一索引。
     */
    private final String id;

    /**
     * 創建時間用於追溯,另一方面不管使用瞭
     * 哪種事件存儲都有可能遇到事件延遲,
     * 我們通過創建時間能夠確保其發生順序。
     */
    private final Date occurredOn;

    public DomainEvent() {
        this.id = String.valueOf(UUID.randomUUID());
        this.occurredOn = new Date();
    }
}

在創建領域事件時,需要註意2點:

  • 領域事件本身應該是不變的(Immutable);
  • 領域事件應該攜帶與事件發生時相關的上下文數據信息,但是並不是整個聚合根的狀態數據。例如,在創建訂單時可以攜帶訂單的基本信息,而對於用戶更新訂單收貨地址事件AddressUpdatedEvent事件,隻需要包含訂單、用戶以及新的地址等信息即可。
public class AddressUpdatedEvent extends DomainEvent {
    //通過userId+orderId來校驗訂單的合法性;
    private String userId; 
    private String orderId;
    //新的地址
    private Address address;
    //略去具體業務邏輯
}

2.3、領域事件的存儲

事件的不可變性與可追溯性都決定瞭其必須要持久化的原則,我們來看看常見的幾種方案。

2.3.1、單獨的EventStore

有的業務場景中會創建一個單獨的事件存儲中心,可能是Mysql、Redis、Mongo、甚至文件存儲等。這裡以Mysql舉例,business_code、event_code用來區分不同業務的不同事件,具體的命名規則可以根據實際需要。

這裡需要註意該數據源與業務數據源不一致的場景,我們要確保當業務數據更新以後事件能夠準確無誤的記錄下來,實踐中盡量避免使用分佈式事務,或者盡量避免其跨庫的場景,否則你就得想想如何補償瞭。千萬要避免,用戶更新瞭收貨地址,但是AddressUpdatedEvent事件保存失敗。

總的原則就是對分佈式事務Say No,無論如何,我相信方法總比問題多,在實踐中我們總可以想到解決方案,區別在於該方案是否簡潔、是否做到瞭解耦。

# 考慮是否需要分表,事件存儲建議邏輯簡單
CREATE TABLE `event_store` (
  `event_id` int(11) NOT NULL auto increment,
  `event_code` varchar(32) NOT NULL,
  `event_name` varchar(64) NOT NULL,
  `event_body` varchar(4096) NOT NULL,
  `occurred_on` datetime NOT NULL,
  `business_code` varchar(128) NOT NULL,
  UNIQUE KEY (`event id`)
) ENGINE=InnoDB COMMENT '事件存儲表';

2.3.2、與業務數據一起存儲

在分佈式架構中,每個模塊都做的相對比較小,準確的說是“自治”。如果當前業務數據量較小,可以將事件與業務數據一起存儲,用相關標識區分是真實的業務數據還是事件記錄;或者在當前業務數據庫中建立該業務自己的事件存儲,但是要考慮到事件存儲的量級必然大於真實的業務數據,考慮是否需要分表。

這種方案的優勢:數據自治;避免分佈式事務;不需要額外的事件存儲中心。當然其劣勢就是不能復用。

2.4、領域事件如何發佈

2.4.1、由領域聚合發送領域事件

/*
* 一個關於比賽的充血模型例子
* 貧血模型會構造一個MatchService,我們這裡通過模型來觸發相應的事件
* 本例中略去瞭具體的業務細節
*/
public class Match {
    public void start() {
        //構造Event....
        MatchEvent matchStartedEvent = new MatchStartedEvent();
        //略去具體業務邏輯
        DefaultDomainEventBus.publish(matchStartedEvent);
    }

    public void finish() {
        //構造Event....
        MatchEvent matchFinishedEvent = new MatchFinishedEvent();
        //略去具體業務邏輯
        DefaultDomainEventBus.publish(matchFinishedEvent);
    }

    //略去Match對象基本屬性
}

2.4.2、事件總線VS消息中間件

微服務內的領域事件可以通過事件總線或利用應用服務實現不同聚合之間的業務協同。即微服務內發生領域事件時,由於大部分事件的集成發生在同一個線程內,不一定需要引入消息中間件。但一個事件如果同時更新多個聚合數據,按照 DDD“一個事務隻更新一個聚合根”的原則,可以考慮引入消息中間件,通過異步化的方式,對微服務內不同的聚合根采用不同的事務

三、Saga分佈式事務

3.1、Saga概要

我們看看如何使用 Saga 模式維護數據一致性?

Saga 是一種在微服務架構中維護數據一致性的機制,它可以避免分佈式事務所帶來的問題。

一個 Saga 表示需要更新的多個服務中的一個,即Saga由一連串的本地事務組成。每一個本地事務負責更新它所在服務的私有數據庫,這些操作仍舊依賴於我們所熟悉的ACID事務框架和函數庫。

模式:Saga

通過使用異步消息來協調一系列本地事務,從而維護多個服務之間的數據一致性。

請參閱(強烈建議):https://microservices.io/patterns/data/saga.html

Saga與TCC相比少瞭一步Try的操作,TCC無論最終事務成功失敗都需要與事務參與方交互兩次。而Saga在事務成功的情況下隻需要與事務參與方交互一次, 如果事務失敗,需要額外進行補償回滾。

  • 每個Saga由一系列sub-transaction Ti 組成;
  • 每個Ti 都有對應的補償動作Ci,補償動作用於撤銷Ti造成的結果;

可以看到,和TCC相比,Saga沒有“預留”動作,它的Ti就是直接提交到庫。

Saga的執行順序有兩種:

  • success:T1, T2, T3, …, Tn ;
  • failure:T1, T2, …, Tj, Cj,…, C2, C1,其中0 < j < n;

所以我們可以看到Saga的撤銷十分關鍵,可以說使用Saga的難點就在於如何設計你的回滾策略。

3.2、Saga實現

通過上面的例子我們對Saga有瞭初步的體感,現在來深入探討下如何實現。當通過系統命令啟動Saga時,協調邏輯必須選擇並通知第一個Saga參與方執行本地事務。一旦該事務完成,Saga協調選擇並調用下一個Saga參與方。

這個過程一直持續到Saga執行完所有步驟。如果任何本地事務失敗,則 Saga必須以相反的順序執行補償事務。以下幾種不同的方法可用來構建Saga的協調邏輯。

3.2.1、協同式(choreography)

把 Saga 的決策和執行順序邏輯分佈在 Saga的每一個參與方中,它們通過交換事件的方式來進行溝通。

(引用於《微服務架構設計模式》相關章節)

Order服務創建一個Order並發佈OrderCreated事件。

Consumer服務消費OrderCreated事件,驗證消費者是否可以下訂單,並發佈ConsumerVerified事件。

Kitchen服務消費OrderCreated事件,驗證訂單,在CREATE_PENDING狀態下創建故障單,並發佈TicketCreated事件。

Accounting服務消費OrderCreated事件並創建一個處於PENDING狀態的Credit CardAuthorization。

Accounting服務消費TicketCreated和ConsumerVerified事件,向消費者的信用卡收費,並發佈信用卡授權失敗事件。

Kitchen服務使用信用卡授權失敗事件並將故障單的狀態更改為REJECTED。

訂單服務消費信用卡授權失敗事件,並將訂單狀態更改為已拒絕。

3.2.2、編排式(orchestration)

把Saga的決策和執行順序邏輯集中在一個Saga編排器類中。Saga 編排器發出命令式消息給各個 Saga 參與方,指示這些參與方服務完成具體操作(本地事務)。類似於一個狀態機,當參與方服務完成操作以後會給編排器發送一個狀態指令,以決定下一步做什麼。

(引用於《微服務架構設計模式》相關章節)

我們來分析一下執行流程

Order Service首先創建一個Order和一個創建訂單控制器。之後,路徑的流程如下:

Saga orchestrator向Consumer Service發送Verify Consumer命令。

Consumer Service回復Consumer Verified消息。

Saga orchestrator向Kitchen Service發送Create Ticket命令。

Kitchen Service回復Ticket Created消息。

Saga協調器向Accounting Service發送授權卡消息。

Accounting服務部門使用卡片授權消息回復。

Saga orchestrator向Kitchen Service發送Approve Ticket命令。

Saga orchestrator向訂單服務發送批準訂單命令。

3.2.3、補償策略

之前的描述中我們說過Saga最重要的是如何處理異常,狀態機還定義瞭許多異常狀態。如上面的6就會發生失敗,觸發AuthorizeCardFailure,此時我們就要結束訂單並把之前提交的事務進行回滾。這裡面要區分哪些是校驗性事務、哪些是需要補償的事務。

一個Saga由三種不同類型的事務組成:可補償性事務(可以回滾,因此有一個補償事務);關鍵性事務(這是 Saga的成敗關鍵點,比如4賬戶代扣);以及可重復性事務,它不需要回滾並保證能夠完成(比如6更新狀態)。

在Create Order Saga 中,createOrder()、createTicket()步驟是可補償性事務且具有撤銷其更新的補償事務。

verifyConsumerDetails()事務是隻讀的,因此不需要補償事務。authorizeCreditCard()事務是這個 Saga的關鍵性事務。如果消費者的信用卡可以授權,那麼這個Saga保證完成。approveTicket()和approveRestaurantOrder()步驟是在關鍵性事務之後的可重復性事務。

認真拆解每個步驟、然後評估其補償策略尤為重要,正如你看到的,每種類型的事務在對策中扮演著不同的角色。

四、CQRS

前面講述瞭事件的概念,又分析瞭Saga如何解決復雜事務,現在我們來看看CQRS為什麼在DDD中廣泛被采用。除瞭讀寫分離的特征以外,我們用事件驅動的方式來實踐Command邏輯能有效降低業務的復雜度。

當你明白如何建模事件、如何規避復雜事務,明白什麼時候用消息中間件、什麼時候采用事件總線,才能理解為什麼是CQRS、怎麼正確應用。

下面是我們項目中的設計,這裡為什麼會出現Read/Write Service,是為瞭封裝調用,service內部是基於聚合發送事件。因為我發現在實際項目中,很多人都會第一時間問我要XXXService而不是XXX模型,所以在DDD沒有完全普及的項目中建議大傢采取這種居中策略。這也符合咱們的解耦,對方依賴我的抽象能力,然而我內部是基於DDD還是傳統的流程代碼對其是無關透明的。

我們先來看看事件以及處理器的時序關系。

這裡還是以文件雲端存儲業務為例,下面是一些處理器的核心代碼。註釋行是對代碼功能、用法以及擴展方面的解讀,請認真閱讀。

package domain;

import domain.event.DomainEvent;
import domain.handler.event.DomainEventHandler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DomainRegistry {

    private Map<String, List<DomainEventHandler>> handlerMap =
        new HashMap<String, List<DomainEventHandler>>();

    private static DomainRegistry instance;

    private DomainRegistry() {
    }

    public static DomainRegistry getInstance() {
        if (instance == null) {
            instance = new DomainRegistry();
        }
        return instance;
    }

    public Map<String, List<DomainEventHandler>> getHandlerMap() {
        return handlerMap;
    }

    public List<DomainEventHandler> find(String name) {
        if (name == null) {
            return null;
        }
        return handlerMap.get(name);
    }

    //事件註冊與維護,register分多少個場景根據業務拆分,
    //這裡是業務流的核心。如果多個事件需要維護前後依賴關系,
    //可以維護一個priority邏輯
    public void register(Class<? extends DomainEvent> domainEvent,
                         DomainEventHandler handler) {
        if (domainEvent == null) {
            return;
        }
        if (handlerMap.get(domainEvent.getName()) == null) {
            handlerMap.put(domainEvent.getName(), new ArrayList<DomainEventHandler>());
        }
        handlerMap.get(domainEvent.getName()).add(handler);
        //按照優先級進行事件處理器排序
        。。。
    }
}

文件上傳完畢事件的例子。

package domain.handler.event;

import domain.DomainRegistry;
import domain.StateDispatcher;
import domain.entity.meta.MetaActionEnums;
import domain.event.DomainEvent;
import domain.event.MetaEvent;
import domain.repository.meta.MetaRepository;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @Description:一個事件操作的處理器
 * 我們混合使用瞭Saga的兩種模式,外層事件交互;
 * 對於單個復雜的事件內部采取狀態流轉實現。
 */

@Component
public class MetaConfirmUploadedHandler implements DomainEventHandler {

    @Resource
    private MetaRepository metaRepository;

    public void handle(DomainEvent event) {
        //1.我們在當前的上下文中定義個ThreadLocal變量
        //用於存放事件影響的聚合根信息(線程共享)

        //2.當然如果有需要額外的信息,可以基於event所
        //攜帶的信息構造Specification從repository獲取
        // 代碼示例
        // metaRepository.queryBySpecification(SpecificationFactory.build(event));

        DomainEvent domainEvent = metaRepository.load();

        //此處是我們的邏輯
        。。。。

        //對於單個操作比較復雜的,可以使用狀態流轉進一步拆分
        domainEvent.setStatus(nextState);
        //在事件觸發之後,仍需要一個狀態跟蹤器來解決大事務問題
        //Saga編排式
        StateDispatcher.dispatch();
    }

    @PostConstruct
    public void autoRegister() {
        //此處可以更加細分,註冊在哪一類場景中,這也是事件驅動的強大、靈活之處。
        //避免瞭if...else判斷。我們可以有這樣的意識,一旦你的邏輯裡面充斥瞭大量
        //switch、if的時候來看看自己註冊的場景是否可以繼續細分
        DomainRegistry.getInstance().register(MetaEvent.class, this);
    }

    public String getAction() {
        return MetaActionEnums.CONFIRM_UPLOADED.name();
    }

    //適用於前後依賴的事件,通過優先級指定執行順序
    public Integer getPriority() {
        return PriorityEnums.FIRST.getValue();
    }
}

事件總線邏輯

package domain;

import domain.event.DomainEvent;
import domain.handler.event.DomainEventHandler;
import java.util.List;


public class DefaultDomainEventBus {

    public static void publish(DomainEvent event, String action,
                               EventCallback callback) {

        List<DomainEventHandler> handlers = DomainRegistry.getInstance().
            find(event.getClass().getName());
        handlers.stream().forEach(handler -> {
            if (action != null && action.equals(handler.getAction())) {
                Exception e = null;
                boolean result = true;
                try {
                    handler.handle(event);
                } catch (Exception ex) {
                    e = ex;
                    result = false;
                    //自定義異常處理
                    。。。
                } finally {
                    //write into event store
                    saveEvent(event);
                }

                //根據實際業務處理回調場景,DefaultEventCallback可以返回
                if (callback != null) {
                    callback.callback(event, action, result, e);       
                }
            }
        });
    }
}

五、自治服務和系統

DDD中強調限界上下文的自治特性,事實上,從更小的粒度來看,對象仍然需要具備自治的這四個特性,即:最小完備、自我履行、穩定空間、獨立進化。其中自我履行是重點,因為不強依賴外部所以穩定、因為穩定才可能獨立進化。這就是六邊形架構在DDD中較為普遍的原因。

六、結語

本文所講述的事件、Saga、CQRS的方案均可以單獨使用,可以應用到你的某個method、或者你的整個package。項目中我們並不一定要實踐一整套CQRS,隻要其中的某些思想解決瞭我們項目中的某個問題就足夠瞭。

也許你現在已經磨刀霍霍,準備在項目中實踐一下這些技巧。不過我們要明白“每一個硬幣都有兩面性”,我們不僅看到高擴展、解耦的、易編排的優點以外,仍然要明白其所帶來的問題。利弊分析以後再去決定如何實現才是正確的應對之道。

  • 這類編程模式有一定的學習曲線;
  • 基於消息傳遞的應用程序的復雜性;
  • 處理事件的演化有一定難度;
  • 刪除數據存在一定難度;
  • 查詢事件存儲庫非常有挑戰性。

不過我們還是要認識到在其適合的場景中,六邊形架構以及DDD戰術將加速我們的領域建模過程,也迫使我們從嚴格的通用語言角度來解釋一個領域,而不是一個個需求。任何更強調核心域而不是技術實現的方式都可以增加業務價值,並使我們獲得更大的競爭優勢。

以上就是詳解領域驅動設計之事件驅動與CQRS的詳細內容,更多關於領域驅動設計 事件驅動與CQRS的資料請關註WalkonNet其它相關文章!

推薦閱讀: