RocketMQ特性Broker存儲事務消息實現
引言
在Broker中,事務消息的初始化是通過BrokerController.initialTransaction()
方法執行的。
private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); } this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(this); this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }
這裡有三個核心的初始化變量
TransactionalMessageService
事務消息主要處理服務。默認實現類是TransactionalMessageServiceImpl
也可以自己定義事務消息處理實現類,通過ServiceProvider.loadClass()
方法進行加載。
TransactionalMessageService
類定義如下。內部屬性已加註釋標明。
public interface TransactionalMessageService { //用於保存Half事務消息 PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner); //刪除事務消息 boolean deletePrepareMessage(MessageExt messageExt); //提交事務消息 OperationResult commitMessage(EndTransactionRequestHeader requestHeader); //回滾事務消息 OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader); void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener); //打開事務消息 boolean open(); //關閉事務消息 void close(); }
transactionalMessageCheckListener
事務消息回查監聽器
transactionalMessageCheckService
事務消息回查服務,啟動一個線程定時檢查超時的Half消息是否需要回查。
處理事務消息
當初始化完成之後,Broker就可以處理事務消息瞭。
Broker存儲事務消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,這和普通消息其實是一樣的。
但是有兩點針對事務消息的特殊處理:
第一處:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
//獲取擴展字段的值,若是該值為true則為事務消息 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); boolean sendTransactionPrepareMessage = false; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //判斷當前Broker配置是否支持事務消息 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } sendTransactionPrepareMessage = true; }
if (sendTransactionPrepareMessage) { //保存Half信息 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }
第二處:
存儲事務消息前的預處理,對應方法是
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { //將原消息的topic保存在擴展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); //將原消息的QueueId保存在擴展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); //將原消息的SysFlag保存在擴展字段中 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); //修改topic的值為RMQ_SYS_TRANS_HALF_TOPIC msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); //修改Queueid為0 msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
完成上述步驟之後,調用DefaultMessageStole.putMessage()
方法將其保存到CommitLog
中。
CommitLog存儲成功之後,通過org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法對其進行處理。
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the consume queue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; }
這裡的邏輯是這樣的,當讀到的消息類型為事務消息時,設置當前消息的位點值為0,而不是設置真實的位點。這樣該位點就不會建立ConsumeQueue索引,也不會被消費。
以上就是RocketMQ特性Broker存儲事務消息實現的詳細內容,更多關於RocketMQ Broker存儲事務消息的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- 一文徹底掌握RocketMQ 的存儲模型
- 分佈式消息隊列RocketMQ概念詳解
- RocketMQ Namesrv架構工作原理詳解
- MQ的分類組成優缺點測試點入門教程
- RocketMQ broker 消息投遞流程處理PULL_MESSAGE請求解析