Seata AT模式TM處理流程圖文示例詳解
TM的作用
我們根據源碼解讀畫出瞭下圖,該圖示展現瞭TM在整個Seata AT模式的分佈式事務中所起的作用:
從上圖中可以看出,TM主要有兩個作用:
開啟分佈式事務,以拿到XID作為分佈式事務開啟的標識;一定是從TC拿到XID,不是從調用方傳遞過來的XID;
根據所有RM的處理結果來決定是提交分佈式事務還是回滾分佈式事務;
轉換成偽代碼如下:
try{ // 開啟分佈式事務 String xid = TM.beginGlobalTransaction(); // 執行業務邏輯,包含遠程rpc調用 RM1.execute(xid); -------RPC調用--------> RM2.execute(xid); // 提交分佈式事務 TM.commitGlobalTransaction(xid); }catch(Exception e){ // 回滾分佈式事務 TM.rollbackGlobalTransaction(xid); }finally{ // 恢復現場 }
源碼分解
在之前講述圖解Seata AT模式啟動流程中,我們已經知道瞭TM的處理流程是通過掃描註解@GlobalTransactional
來完成對業務邏輯的攔截的。
主要完成這個攔截功能的類是io.seata.spring.annotation.GlobalTransactionalInterceptor
,在這個類中,我們主要看invoke方法:
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { // 拿到被攔截的目標類 Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; // 獲取目標方法 Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); // 判斷這個方法是不是Object類中的toString()、equals()等方法 if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { // 通過被攔截的方法找出對應的註解GlobalTransactional和GlobalLock final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); // 判斷是否開啟分佈式事務,或者TM是否被降級處理,默認是沒有被降級的 boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); // 分佈式事務可以正常使用 if (!localDisable) { // 如果註解GlobalTransactional存在,那麼直接把裡面的配置解析成AspectTransactional if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes()); } else { transactional = this.aspectTransactional; } // 調用handleGlobalTransaction處理 return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { // 調用handleGlobalLock處理 return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } // 如果是Object類中的方法的話,直接調用,不作攔截 return methodInvocation.proceed(); }
以上代碼就做瞭下面幾件事情:
判斷攔截的方法是否是一個合理的方法,像Object類中的toString()、equals()等方法是不應該被攔截的;
攔截的方法合理的話,那麼要確認是否允許開啟分佈式事務;
- 如果配置瞭
service.disableGlobalTransaction=true
,那麼說明不能開啟分佈式事務; - 另一個就是配置瞭允許TM降級
client.tm.degradeCheck=true
(默認是false),那麼就會開啟定時任務不斷地與TC通信,如果建立通信失敗的次數超過瞭閾值client.tm.degradeCheckAllowTimes
,那麼就會觸發TM降級,此時無法開啟新的分佈式事務,降級前開啟的分佈式事務沒有影響;
可以正常地準備分佈式事務瞭,那麼開始收集註解的相關信息;
- 如果是GlobalTransactional註解,交給
handleGlobalTransaction()
處理; - 如果是GlobalLock註解,交給
handleGlobalLock()
處理;
需要註意的是,我們從源碼當中瞭解到,原來TM還可以做一個降級的配置。降級後的TM是不會開啟新的分佈式事務的,這個時候隻能保證本地事務的正常進行,隻有當TM與TC通信恢復後,降級後的TM會立馬恢復,可以重新開啟新的分佈式事務。
在TM降級期間的需要業務側自行處理因降級導致的數據臟寫和臟讀問題。
handleGlobalTransaction
處理被@GlobalTransactional標註的業務邏輯
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { // 默認succeed=true boolean succeed = true; try { // 執行分佈式事務處理邏輯 // 詳細內容後面介紹 return transactionalTemplate.execute(new TransactionalExecutor() { // 執行業務邏輯 @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } // 分佈式事務名稱,沒有指定的話,就用【方法名+參數類型】命名 public String name() { String name = aspectTransactional.getName(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); } // 分佈式事務信息,其實就是@GlobalTransactional註解裡面拿到的配置 @Override public TransactionInfo getTransactionInfo() { // reset the value of timeout int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(timeout); transactionInfo.setName(name()); transactionInfo.setPropagation(aspectTransactional.getPropagation()); transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval()); transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes()); Set<RollbackRule> rollbackRules = new LinkedHashSet<>(); for (Class<?> rbRule : aspectTransactional.getRollbackFor()) { rollbackRules.add(new RollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getRollbackForClassName()) { rollbackRules.add(new RollbackRule(rbRule)); } for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) { rollbackRules.add(new NoRollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getNoRollbackForClassName()) { rollbackRules.add(new NoRollbackRule(rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { // 發生異常 TransactionalExecutor.Code code = e.getCode(); switch (code) { // 已經回滾過瞭 case RollbackDone: throw e.getOriginalException(); // 開啟分佈式事務失敗 case BeginFailure: // 分佈式事務失敗 succeed = false; // 調用失敗處理邏輯 failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); // 分佈式事務提交失敗 case CommitFailure: // 分佈式事務失敗 succeed = false; // 調用失敗處理邏輯 failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); // 回滾失敗 case RollbackFailure: // 調用失敗處理邏輯 failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); // 回滾重試 case RollbackRetrying: // 調用失敗處理器中的回滾重試回調邏輯 failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); // 啥也不是,直接拋異常 default: throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code)); } } finally { // 如果允許TM降級,那麼這次處理完畢後,說明與TC恢復通信,可以解除降級 if (degradeCheck) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } }
其實上面就一行代碼,使用的是模版模式,所以其實真正的重點還是應該進到模版裡面去看看具體是怎麼處理的。
public Object execute(TransactionalExecutor business) throws Throwable { // 1. 拿到整理好的@GlobalTransactional註解裡面的配置信息 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 獲取當前的分佈式事務,如果為null的話,說明這是分佈式事務的發起者;如果不為null,說明這是分佈式事務的參與者 GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // 1.2 獲取分佈式事務的傳播級別,其實就是按照spring的傳播級別來一套,區別就是spring事務是本地事務,這是分佈式事務,原理都一樣 Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null; try { // 這個switch裡面全都是處理分佈式事務傳播級別的 switch (propagation) { // 如果不支持分佈式事務,如果當前存在事務,那麼先掛起當前的分佈式事務,再執行業務邏輯 case NOT_SUPPORTED: // 分佈式事務存在,先掛起 if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } // 執行業務邏輯 return business.execute(); // 如果是每次都要創建一個新的分佈式事務,先把當前存在的分佈式事務掛起,然後創建一個新分佈式事務 case REQUIRES_NEW: // 如果分佈式事務存在,先掛起當前分佈式事務,再創建一個新的分佈式事務 if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } // 之所以用break,是為瞭後面的代碼和其他的傳播級別一起共用,業務邏輯肯定還是要執行的 break; // 如果支持分佈式事務,如果當前不存在分佈式事務,那麼直接執行業務邏輯,否則以分佈式事務的方式執行業務邏輯 case SUPPORTS: // 如果不存在分佈式事務,直接執行業務邏輯 if (notExistingTransaction(tx)) { return business.execute(); } // 否則,以分佈式事務的方式執行業務邏輯 break; // 如果有分佈式事務,就在當前分佈式事務下執行業務邏輯,否則創建一個新的分佈式事務執行業務邏輯 case REQUIRED: // If current transaction is existing, execute with current transaction, // else continue and execute with new transaction. break; // 如果不允許有分佈式事務,那麼一旦發現存在分佈式事務,直接拋異常;隻有不存在分佈式事務的時候才正常執行 case NEVER: // 存在分佈式事務,拋異常 if (existingTransaction(tx)) { throw new TransactionException( String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s" , tx.getXid())); } else { // 不存在分佈式事務,執行業務邏輯 return business.execute(); } // 一定要有分佈式事務,分佈式事務不存在的話,拋異常; case MANDATORY: // 不存在分佈式事務,拋異常 if (notExistingTransaction(tx)) { throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'"); } // Continue and execute with current transaction. break; default: throw new TransactionException("Not Supported Propagation:" + propagation); } // 上面的傳播級別的邏輯處理完畢,下面就是公共的處理邏輯 // 1.3 如果當前分佈式事務沒有的話,那麼我們就要創建新的分佈式事務,此時我們就是分佈式事務的發起者,也就是TM本身,否則不能稱之為`TM` if (tx == null) { tx = GlobalTransactionContext.createNew(); } // 開始準備幹活的條件 // 把我們這個方法的全局鎖配置放進當前線程中,並且把線程中已有的全局鎖的配置取出來 // 我們在幹完自己的活後,會把這個取出來的配置放回去的 GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { // 2. 如果我們是分佈式事務的發起者的話,那麼我們會和TC通信,並且拿到一個XID;如果我們不是分佈式事務的發起者的話,那麼這一步啥也不幹 // 這個XID可以從RootContext中獲取 beginTransaction(txInfo, tx); Object rs; try { // 執行業務邏輯 rs = business.execute(); } catch (Throwable ex) { // 3. 發生任何異常,我們準備啟動回滾機制 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. 一切順利,通知提交分佈式事務 commitTransaction(tx); return rs; } finally { //5. 恢復現場,把之前的配置放回去 resumeGlobalLockConfig(previousConfig); // 觸發回調 triggerAfterCompletion(); // 清理工作 cleanUp(); } } finally { // 恢復之前掛起的事務 if (suspendedResourcesHolder != null) { tx.resume(suspendedResourcesHolder); } } }
根據上面的源碼分析,execute方法做瞭以下幾件事情:
處理分佈式事務的傳播級別,參照spring的事務傳播級別;
如果是分佈式事務的發起者,那麼需要與TC通信,並獲取XID開啟分佈式事務;
如果業務邏輯處理出現異常,說明分佈式事務需要準備回滾;如果沒有任何異常,那麼準備發起分佈式事務提交
分佈式事務處理完畢後,準備恢復現場
分佈式事務開啟:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { // 回調,默認是空回調 triggerBeforeBegin(); // 發起分佈式事務 tx.begin(txInfo.getTimeOut(), txInfo.getName()); // 回調,默認是空回調 triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } } @Override public void begin(int timeout, String name) throws TransactionException { // 如果不是分佈式事務發起者,那麼啥也不做 if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid); } return; } assertXIDNull(); // 如果當前已經處於分佈式事務當中,那麼拋異常,因為事務發起者不可能事先處於別的分佈式事務當中 String currentXid = RootContext.getXID(); if (currentXid != null) { throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid); } // 發起分佈式事務 xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; // 把xid綁定到當前線程中 RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]", xid); } } @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 發起分佈式事務開啟的請求 GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } // 獲取拿到的xid,表示分佈式事務開啟成功 return response.getXid(); }
1.分佈式事務的發起其實就是TM向TC請求,獲取XID,並把XID綁定到當前線程中
異常回滾:
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException { // 如果異常類型和指定的類型一致,那麼發起回滾;不一致還是要提交分佈式事務 if (txInfo != null && txInfo.rollbackOn(originalException)) { try { // 回滾分佈式事務 rollbackTransaction(tx, originalException); } catch (TransactionException txe) { // 回滾失敗拋異常 throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException); } } else { // 不是指定的異常類型,還是繼續提交分佈式事務 commitTransaction(tx); } } private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException { // 執行回調,默認空回調 triggerBeforeRollback(); // 回滾 tx.rollback(); // 執行回調,默認空回調 triggerAfterRollback(); // 就算回滾沒問題,照樣拋異常,目的應該是告知開發人員此處產生瞭回滾 throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus()) ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException); } @Override public void rollback() throws TransactionException { // 如果是分佈式事務參與者,那麼啥也不做,RM的回滾不在這裡,這是TM的回滾 if (role == GlobalTransactionRole.Participant) { // Participant has no responsibility of rollback if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid); } return; } assertXIDNotNull(); // 下面就是一個循環重試發起分佈式事務回滾 int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT; try { while (retry > 0) { try { retry--; // 發起回滾的核心代碼 status = transactionManager.rollback(xid); // 回滾成功跳出循環 break; } catch (Throwable ex) { LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage()); // 重試失敗次數完成才會跳出循環 if (retry == 0) { throw new TransactionException("Failed to report global rollback", ex); } } } } finally { // 如果回滾的分佈式事務就是當前的分佈式事務,那麼從當前線程中解綁XID if (xid.equals(RootContext.getXID())) { suspend(); } } if (LOGGER.isInfoEnabled()) { LOGGER.info("[{}] rollback status: {}", xid, status); } } @Override public GlobalStatus rollback(String xid) throws TransactionException { // 準備發起請求給TC,回滾指定的分佈式事務 GlobalRollbackRequest globalRollback = new GlobalRollbackRequest(); globalRollback.setXid(xid); GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback); return response.getGlobalStatus(); }
分佈式事務回滾邏輯中有以下幾個點:
觸發回滾需要產生的異常和註解中指定的異常一致才會發起回滾,否則還是繼續提交;
回滾是可以設置重試次數的,隻有重試都失敗瞭,才會導致回滾失敗,否則隻要有一次成功,那麼回滾就成功;
TM發起的回滾其實隻是和TC發起一次分佈式事務回滾的通信,並沒有數據庫的操作;
分佈式事務提交:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { // 回調,默認空回調 triggerBeforeCommit(); // 分佈式事務提交 tx.commit(); // 回調,默認空回調 triggerAfterCommit(); } catch (TransactionException txe) { // 4.1 提交出異常,提交失敗 throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } } @Override public void commit() throws TransactionException { // 如果隻是分佈式事務參與者,那麼啥也不幹,TM隻能有一個,哈哈 if (role == GlobalTransactionRole.Participant) { // Participant has no responsibility of committing if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid); } return; } assertXIDNotNull(); // 分佈式事務提交也是有重試的 int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT; try { while (retry > 0) { try { retry--; // 發起分佈式事務提交 status = transactionManager.commit(xid); // 提交成功跳出循環 break; } catch (Throwable ex) { LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage()); // 重試結束,依然失敗就拋異常 if (retry == 0) { throw new TransactionException("Failed to report global commit", ex); } } } } finally { // 如果提交的分佈式事務就是當前事務,那麼需要清理當前線程中的XID if (xid.equals(RootContext.getXID())) { suspend(); } } if (LOGGER.isInfoEnabled()) { LOGGER.info("[{}] commit status: {}", xid, status); } } @Override public GlobalStatus commit(String xid) throws TransactionException { // 發起分佈式事務提交請求,這是與TC通信 GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setXid(xid); GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit); return response.getGlobalStatus(); }
分佈式事務回滾也是可以設置重試次數的;
分佈式事務提交其實也是TM與TC進行通信,告知TC這個XID對應的分佈式事務可以提交瞭;
handleGlobalLock
private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable { // 模版模式實現全局鎖 return globalLockTemplate.execute(new GlobalLockExecutor() { // 執行業務邏輯 @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } // 獲取全局鎖配置 // 一個是全局鎖重試間隔時間 // 一個是全局鎖重試次數 @Override public GlobalLockConfig getGlobalLockConfig() { GlobalLockConfig config = new GlobalLockConfig(); config.setLockRetryInterval(globalLockAnno.lockRetryInterval()); config.setLockRetryTimes(globalLockAnno.lockRetryTimes()); return config; } }); } public Object execute(GlobalLockExecutor executor) throws Throwable { // 判斷當前是否有全局鎖 boolean alreadyInGlobalLock = RootContext.requireGlobalLock(); // 如果沒有全局鎖,那麼在當前線程中設置需要全局鎖標識 if (!alreadyInGlobalLock) { RootContext.bindGlobalLockFlag(); } // 把全局鎖的配置設置進當前線程,並把線程中已有的全局鎖配置拿出來,後面恢復現場需要用 GlobalLockConfig myConfig = executor.getGlobalLockConfig(); GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig); try { // 執行業務邏輯 return executor.execute(); } finally { // 清除線程中的全局鎖標記 if (!alreadyInGlobalLock) { RootContext.unbindGlobalLockFlag(); } // 恢復現場 if (previousConfig != null) { GlobalLockConfigHolder.setAndReturnPrevious(previousConfig); } else { GlobalLockConfigHolder.remove(); } } }
其實真正的全局鎖邏輯並不在TM當中,TM隻是負責根據@GlobalLock註解把相應的全局鎖標記綁定到線程中,真正負責處理全局鎖的還是底層的RM;
小結
至此我們已經把TM的所有工作都解讀完畢瞭,下面來做一個小結:
1.TM主要針對兩個註解GlobalTransactional和GlobalLock來實現處理邏輯,原理都是基於Aop和反射;處理邏輯裡面涉及到TM降級的一個情況,這是一個值得註意的點
2.處理GlobalTransactional主要分兩步:
- 開啟分佈式事務,需要與TC交互,存在rpc開銷;
- 根據RM的處理情況決定是提交分佈式事務還是回滾分佈式事務,也是需要與TC交互,存在rpc開銷;在提交或回滾分佈式事務中,還可以設置重試次數;
3.處理GlobalLock,主要就是在當前線程中設置一個需要檢查全局鎖的標記,讓底層的RM去做全局鎖的檢測動作;
以上就是Seata AT模式TM處理流程圖文示例詳解的詳細內容,更多關於Seata AT模式TM處理流程的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- 解決@Transaction註解導致動態切換更改數據庫失效問題
- 五分鐘教你手寫 SpringBoot 本地事務管理實現
- Spring事務捕獲異常後依舊回滾的解決
- Spring超詳細講解事務
- Seata AT模式如何實現行鎖詳解