RocketMQ ConsumeQueue與IndexFile實時更新機制源碼解析

前言

前面我們介紹瞭消息是如何存儲的,消息是如何刷盤的,講的都是CommitLog是如何存儲和刷盤的。雖然CommitLog順序存儲著所有消息,但是CommitLog中的消息並沒有區分topic、keys等,如果需要消費某個topic的消息或者查找某一條消息隻能遍歷CommitLog文件去查找,性能相當低下,因此有瞭ConsumeLog和IndexFile兩個文件類型,這兩個文件的作用主要是提升消息消費和查詢的性能。

ConsumeQueue詳解

為瞭提高消費消息查詢性能,Broker會為每個Topic在~/store/consumequeue中創建一個Topic名稱的目錄,並再為該Topic創建目錄名為queueId的目錄,每個目錄存放著若幹consumequeue文件,consumequeue屬於commitLog的索引文件,可以根據consumequeue定位到具體的消息,consumequeue存儲文件見下圖

consumequeue文件名由20位數字構成,表示當前文件的第一個索引條目的起始偏移量。與commitLog文件名不同的是,consumequeue後續文件名是固定的,由於consumequeue文件大小是固定不變的。

consumequeue文件大小由mappedFileSizeConsumeQueue配置控制,它的默認大小是30W * ConsumeQueue.CQ_STORE_UNIT_SIZE(20),也就是600W字節大小,ConsumeQueue.CQ_STORE_UNIT_SIZE是consumequeue每個索引條目的大小,每隔索引條目包含瞭三個消息的重要屬性:消息在mappedFile文件中的物理偏移量(8字節)、消息的長度(4字節)、消息Tag的hashcode值,這三個屬性占瞭20個字節,單個索引條目結構如下圖所示

IndexFile詳解

RocketMQ除瞭提供消息的Topic給消息消費外,RocketMQ還提供瞭根據key來查找消息的功能,producer創建消息時可以傳入keys值,用於快速查找消息。

// 構建Message參數
Message msg = new Message("TopicTest",  // 消息topic
    "TagA",															// 消息Tag
    "key1 key2 key3",										// 消息keys,多個key用" "隔開
    "hello linshifu!".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息體

IndexFile可以看做是一個key的哈希索引文件,通過計算key的hash值,快速找到某個key對應的消息在commitLog中的位置。IndexFile由下面三個部分構成:

  • indexHeader
  • slots槽位
  • indexes索引數據

IndexFile結構如下圖所示

每個IndexFile的長度是固定的,其中indexHeader占用40字節,slots占用500W * 4字節,Index索引數據占用2000W * 20字節

IndexHeader

IndexHeader占用IndexFile的前40個字節,它主要存儲著IndexFile索引文件的相關信息,IndexHeader包含如下屬性

// org.apache.rocketmq.store.index.IndexHeader
public class IndexHeader {
    // 索引文件第一條消息在commitLog中的存儲時間
    private final AtomicLong beginTimestamp = new AtomicLong(0);
    // 索引文件最後一條消息在commitLog中的存儲時間
    private final AtomicLong endTimestamp = new AtomicLong(0);
    // 索引文件第一條消息的偏移量
    private final AtomicLong beginPhyOffset = new AtomicLong(0);
    // 索引文件最後一條消息的偏移量
    private final AtomicLong endPhyOffset = new AtomicLong(0);
    // 已經填充slot的hash槽數量
    private final AtomicInteger hashSlotCount = new AtomicInteger(0);
    // 該indexFile種包含的索引單元數量
    private final AtomicInteger indexCount = new AtomicInteger(1);
}

數據結構如下圖所示

slots槽位

在IndexFile中間部分存儲的是IndexFlie中key的hash槽,每個hash槽存儲的是index索引單元的indexNo,添加索引時會將key的hash值%500W的結果計算哈希槽序號,然後將index索引單元的indexNo放入slot槽中,indexNo是int類型,slots槽位總共有500W個,因此slots槽位占用的大小是500w * 4=2000w

indexes索引數據

index索引由2000W個索引單元構成,每個索引單元大小為20字節,每隔索引單元由下面四個部分構成

  • keyHash

keyHash是消息索引key的Hash值

  • phyOffet

phyOffset是當前key對應消息在commitLog中的偏移量commitLog offset

  • timeDiff

timeDiff是當前key對應消息存儲時間與當前indexFile第一個索引存儲時間差

  • preIndex

當前slot的index索引單元的前一個索引單元的indexNo

索引單元數據結構如下

實時更新ConsumeQueue與IndexFile源碼分析

之前的文章我們隻瞭解瞭Broker的CommitLog文件保存和刷盤的流程,現在我們來瞭解Broker實時更新ConsumeQueue和IndexFile的流程。

消息保存的過程僅僅會保存CommitLog,ConsumeQueue文件及IndexFile中的數據是通過ReputMessageService將CommitLog中的消息轉發到ConsumeQueue及IndexFile。

ReputMessageService和之前的刷盤服務類似,都是異步線程執行的服務。ReputMessageService是DefaultMessageStore的一個內部類,它跟隨者消息存儲對象DefaultMessageStore創建時共同創建。ReputMessageService刷新ConsumeQueue與IndexFile的邏輯可以從它的run()方法開始分析。

// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override
public void run() {
		// 死循環
    while (!this.isStopped()) {
        try {
             // 睡眠1ms
            Thread.sleep(1);
						// 更新consumeQueue和IndexFile
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

從上面代碼可以看出,更新ConsumeQueue與IndexFile在死循環中執行,每隔1ms執行一次doReput()來更新更新consumeQueue和IndexFile,在doReput()中的主要邏輯如下

  • 如果重放消息偏移量reputFromOffset小於CommitLog的最大offset,則會循環重放消息,更新ConsumeQueue及IndexFile
  • 從CommitLog的重放偏移量開始獲取映射緩沖結果SelectMappedBufferResult,SelectMappedBufferResult包含如下屬性
// org.apache.rocketmq.store.SelectMappedBufferResult
public class SelectMappedBufferResult {
    // mappedFile文件起始偏移量+position
    private final long startOffset;
    // reputFromOffset開始的緩沖
    private final ByteBuffer byteBuffer;
    // 消息size
    private int size;
    // commitLog的MappedFile
    private MappedFile mappedFile;
}
  • 根據SelectMappedBufferResult校驗消息,並創建轉發請求DispatchRequest,DispatchRequest中包含更新ConsumeQueue和IndexFile中需要用到的屬性,如topic,消息偏移量,消息key,消息存儲時間戳,消息長度,消息tagHashCode等。
  • 如果當前消息size>0,則說明當前消息需要被轉發更新ConsumeQueue和IndexFile,會調用關鍵方法DefaultMessageStore.this.doDispatch轉發更新
  • 如果當前消息size=0,則說明已經讀到瞭CommitLog當前MappedFile的結尾,因此需要讀取下一個MappedFile,並進行轉發。
// org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {
    // 1.reputFromOffset ≤ commitLog最大offset,則循環重放
    for (boolean doNext = true; this.isCommitLogAvailable()/*reputFromOffset≤commitLog最大offset*/&&doNext; ) {
        // 2.根據reputFromOffset的物理偏移量找到mappedFileQueue中對應的CommitLog文件的MappedFile
        // 然後從該MappedFile中截取一段自reputFromOffset偏移量開始的ByteBuffer,這段內存存儲著將要重放的消息
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                // 遍歷消息,開始reput
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 3. 檢查消息屬性,並構建一個消息的dispatchRequest
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 4.消息分發,寫consumeQueue和Index
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // 設置reputOffset加上當前消息大小
                            this.reputFromOffset += size;
                            // 設置讀取的大小加上當前消息大小
                            readSize += size;
                             //如果size=0,說明讀取到瞭MappedFile的文件結尾
                        } else if (size == 0) {
                            // 5. 獲取下個文件的起始offset
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            // 設置readSize=0,結束循環
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                       // ...
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

由上面代碼可知,轉發更新ConsumeQueue和IndexFile的關鍵代碼在DefaultMessageStore.this.doDispatch(dispatchRequest)中,在doDispatch()方法中循環遍歷dispatcherList中的CommitLogDispatcher。

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

debug代碼可以中包含處理轉發請求的Dispatcher類,通過類名就可以很容易判斷出CommitLogDispatcherBuildConsumeQueue是將CommitLog轉發到ConsumeQueue中,CommitLogDispatcherBuildIndex是將消息構建IndexFile,下面我們來分別分析兩者是如何處理CommitLog消息轉發的。

CommitLogDispatcherBuildConsumeQueue源碼分析

CommitLogDispatcherBuildConsumeQueue將消息保存到ConsumeQueue如下所示,主要是下面兩步

  • 先根據消息Topic和QueueId從consumeQueueTable找到ConsumeQueue,如果找不到會創建一個新的consumeQueue
  • 調用ConsumeQueue#putMessagePositionInfoWrapper,將消息保存到consumeQueue中
// org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // 找到ConsumeQueue,如果找不到會創建一個ConsumeQueue
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); 
    // 消息保存到consumeQueue中
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

保存consumeQueue存儲單元消息如下,主要分為下面三個步驟

  • 將consumeQueue存儲單元offset(8字節)+消息長度(4字節)+tags的哈希碼(8字節)保存到consumeQueue的緩存byteBufferIndex中
  • 根據consumeQueue的offset找到MappedFile
  • 將緩沖中的存儲單元存儲到MappedFile中
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
    this.byteBufferIndex.flip();
    // consumeQueue存儲單元的長度
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    // 消息物理偏移量
    this.byteBufferIndex.putLong(offset);
    // 消息長度
    this.byteBufferIndex.putInt(size);
    // 消息tags的哈希碼
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
		// 獲取最後一個mappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // 更新物理offset
        this.maxPhysicOffset = offset + size;
      	// 數據保存到consumeQueue
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

CommitLogDispatcherBuildIndex源碼分析

除瞭CommitLogDispatcherBuildConsumeQueue,下面我們來分析在dispatcherList中另一個CommitLogDispatcher的實現類CommitLogDispatcherBuildIndex是如何將Index索引單元保存到IndexFile中的,存儲消息索引的核心邏輯如下所示。

  • 獲取或者創建最新的IndexFile
  • 將msgId構建Index索引單元並保存到IndexFile中
  • 將Message中的keys用空格分隔成key數組,並循環保存到indexFile中
public void buildIndex(DispatchRequest req) {
    // 獲取或者創建最新索引文件,支持重試最多3次
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        // 獲取結束物理索引
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        // 獲取topic和keys
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // 如果當前消息的commitLogOffset小於當前IndexFile的endPhyOffset時,說明當前消息已經構建過Index索引,因此直接返回
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }
        // 獲取客戶端生成的uniqueId(msgId),代表客戶端生成的唯一一條消息
        // 消息解密時生成的
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
        }
        // 客戶端傳遞的keys,消息是從keys屬性中獲取的
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR/*空格*/);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

從上面源碼可知,保存消息的關鍵就在putKey方法中主要分為下面三個步驟

  • 獲取要保存到IndexFile的keyHashCode(keyHash),hashSlot的絕對位置(absSlotPos),hash槽中的索引值(slotValue),保存消息時間差(timeDiff),索引的絕對位置(absIndexPos)等。
  • 更新Index索引單元信息,keyHashCode(keyHash),消息在commitLog中的偏移量(phyOffset),消息存儲時間與索引文件開始存儲時間差(timeDiff),前置消息索引值(slotValue)
  • 更新slots的IndexCount
  • 更新IndexHeader中的indexCount,更新物理偏移量(phyoffset),最後存儲時間戳(sotreTimestamp)
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // 索引數量小於2000W,否則說明當前索引文件已經滿瞭,不能添加索引
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            // keyHashCode
            int keyHash = indexKeyHashMethod(key);
            // 索引槽位置
            int slotPos = keyHash % this.hashSlotNum;
            // 絕對位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
            try {
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize/*哈希槽數量*哈希槽大小=500w*4*/
                        + this.indexHeader.getIndexCount() * indexSize;
                // 更新IndexFile索引單元信息
              	// keyHash(4)+消息在commitLog中的偏移量(8)+消息存儲時間-索引文件開始存儲時間(4)+前置消息索引值(4)
                this.mappedByteBuffer.putInt(absIndexPos/*索引位置*/, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
								// 更新slots的indexCount
                this.mappedByteBuffer.putInt(absSlotPos/*hash槽的絕對位置*/, this.indexHeader.getIndexCount());
              	//...
                // 更新IndexHeader信息
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            }
        } 
        return false;
    }

IndexFile如何解決Hash沖突

假設在IndexFile的索引IndexN的是一個keyHash為100的索引,如下圖所示,此時slots槽位100存儲著indexN的序號,在IndexFile索引單元保存的數據keyHash=100,preIndexNo=0。

如果又有一個索引單元indexN+X的keyHashCode=100,保存消息時發現solt-100已經指向瞭索引單元indexN,會將當前索引單元IndxeN+X的preIndexNo更新為indexN,使得當前索引單元indexN+X的前置索引單元指向indeNo,再更新slots-100槽位的值為indexN+X,保存完成後的索引關系如下圖所示。相當於在slots槽位下面掛瞭index索引單元鏈表,根據key查找消息時,可以根據key計算出keyHashCode,然後順著鏈表查詢鏈表中的消息。

總結

ConsumeQueue可以看成是消息消費的索引,不同Topic的ConsumeQueue存儲到不同目錄中,默認存儲在~/store/consumequeue/${topic}目錄中,其底層也是使用MappedFile,Broker會按照消息在CommitLog中的順序,異步轉發到ConsumeQueue中,每條消息在ConsumeQueue生成固定大小20字節的存儲單元指向CommitLog。

IndexFile保存著Producer發送消息keys中的索引,有瞭IndexFile就可以根據消息key快速找到消息。IndexFile的數據接口與HashMap類似,它使用鏈表的方式解決解決哈希沖突,並且使用頭插法將數據插入鏈表中。

以上就是RocketMQ ConsumeQueue與IndexFile實時更新機制源碼解析的詳細內容,更多關於RocketMQ 實時更新機制的資料請關註WalkonNet其它相關文章!

推薦閱讀: