SpringCloud+RocketMQ實現分佈式事務的實踐
隨著互聯網公司的微服務越來越多,分佈式事務已經成為瞭我們的經常使用的。所以我們來一步一步的實現基於RocketMQ的分佈式事務。接下來,我們將要做的主題寫出來。
- RocketMQ的分佈式事務結構和說明
- 搭建RocketMQ步驟
- 事務場景,然後準備工程,運行代碼
一、RocketMQ的分佈式事務結構和說明
我們通過下圖來瞭解一下RocketMQ實現分佈式事務的結構。采用半消息機制實現分佈式事務,半消息顧名思義,就是發送方將消息發送到MQ中的Broker端,這個消息被標記為“暫不投遞”狀態,這個時間訂閱方是不能收到這個消息的,當發送方將提交瞭Commit後,這個消息才可以被訂閱方收到。如果發送方提交瞭Rollback,那麼訂閱方就不會收到以該消息。
RocketMQ采用的是最終一致性事務,因為它使用瞭半消息機制,訂閱方可能收不到消息,但是最終狀態是能保持一致的。
我們在講這個流程前,我們需要先看下圖的紅色部分,當發送方發給MQ的Commit/Rollback沒有收到的時候,這個時候需要啟用一個線程,從本地事務庫裡邊查找到當前的狀態,根據當前的狀態來決定是否重新發送Commit/Rollback,相當於重試。
接下來,我們說一下整個執行的過程。
發送方將半消息發送到MQ中,成功後MQ將消息保持好後將結果同步給發送方,即半消息發送成功。當MQ告訴發送方成功後,我們需要記錄下發送到MQ的相關的事情,比如發送的消息內容,發送時間,發送的相關ID,比如事務ID,都要做好相應記錄。當本地事務庫,也可以理解為日志庫,我們可以針對發生的問題進行追溯。本地庫記錄好事務後,發送方就可以將Commit/Rollback提交到MQ瞭。當MQ收到瞭Commit命令後,這個時候就會將該條消息發送給訂閱方。收到瞭Rollback後,那該條消息不會投遞給訂閱方,消息保存3天後刪除。
整個過程就是這些,大傢如果覺得有遺漏的,可以告訴我。
二、搭建RocketMQ
我使用的是windos10,所以需要下載windows版本。
進入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/
下載二進制文件壓縮包。如下圖:
先解壓,然後進行環境變量配置,打開菜單,直接輸入環境變量。
變量名:ROCKETMQ_HOME
值:解壓的release的路徑,如D:\rocketmq-all-4.3.0-bin-release
然後打開CMD命令,進入到解壓路徑中的bin目標,進行nameserver啟動。
start mqnameserv.cmd
啟動後的效果為:
再進行broker的啟動,啟動需要連到的nameserver為127.0.0.1:9876,打開自動創建Topic,這個時候當用到某個Topic沒有的情況下會自動創建一個。
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
這個時候Windows的rocketmq就下載好並且啟動成功瞭。
三、事務場景,然後準備工程,運行代碼
因為我們使用的RocketMq是使用的分佈式事務是最終一致性的,柔性的,所以我們使用的場景也要考慮到。我們用的場景就是下訂單充話費,用戶支付瞭訂單,那麼直接給用戶的進行充值。購買的訂單不會立馬充值到手機的,需要等一會才到賬,這就是最終一致性。
我們使用的如下代碼版本號,SpringCloud的版本號要和SpringBoot保持一致,否則會出現類找不到的情況。
SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)
我們使用SpringCloud的幾個組件:
Euerka Server :用於提供服務註冊的能力和發現
Euerka Client : 用於進行服務註冊
Feign:用於服務間的調用
Config :用於進行配置
接下來,我們創建需要進行配置的表。
DROP TABLE IF EXISTS `spring_cloud_config`; CREATE TABLE `spring_cloud_config` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `akey` varchar(30) DEFAULT NULL, `avalue` varchar(128) DEFAULT NULL, `application` varchar(30) DEFAULT NULL, `aprofile` varchar(30) DEFAULT NULL, `label` varchar(30) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`) VALUES (2,'name_server','127.0.0.1:9876','product-service','dev','dev'), (3,'name_server','127.0.0.1:9876','order-service','dev','dev'), (4,'order_topic','order_topic','order-service','dev','dev'), (5,'order_topic','order_topic','product-service','dev','dev');
我們構建瞭4個模塊,eureka模塊用於服務註冊和發現,springcloud-config將數據庫創建的配置加載供其他工程使用。charge-order-service用於下充值訂單,phone-charge-service用於給手機進行充值業務。
主要的流程為,通過charge-order-service產生瞭訂單,然後將訂單和數量到phone-charge-service進行二次確認看是否還有足夠的數量可以使用,如果數量不夠的話直接將事務進行rollback,這樣消息就不會到消費端。如果下過去的訂單號已經充值過瞭,那麼該消息將會被直接丟掉,這也是消息端的冪等設計。
接下來,我們看一下生產者以及事務的核心代碼。主要用於連接RocketMQ, 執行本地事務,在本地事務中進行二次確認,根據結果進行Commit、Rollback。當連接RocketMQ出現問題的時候可能會收到UNKNOWN,這個時候會調用checkLocalTransaction()方法,用於檢查是否將消息發送給RocketMQ瞭。
package com.hqs.chargeorderservice.mqservice; import com.alibaba.fastjson.JSONObject; import com.hqs.chargeorderservice.config.Jms; import com.hqs.chargeorderservice.service.ProduceOrderService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.*; /** * * @Description: 分佈式事務RocketMQ 生產者 */ @Slf4j @Component public class TransactionProducer { /** * 需要自定義事務監聽器 用於 事務的二次確認 和 事務回查 */ private TransactionListener transactionListener ; /** * 這裡的生產者和之前的不一樣 */ private TransactionMQProducer producer = null; /** * 官方建議自定義線程 給線程取自定義名稱 發現問題更好排查 */ private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) { transactionListener = new TransactionListenerImpl(produceOrderService); // 初始化 事務生產者 producer = new TransactionMQProducer(jms.getOrderTopic()); // 添加服務器地址 producer.setNamesrvAddr(jms.getNameServer()); // 添加事務監聽器 producer.setTransactionListener(transactionListener); // 添加自定義線程池 producer.setExecutorService(executorService); start(); } public TransactionMQProducer getProducer() { return this.producer; } /** * 對象在使用之前必須要調用一次,隻能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { this.producer.shutdown(); } } /** * @Description: 自定義事務監聽器 */ @Slf4j class TransactionListenerImpl implements TransactionListener { @Autowired private ProduceOrderService produceOrderService ; public TransactionListenerImpl( ProduceOrderService produceOrderService) { this.produceOrderService = produceOrderService; } @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("=========本地事務開始執行============="); String message = new String(msg.getBody()); JSONObject jsonObject = JSONObject.parseObject(message); Integer productId = jsonObject.getInteger("productId"); Integer total = jsonObject.getInteger("total"); Integer tradeNo = jsonObject.getInteger("tradeNo"); int userId = Integer.parseInt(arg.toString()); //模擬執行本地事務begin======= /** * 本地事務執行會有三種可能 * 1、commit 成功 * 2、Rollback 失敗 * 3、網絡等原因服務宕機收不到返回結果 */ log.info("本地事務執行參數,用戶id={},商品ID={},銷售庫存={}",userId,productId,total); int result = produceOrderService.save(userId, productId, total, tradeNo); //模擬執行本地事務end======== //TODO 實際開發下面不需要我們手動返回,而是根據本地事務執行結果自動返回 //1、二次確認消息,然後消費者可以消費 if (result == 0) { return LocalTransactionState.COMMIT_MESSAGE; } //2、回滾消息,Broker端會刪除半消息 if (result == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } //3、Broker端會進行回查消息 if (result == 2) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.COMMIT_MESSAGE; } /** * 隻有上面接口返回 LocalTransactionState.UNKNOW 才會調用查接口被調用 * * @param msg 消息 * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { log.info("==========回查接口========="); String key = msg.getKeys(); //TODO 1、必須根據key先去檢查本地事務消息是否完成。 /** * 因為有種情況就是:上面本地事務執行成功瞭,但是return LocalTransactionState.COMMIT_MESSAG的時候 * 服務掛瞭,那麼最終 Brock還未收到消息的二次確定,還是個半消息 ,所以當重新啟動的時候還是回調這個回調接口。 * 如果不先查詢上面本地事務的執行情況 直接在執行本地事務,那麼就相當於成功執行瞭兩次本地事務瞭。 */ // TODO 2、這裡返回要麼commit 要麼rollback。沒有必要在返回 UNKNOW return LocalTransactionState.COMMIT_MESSAGE; } }
我們啟動一下程序。我們登錄localhost:7001註冊中心,看到3個服務都成功註冊瞭。
然後我們進行下訂單調用接口,userId用戶編碼,productId產品號,total購買量,tradeNo交易訂單號
http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4
charge-order-service顯示出來本地執行事務的參數。
phone-charge-service消費事務執行的日志。
當我們在創建charge-order-service和phone-charge-service的時候一定要註意,將工程裡邊的application.properties變為bootstrap.yml,要不然程序啟動的時候會從8888找config的配置內容。因為SpringCloud裡面有個“啟動上下文”,主要是用於加載遠端的配置,也就是加載ConfigServer裡面的配置,默認加載順序為:加載bootstrap.*裡面的配置 –> 鏈接configserver,加載遠程配置 –> 加載application.*裡面的配置。
好瞭,代碼也放到git地址瞭,https://github.com/stonehqs/springcloud-rocketmq-transaction,
到此這篇關於SpringCloud+RocketMQ實現分佈式事務的實踐的文章就介紹到這瞭,更多相關SpringCloud RocketMQ分佈式事務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- docker安裝RocketMQ的實現步驟
- 分佈式消息隊列RocketMQ概念詳解
- 使用RocketMQTemplate發送帶tags的消息
- Springboot 整合 RocketMQ 收發消息的配置過程
- RocketMQ消息隊列實現隨機消息發送當做七夕禮物