RocketMQ消息存儲文件的加載與恢復機制源碼分析
前言
前面文章我們介紹瞭Broker是如何將消息全量存儲到CommitLog文件中,並異步生成dispatchRequest任務更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結構。由於是異步轉發消息,就可能出現消息成功存儲到CommitLog文件,轉發請求任務執行失敗,Broker宕機瞭,此時CommitLog和Index消息並未處理完,導致CommitLog與ConsumeQueue和IndexFile文件中的數據不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那麼這部分消息Consumer將永遠無法消費到瞭,那麼Broker是如何保證數據一致性的呢?
StoreCheckPoint介紹
StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點,當Broker異常結束時會根據StoreCheckPoint的數據恢復,StoreCheckPoint屬性如下
public class StoreCheckpoint { // commitLog最後一條信息的刷盤時間戳 private volatile long physicMsgTimestamp = 0; // consumeQueue最後一個存儲單元刷盤時間戳 private volatile long logicsMsgTimestamp = 0; // 最近一個已經寫完IndexFile的最後一條記錄刷盤時間戳 private volatile long indexMsgTimestamp = 0; }
StoreCheckPoint文件的存儲位置是${user.home}/store/checkpoint
,文件的固定長度為4K,但StoreCheckPoint隻占用瞭前24個字節,存儲格式如下圖所示
StoreCheckPoint時間戳更新時機
physicMsgTimestamp
FlushRealTimeService刷盤時更新
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { // ... // 更新CommitLog刷盤時間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
GroupCommitService刷盤時更新
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // ... // 更新CommitLog刷盤時間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
logicsMsgTimestamp
ConsumeQueue保存消息存儲單元時更新
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { // ... // 如果consumeQueue保存成功,則更新ConsumeQueue存儲點信息 if (result) { this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); } }
ConsumeQueue刷盤時更新並觸發StoreCheckPoint刷盤
// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush private void doFlush(int retryTimes) { // ... // 更新ConsumeQueue存儲時間戳,並刷盤 if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } // 更新存儲點 DefaultMessageStore.this.getStoreCheckpoint().flush(); } }
indexMsgTimestamp
// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile public IndexFile getAndCreateLastIndexFile() { // 獲取最新IndexFile,如果IndexFile已經滿瞭,需要創建一個新的IndexFile if (indexFile == null) { indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, lastUpdateIndexTimestamp); // 如果創建新的IndexFile成功,原IndexFile刷盤 if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; Thread flushThread = new Thread(new Runnable() { @Override public void run() { // indexFile刷盤 IndexService.this.flush(flushThisFile); } }, "FlushIndexFileThread"); flushThread.setDaemon(true); flushThread.start(); } } return indexFile; } // org.apache.rocketmq.store.index.IndexService#flush public void flush(final IndexFile f) { if (null == f) return; long indexMsgTimestamp = 0; if (f.isWriteFull()) { indexMsgTimestamp = f.getEndTimestamp(); } f.flush(); if (indexMsgTimestamp > 0) { // 更新checkPoint的indexMsgTimestamp並觸發刷盤 this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp); this.defaultMessageStore.getStoreCheckpoint().flush(); } }
- 保存消息Index,獲取最新的IndexFile如果滿瞭,則會創建一個新的IndexFile,並且更新IndexMsgTimestamp並觸發StoreCheckPoint刷盤
StoreCheckPoint刷盤源碼
StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發時機
- ConsumeQueue刷盤時觸發
- 創建新IndexFile文件時觸發
StoreCheckPoint刷盤源碼如下
// org.apache.rocketmq.store.StoreCheckpoint#flush public void flush() { this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp); this.mappedByteBuffer.force(); }
消息加載源碼分析
在BrokerController啟動時會調用DefaultMessageStore#load
加載存儲文件加載和恢復過程主要分為下面幾步
- 判斷Broker上次是否正常退出。這個判斷邏輯是根據
${user.home}/store/abort
是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。 - 加載CommitLog
- 加載ConsumeQueue
- 加載StoreCheckPoint
- 加載IndexFile
- 恢復ConsumeQueue與IndexFile
- 加載延遲隊列服務
// org.apache.rocketmq.store.DefaultMessageStore#load public boolean load() { boolean result = true; try { // 1. Broker上次是否正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // 2. 加載commitLog result = result && this.commitLog.load(); // 3. 加載consumeQueue result = result && this.loadConsumeQueue(); if (result) { // 4. 加載StoreCheckPoint this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // 5. 加載IndexFile this.indexService.load(lastExitOK); // 6. 恢復ConsumeQueue與IndexFile this.recover(lastExitOK); // 7. 延遲隊列服務加載 if (null != scheduleMessageService) { result = this.scheduleMessageService.load(); } } } return result; }
CommitLog加載
前面文章介紹過,CommitLog文件的存儲目錄是${user.home}/store/commitlog/
,並且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。
CommitLog文件的加載其實調用的是MappedFileQueue#load
方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,並調用doLoad()方法加載CommitLog。
// org.apache.rocketmq.store.MappedFileQueue#load public boolean load() { File dir = new File(this.storePath/*${user.home}/store/commitlog/*/); File[] ls = dir.listFiles(); if (ls != null) { return doLoad(Arrays.asList(ls)); } return true; }
MappedFile的加載過程如下所示,核心邏輯主要分為下面三步
- 按照文件名稱將文件排序,排序好的文件就會按照消息保存的先後順序存放在列表中
- 校驗文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改瞭,或者CommitLog文件被修改
- 創建mappedFile,並且設置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) { // 按照文件名稱排序 files.sort(Comparator.comparing(File::getName)); for (File file : files) { // 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改瞭,或者CommitLog文件被修改 if (file.length() != this.mappedFileSize) { return false; } try { // 創建MappedFile MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); } } return true; }
看到這裡肯定會有疑問,加載後的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最後一個MappedFile沒有使用完,Broker啟動後還會從最後一個MappedFile開始寫麼?我們可以在後面消息文件恢復源碼分析找到答案。
ConsumeQueue加載
從前面文章我們知道,ConsumeQueue文件底層其實也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為
- 獲取ConsumeQueue目錄下存儲的所有Topic目錄,遍歷Topic目錄
- 遍歷每個Topic目錄下的所有queueId目錄,逐個加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue private boolean loadConsumeQueue() { // 獲取consumeQueue目錄 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */)); // topic文件夾數組 File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { // 遍歷topic for (File fileTopic : fileTopicList) { // 獲取topic名稱 String topic = fileTopic.getName(); // 獲取queueId文件夾數組 File[] fileQueueIdList = fileTopic.listFiles(); // 遍歷queueId if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; // 文件夾名稱就是queueId queueId = Integer.parseInt(fileQueueId.getName()); // 構建consumeQueue ConsumeQueue logic = new ConsumeQueue(/* ... */); this.putConsumeQueue(topic, queueId, logic); // ConsumeQueue加載 if (!logic.load()) { return false; } } } } } return true; }
IndexFile加載
IndexFile文件加載過程調用的是IndexService#load
,首先獲取${user.home}/store/index
目錄下的所有文件,遍歷所有文件,如果IndexFile最後存儲時間大於StoreCheckPoint中indexMsgTimestamp,則會先刪除IndexFile
// org.apache.rocketmq.store.index.IndexService#load public boolean load(final boolean lastExitOK) { // indexFile文件目錄 File dir = new File(this.storePath); // indexFile文件列表 File[] files = dir.listFiles(); if (files != null) { // 文件排序 Arrays.sort(files); for (File file : files) { try { IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); f.load(); if (!lastExitOK) { // 文件最後存儲時間戳大於刷盤點,則摧毀indexFile,重建 if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲點時間*/ .getIndexMsgTimestamp()) { f.destroy(0); continue; } } this.indexFileList.add(f); } } } return true; }
ConsumeQueue與IndexFile恢復
如果是正常退出,數據都已經正常刷盤,前面我們說到CommitLog在加載時的wrotePosition,flushedPosition,committedPosition都設置為mappedFileSize,
因此即使是正常退出,也會調用CommitLog#recoverNormally
找到最後一條消息的位置,更新這三個屬性。
// org.apache.rocketmq.store.DefaultMessageStore#recover private void recover(final boolean lastExitOK) { // consumeQueue中最大物理偏移量 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { // 正常退出文件恢復 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { // 異常退出文件恢復 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } // 恢復topicQueueTable this.recoverTopicQueueTable(); }
正常恢復的源碼如下,由於Broker是正常關閉,因此CommitLog,ConsumeQueue與IndexFile都已經正確刷盤,並且三者的消息是一致的。正常恢復的主要目的是找到找到最後一條消息的偏移量,然後更新CommitLog的MappedFileQueue中的刷盤點(flushWhere)和提交點(committedWhere),
- 從最後3個mappedFile開始恢復,如果mappedFile總數不足3個,則從第0個mappedFile開始恢復
- 逐個遍歷mappedFile,找到每個MappedFile的最後一條消息的偏移量,並將其更新到CommitLog中MappedFileQueue的刷盤點和提交點中
- 清除ConsumeQueue冗餘數據
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // 確認消息是否完整,默認是true boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 默認從最後3個mappedFile開始恢復 int index = mappedFiles.size() - 3; // 如果commitLog不足三個,則從第一個文件開始恢復 if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); // 最後一個MappedFile的文件起始偏移量 long processOffset = mappedFile.getFileFromOffset(); // mappedFileOffset偏移量 long mappedFileOffset = 0; // 遍歷CommitLog文件 while (true) { // 校驗消息完整性 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // 獲取消息size int size = dispatchRequest.getMsgSize(); // 返回結果為true並且消息size>0,說明消息是完整的 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } } // 最大物理偏移量 processOffset += mappedFileOffset; // 更新flushedWhere和committedPosition指針 this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue冗餘數據 if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/); } } }
異常恢復源碼如下,由於上次Broker沒有正常關閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異常恢復時可能需要恢復ConsumeQueue和IndexFile,異常恢復核心邏輯主要包括
- 倒序查CommitLog的mappedFile文件,找到第一條消息存儲的時間戳比StoreCheckPoint裡的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉發,正常存儲,正常刷盤的
- 從該mappedFile開始逐條轉發消息,重新恢復ConsumeQueue和IndexFile
- 當遍歷到最後一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點和提交點中
- 清除ConsumeQueue冗餘數據
// org.apache.rocketmq.store.CommitLog#recoverAbnormally public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // 是否CRC校驗 boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 最後一個mappedFile的index int index = mappedFiles.size() - 1; MappedFile mappedFile = null; // 倒序遍歷mappedFile數組, for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); // 1. 如果第一條消息的時間戳小於存儲點時間戳 if (this.isMappedFileMatchedRecover(mappedFile)) { break; } } long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { mappedFileOffset += size; // 2. 轉發消息 if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復,默認是false*/) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } } // 3. 更新MappedFileQueue中的刷盤位置和提交位置 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue中的冗餘數據 if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } }
總結
Broker啟動時會分別加載CommitLog、ConsumeQueue與IndexFile。加載完成後,如果Broker上次是正常退出,隻需要找到CommitLog的最後一條消息,並更新刷盤點和提交點。如果Broker上次是異常退出,就有可能出現ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據StoreCheckPoint存儲的時間戳從CommitLog找到消息,逐條恢復ConsumeQueue與IndexFile。
以上就是RocketMQ | 源碼分析】消息存儲文件的加載與恢復機制的詳細內容,更多關於RocketMQ 消息存儲文件加載恢復的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- RocketMQ ConsumeQueue與IndexFile實時更新機制源碼解析
- 一文徹底掌握RocketMQ 的存儲模型
- RocketMQ broker文件清理源碼解析
- RocketMQ broker 消息投遞流程處理PULL_MESSAGE請求解析
- RocketMQ特性Broker存儲事務消息實現