@TransactionalEventListener的使用和實現原理分析

一、問題描述

平時我們在完成某些數據的入庫後,發佈瞭一個事件,此時使用的是@EventListener,然後在這個事件中,又去對剛才入庫的數據進行查詢,從而完成後續的操作。

例如(數據入庫=>對入庫數據進行查詢審核),這時候會發現,查詢不到剛才入庫的數據,這是因為事務還沒提交完成,在同一個事務當中,查詢不到才存入的數據,那麼就引出瞭下面的解決方式。

為瞭解決上述問題,Spring為我們提供瞭兩種方式:

(1) @TransactionalEventListener註解。

(2) 事務同步管理器TransactionSynchronizationManager。

以便我們可以在事務提交後再觸發某一事件來進行其他操作。

二、使用場景

在項目當中,我們有時候需要在執行數據庫操作之後,發送消息或事件來異步調用其他組件執行相應的操作,例如:

1.數據完成導入之後,發佈審核事件,對入庫的數據進行審核。

2.用戶在完成註冊後發送激活碼。

3.配置修改後,發送更新配置的事件。

三、@TransactionalEventListener詳解

我們可以從命名上直接看出,它就是個EventListener,

在Spring4.2+,有一種叫做@TransactionEventListener的方式,能夠實現在控制事務的同時,完成對對事件的處理。

我們知道,Spring的事件監聽機制(發佈訂閱模型)實際上並不是異步的(默認情況下),而是同步的來將代碼進行解耦。而@TransactionEventListener仍是通過這種方式,但是加入瞭回調的方式來解決,這樣就能夠在事務進行Commited,Rollback…等時候才去進行Event的處理,來達到事務同步的目的。

// @since 4.2 註解的方式提供的相對較晚,其實API的方式在第一個版本就已經提供瞭。
// 值得註意的是,在這個註解上面有一個註解:`@EventListener`,所以表明其實這個註解也是個事件監聽器。 
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener //有類似於註解繼承的效果
public @interface TransactionalEventListener {
 // 這個註解取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
 // 各個值都代表什麼意思表達什麼功能,非常清晰,下面解釋瞭對應的枚舉類~
 // 需要註意的是:AFTER_COMMIT + AFTER_COMPLETION是可以同時生效的
 // AFTER_ROLLBACK + AFTER_COMPLETION是可以同時生效的
 TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
 // 表明若沒有事務的時候,對應的event是否需要執行,默認值為false表示,沒事務就不執行瞭。
 boolean fallbackExecution() default false;
 // 這裡巧妙的用到瞭@AliasFor的能力,放到瞭@EventListener身上
 // 註意:一般建議都需要指定此值,否則默認可以處理所有類型的事件,范圍太廣瞭。
 @AliasFor(annotation = EventListener.class, attribute = "classes")
 Class<?>[] value() default {};
 @AliasFor(annotation = EventListener.class, attribute = "classes")
 Class<?>[] classes() default {};
 
 String condition() default "";
}
public enum TransactionPhase {
   // 指定目標方法在事務commit之前執行
   BEFORE_COMMIT,
   // 指定目標方法在事務commit之後執行
    AFTER_COMMIT,
    // 指定目標方法在事務rollback之後執行
    AFTER_ROLLBACK,
   // 指定目標方法在事務完成時執行,這裡的完成是指無論事務是成功提交還是事務回滾瞭
   AFTER_COMPLETION
  }

四、代碼示例

這裡主要是為瞭講解如何使用@TransactionalEventListener,所以就不列出所有代碼瞭。

@Data
public class User {
 private long id;
 private String name;
 private Integer age;
}

業務實現:

@Service
@Slf4j
public class UserServiceImpl extends implements UserService {
 @Autowired
    UserMapper userMapper;
     
 @Autowired
    ApplicationEventPublisher eventPublisher;
 
 public void userRegister(User user){
  userMapper.insertUser(user);
  eventPublisher.publishEvent(new UserRegisterEvent(new Date()));
 }
}

自定義事件:

@Getter
@Setter
public class UserRegisterEvent extends ApplicationEvent {
    private Date registerDate;
    public UserRegisterEvent(Date registerDate) {
        super(registerDate);
        this.registerDate = registerDate;
    }
}

事件監聽器:

@Slf4j
@Component
public class UserListener {
    @Autowired
    UserService userService;
    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserRegisterEvent.class)
    public void onUserRegisterEvent(UserRegisterEvent event) {
        userService.sendActivationCode(event.getRegisterDate());
    }
}

五、實現原理

關於事務的實現原理,這裡其實是比較簡單的,Spring對事務監控的處理邏輯在TransactionSynchronization中,如下是該接口的聲明:

public interface TransactionSynchronization extends Flushable {
// 在當前事務掛起時執行
default void suspend() {
}
// 在當前事務重新加載時執行
default void resume() {
}
// 在當前數據刷新到數據庫時執行
default void flush() {
}
// 在當前事務commit之前執行
default void beforeCommit(boolean readOnly) {
}
// 在當前事務completion之前執行
default void beforeCompletion() {
}
// 在當前事務commit之後實質性
default void afterCommit() {
}
// 在當前事務completion之後執行
default void afterCompletion(int status) {
}
}

很明顯,這裡的TransactionSynchronization接口隻是抽象瞭一些行為,用於事務事件發生時觸發,這些行為在Spring事務中提供瞭內在支持,即在相應的事務事件時,其會獲取當前所有註冊的TransactionSynchronization對象,然後調用其相應的方法。

那麼這裡TransactionSynchronization對象的註冊點對於我們瞭解事務事件觸發有至關重要的作用瞭。

這裡我們首先回到事務標簽的解析處,在前面講解事務標簽解析時,我們講到Spring會註冊一個TransactionalEventListenerFactory類型的bean到Spring容器中,這裡關於標簽的解析讀者可以閱讀本人前面的文章Spring事務用法示例與實現原理。

這裡註冊的TransactionalEventListenerFactory實現瞭EventListenerFactory接口,這個接口的主要作用是先判斷目標方法是否是某個監聽器的類型,然後為目標方法生成一個監聽器,其會在某個bean初始化之後由Spring調用其方法用於生成監聽器。如下是該類的實現:

public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
// 指定當前監聽器的順序
private int order = 50;
public void setOrder(int order) {
    this.order = order;
}
@Override
public int getOrder() {
    return this.order;
}
// 指定目標方法是否是所支持的監聽器的類型,這裡的判斷邏輯就是如果目標方法上包含有
// TransactionalEventListener註解,則說明其是一個事務事件監聽器
@Override
public boolean supportsMethod(Method method) {
    return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null);
}
// 為目標方法生成一個事務事件監聽器,這裡ApplicationListenerMethodTransactionalAdapter實現瞭
// ApplicationEvent接口
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}

這裡關於事務事件監聽的邏輯其實已經比較清楚瞭。

ApplicationListenerMethodTransactionalAdapter本質上是實現瞭ApplicationListener接口的,也就是說,其是Spring的一個事件監聽器,這也就是為什麼進行事務處理時需要使用ApplicationEventPublisher.publish()方法發佈一下當前事務的事件。

ApplicationListenerMethodTransactionalAdapter在監聽到發佈的事件之後會生成一個TransactionSynchronization對象,並且將該對象註冊到當前事務邏輯中,如下是監聽事務事件的處理邏輯:

@Override
 public void onApplicationEvent(ApplicationEvent event) {
// 如果當前TransactionManager已經配置開啟事務事件監聽,
// 此時才會註冊TransactionSynchronization對象
if (TransactionSynchronizationManager.isSynchronizationActive()) {
    // 通過當前事務事件發佈的參數,創建一個TransactionSynchronization對象
    TransactionSynchronization transactionSynchronization = 
        createTransactionSynchronization(event);
    // 註冊TransactionSynchronization對象到TransactionManager中
    TransactionSynchronizationManager
        .registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
    // 如果當前TransactionManager沒有開啟事務事件處理,但是當前事務監聽方法中配置瞭
    // fallbackExecution屬性為true,說明其需要對當前事務事件進行監聽,無論其是否有事務
    if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK 
        && logger.isWarnEnabled()) {
        logger.warn("Processing " 
                    + event + " as a fallback execution on AFTER_ROLLBACK phase");
    }
    processEvent(event);
} else {
    // 走到這裡說明當前是不需要事務事件處理的,因而直接略過
    if (logger.isDebugEnabled()) {
        logger.debug("No transaction is active - skipping " + event);
    }
}
}

這裡需要說明的是,上述annotation屬性就是在事務監聽方法上解析的TransactionalEventListener註解中配置的屬性。

可以看到,對於事務事件的處理,這裡創建瞭一個TransactionSynchronization對象,其實主要的處理邏輯就是在返回的這個對象中,而createTransactionSynchronization()方法內部隻是創建瞭一個TransactionSynchronizationEventAdapter對象就返回瞭。

這裡我們直接看該對象的源碼:

private static class TransactionSynchronizationEventAdapter 
    extends TransactionSynchronizationAdapter {
    private final ApplicationListenerMethodAdapter listener;
    private final ApplicationEvent event;
    private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter 
    listener, ApplicationEvent event, TransactionPhase phase) {
    this.listener = listener;
    this.event = event;
    this.phase = phase;
}
@Override
public int getOrder() {
    return this.listener.getOrder();
}
// 在目標方法配置的phase屬性為BEFORE_COMMIT時,處理before commit事件
public void beforeCommit(boolean readOnly) {
    if (this.phase == TransactionPhase.BEFORE_COMMIT) {
        processEvent();
    }
}
// 這裡對於after completion事件的處理,雖然分為瞭三個if分支,但是實際上都是執行的processEvent()
// 方法,因為after completion事件是事務事件中一定會執行的,因而這裡對於commit,
// rollback和completion事件都在當前方法中處理也是沒問題的
public void afterCompletion(int status) {
    if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
        processEvent();
    } else if (this.phase == TransactionPhase.AFTER_ROLLBACK 
               && status == STATUS_ROLLED_BACK) {
        processEvent();
    } else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
        processEvent();
    }
}
// 執行事務事件
protected void processEvent() {
    this.listener.processEvent(this.event);
}
}

可以看到,對於事務事件的處理,最終都是委托給瞭ApplicationListenerMethodAdapter.processEvent()方法進行的。如下是該方法的源碼:

 public void processEvent(ApplicationEvent event) {
// 處理事務事件的相關參數,這裡主要是判斷TransactionalEventListener註解中是否配置瞭value
// 或classes屬性,如果配置瞭,則將方法參數轉換為該指定類型傳給監聽的方法;如果沒有配置,則判斷
// 目標方法是ApplicationEvent類型還是PayloadApplicationEvent類型,是則轉換為該類型傳入
Object[] args = resolveArguments(event);
// 這裡主要是獲取TransactionalEventListener註解中的condition屬性,然後通過
// Spring expression language將其與目標類和方法進行匹配
if (shouldHandle(event, args)) {
    // 通過處理得到的參數借助於反射調用事務監聽方法
    Object result = doInvoke(args);
    if (result != null) {
        // 對方法的返回值進行處理
        handleResult(result);
    } else {
        logger.trace("No result object given - no result to handle");
    }
}
 }
 // 處理事務監聽方法的參數
protected Object[] resolveArguments(ApplicationEvent event) {
// 獲取發佈事務事件時傳入的參數類型
ResolvableType declaredEventType = getResolvableType(event);
if (declaredEventType == null) {
    return null;
}
// 如果事務監聽方法的參數個數為0,則直接返回
if (this.method.getParameterCount() == 0) {
    return new Object[0];
}
// 如果事務監聽方法的參數不為ApplicationEvent或PayloadApplicationEvent,則直接將發佈事務
// 事件時傳入的參數當做事務監聽方法的參數傳入。從這裡可以看出,如果事務監聽方法的參數不是
// ApplicationEvent或PayloadApplicationEvent類型,那麼其參數必須隻能有一個,並且這個
// 參數必須與發佈事務事件時傳入的參數一致
Class<?> eventClass = declaredEventType.getRawClass();
if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) &&
    event instanceof PayloadApplicationEvent) {
    return new Object[] {((PayloadApplicationEvent) event).getPayload()};
} else {
    // 如果參數類型為ApplicationEvent或PayloadApplicationEvent,則直接將其傳入事務事件方法
    return new Object[] {event};
}
 }
 // 判斷事務事件方法方法是否需要進行事務事件處理
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
if (args == null) {
    return false;
}
String condition = getCondition();
if (StringUtils.hasText(condition)) {
    Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null");
    EvaluationContext evaluationContext = this.evaluator.createEvaluationContext(
        event, this.targetClass, this.method, args, this.applicationContext);
    return this.evaluator.condition(condition, this.methodKey, evaluationContext);
}
return true;
 }
 // 對事務事件方法的返回值進行處理,這裡的處理方式主要是將其作為一個事件繼續發佈出去,這樣就可以在
// 一個統一的位置對事務事件的返回值進行處理
protected void handleResult(Object result) {
// 如果返回值是數組類型,則對數組元素一個一個進行發佈
if (result.getClass().isArray()) {
    Object[] events = ObjectUtils.toObjectArray(result);
    for (Object event : events) {
        publishEvent(event);
    }
} else if (result instanceof Collection<?>) {
    // 如果返回值是集合類型,則對集合進行遍歷,並且發佈集合中的每個元素
    Collection<?> events = (Collection<?>) result;
    for (Object event : events) {
        publishEvent(event);
    }
} else {
    // 如果返回值是一個對象,則直接將其進行發佈
    publishEvent(result);
}
}

六、總結

對於事務事件的處理,總結而言,就是為每個事務事件監聽方法創建瞭一個TransactionSynchronizationEventAdapter對象,通過該對象在發佈事務事件的時候,會在當前線程中註冊該對象,這樣就可以保證每個線程每個監聽器中隻會對應一個TransactionSynchronizationEventAdapter對象。

在Spring進行事務事件的時候會調用該對象對應的監聽方法,從而達到對事務事件進行監聽的目的。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: