RocketMQ特性Broker存儲事務消息實現

引言

Broker中,事務消息的初始化是通過BrokerController.initialTransaction()方法執行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

這裡有三個核心的初始化變量

TransactionalMessageService

事務消息主要處理服務。默認實現類是TransactionalMessageServiceImpl也可以自己定義事務消息處理實現類,通過ServiceProvider.loadClass()方法進行加載。

TransactionalMessageService類定義如下。內部屬性已加註釋標明。

public interface TransactionalMessageService {
    //用於保存Half事務消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //刪除事務消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事務消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滾事務消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打開事務消息
    boolean open();
    //關閉事務消息
    void close();
}

transactionalMessageCheckListener

事務消息回查監聽器

transactionalMessageCheckService

事務消息回查服務,啟動一個線程定時檢查超時的Half消息是否需要回查。

處理事務消息

當初始化完成之後,Broker就可以處理事務消息瞭。

Broker存儲事務消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,這和普通消息其實是一樣的。

但是有兩點針對事務消息的特殊處理

第一處:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//獲取擴展字段的值,若是該值為true則為事務消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { 
    //判斷當前Broker配置是否支持事務消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二處:

存儲事務消息前的預處理,對應方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //將原消息的topic保存在擴展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //將原消息的QueueId保存在擴展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //將原消息的SysFlag保存在擴展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值為RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid為0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步驟之後,調用DefaultMessageStole.putMessage()方法將其保存到CommitLog中。

CommitLog存儲成功之後,通過org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法對其進行處理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

這裡的邏輯是這樣的,當讀到的消息類型為事務消息時,設置當前消息的位點值為0,而不是設置真實的位點。這樣該位點就不會建立ConsumeQueue索引,也不會被消費

以上就是RocketMQ特性Broker存儲事務消息實現的詳細內容,更多關於RocketMQ Broker存儲事務消息的資料請關註WalkonNet其它相關文章!

推薦閱讀: