springboot整合rocketmq實現分佈式事務
1 執行流程
(1) 發送方向 MQ 服務端發送消息。
(2) MQ Server 將消息持久化成功之後,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
(3) 發送方開始執行本地事務邏輯。
(4) 發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
(5) 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間後MQ Server 將對該消息發起消息回查。
(6) 發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。
(7) 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。
2 工程
2.1 pom
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.3.0.RELEASE</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
2.2 application.yml
rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group
2.3 TransactionListenerImpl
@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group") @Slf4j public class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執行業務邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); try { System.out.println("用戶A賬戶減500元."); System.out.println("用戶B賬戶加500元."); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); } STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK); return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId)); return STATE_MAP.get(transId); } }
2.4 SpringTransactionProducer
@Component @Slf4j public class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發送消息 * */ public void sendMsg(String topic, String msg) { Message<String> message = MessageBuilder.withPayload(msg).build(); this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null); log.info("發送成功"); } }
2.5 SpringTxConsumer
@Component @RocketMQMessageListener(topic = "pay_topic", consumerGroup = "transaction-consumer-group", selectorExpression = "*") @Slf4j public class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) { log.info("接收到消息 -> {}", msg); } }
2.6 ProducerController
@RestController @RequestMapping("/producer") public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping("/sendMsg") public String sendMsg() { springTransactionProducer.sendMsg("pay_topic", "用戶A賬戶減500元,用戶B賬戶加500元。"); return "發送成功"; } }
2.7 RocketApplication
@SpringBootApplication public class RocketApplication { public static void main(String[] args) { SpringApplication.run(RocketApplication.class); } }
3 測試
3.1 正常消費測試
描述: 正常啟動及可。
3.2 回查代碼測試
描述: 執行本地事務時添加異常,重啟測試,發現消費者沒有收到消息。
到此這篇關於springboot整合rocketmq實現分佈式事務的文章就介紹到這瞭,更多相關springboot 分佈式事務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SpringBoot整合RocketMQ實現消息發送和接收的詳細步驟
- Springboot 整合 RocketMQ 收發消息的配置過程
- RocketMQTemplate 註入失敗的解決
- Springboot詳解RocketMQ實現廣播消息流程
- 分佈式消息隊列RocketMQ概念詳解