RocketMQ消息存儲文件的加載與恢復機制源碼分析

前言

前面文章我們介紹瞭Broker是如何將消息全量存儲到CommitLog文件中,並異步生成dispatchRequest任務更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結構。由於是異步轉發消息,就可能出現消息成功存儲到CommitLog文件,轉發請求任務執行失敗,Broker宕機瞭,此時CommitLog和Index消息並未處理完,導致CommitLog與ConsumeQueue和IndexFile文件中的數據不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那麼這部分消息Consumer將永遠無法消費到瞭,那麼Broker是如何保證數據一致性的呢?

StoreCheckPoint介紹

StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點,當Broker異常結束時會根據StoreCheckPoint的數據恢復,StoreCheckPoint屬性如下

public class StoreCheckpoint {
    // commitLog最後一條信息的刷盤時間戳
    private volatile long physicMsgTimestamp = 0;
    // consumeQueue最後一個存儲單元刷盤時間戳
    private volatile long logicsMsgTimestamp = 0;
    // 最近一個已經寫完IndexFile的最後一條記錄刷盤時間戳
    private volatile long indexMsgTimestamp = 0;
}

StoreCheckPoint文件的存儲位置是${user.home}/store/checkpoint,文件的固定長度為4K,但StoreCheckPoint隻占用瞭前24個字節,存儲格式如下圖所示

StoreCheckPoint時間戳更新時機

physicMsgTimestamp

FlushRealTimeService刷盤時更新

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
  // ...
  // 更新CommitLog刷盤時間戳
  if (storeTimestamp > 0) { 					       CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

GroupCommitService刷盤時更新

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
  // ...
  // 更新CommitLog刷盤時間戳
  if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

logicsMsgTimestamp

ConsumeQueue保存消息存儲單元時更新

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
  // ...
  // 如果consumeQueue保存成功,則更新ConsumeQueue存儲點信息
  if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
  }
}

ConsumeQueue刷盤時更新並觸發StoreCheckPoint刷盤

// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {
  // ...
  // 更新ConsumeQueue存儲時間戳,並刷盤
  if (0 == flushConsumeQueueLeastPages) {
    if (logicsMsgTimestamp > 0) {
  DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
    }
    // 更新存儲點
    DefaultMessageStore.this.getStoreCheckpoint().flush();
  }
}

indexMsgTimestamp

// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile
public IndexFile getAndCreateLastIndexFile() {
  // 獲取最新IndexFile,如果IndexFile已經滿瞭,需要創建一個新的IndexFile
  if (indexFile == null) {
          indexFile =
              new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                  lastUpdateIndexTimestamp);
			// 如果創建新的IndexFile成功,原IndexFile刷盤
      if (indexFile != null) {
          final IndexFile flushThisFile = prevIndexFile;
          Thread flushThread = new Thread(new Runnable() {
              @Override
              public void run() {
                	// indexFile刷盤
                  IndexService.this.flush(flushThisFile);
              }
          }, "FlushIndexFileThread");
          flushThread.setDaemon(true);
          flushThread.start();
      }
  }
  return indexFile;
}
// org.apache.rocketmq.store.index.IndexService#flush
public void flush(final IndexFile f) {
    if (null == f)
        return;
    long indexMsgTimestamp = 0;
    if (f.isWriteFull()) {
        indexMsgTimestamp = f.getEndTimestamp();
    }
    f.flush();
    if (indexMsgTimestamp > 0) {
        // 更新checkPoint的indexMsgTimestamp並觸發刷盤
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}
  • 保存消息Index,獲取最新的IndexFile如果滿瞭,則會創建一個新的IndexFile,並且更新IndexMsgTimestamp並觸發StoreCheckPoint刷盤

StoreCheckPoint刷盤源碼

StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發時機

  • ConsumeQueue刷盤時觸發
  • 創建新IndexFile文件時觸發

StoreCheckPoint刷盤源碼如下

// org.apache.rocketmq.store.StoreCheckpoint#flush
public void flush() {
    this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
    this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
    this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
    this.mappedByteBuffer.force();
}

消息加載源碼分析

在BrokerController啟動時會調用DefaultMessageStore#load加載存儲文件加載和恢復過程主要分為下面幾步

  • 判斷Broker上次是否正常退出。這個判斷邏輯是根據${user.home}/store/abort是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。
  • 加載CommitLog
  • 加載ConsumeQueue
  • 加載StoreCheckPoint
  • 加載IndexFile
  • 恢復ConsumeQueue與IndexFile
  • 加載延遲隊列服務
// org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {
    boolean result = true;
    try {
      	// 1. Broker上次是否正常退出	
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // 2. 加載commitLog
        result = result && this.commitLog.load();
				// 3. 加載consumeQueue
        result = result && this.loadConsumeQueue();
        if (result) {
            // 4. 加載StoreCheckPoint
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 5. 加載IndexFile
            this.indexService.load(lastExitOK);
            // 6. 恢復ConsumeQueue與IndexFile
            this.recover(lastExitOK);
						// 7. 延遲隊列服務加載
            if (null != scheduleMessageService) {
                result =  this.scheduleMessageService.load();
            }
        }
    } 
    return result;
}

CommitLog加載

前面文章介紹過,CommitLog文件的存儲目錄是${user.home}/store/commitlog/,並且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。

CommitLog文件的加載其實調用的是MappedFileQueue#load方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,並調用doLoad()方法加載CommitLog。

// org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {
    File dir = new File(this.storePath/*${user.home}/store/commitlog/*/);
    File[] ls = dir.listFiles();
    if (ls != null) {
        return doLoad(Arrays.asList(ls));
    }
    return true;
}

MappedFile的加載過程如下所示,核心邏輯主要分為下面三步

  • 按照文件名稱將文件排序,排序好的文件就會按照消息保存的先後順序存放在列表中
  • 校驗文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改瞭,或者CommitLog文件被修改
  • 創建mappedFile,並且設置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) {
    // 按照文件名稱排序
    files.sort(Comparator.comparing(File::getName));
    for (File file : files) {
      	// 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改瞭,或者CommitLog文件被修改
        if (file.length() != this.mappedFileSize) {
            return false;
        }
        try {
          	// 創建MappedFile
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(mappedFile);
        } 
    }
    return true;
}

看到這裡肯定會有疑問,加載後的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最後一個MappedFile沒有使用完,Broker啟動後還會從最後一個MappedFile開始寫麼?我們可以在後面消息文件恢復源碼分析找到答案。

ConsumeQueue加載

從前面文章我們知道,ConsumeQueue文件底層其實也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為

  • 獲取ConsumeQueue目錄下存儲的所有Topic目錄,遍歷Topic目錄
  • 遍歷每個Topic目錄下的所有queueId目錄,逐個加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue
private boolean loadConsumeQueue() {
  // 獲取consumeQueue目錄
  File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */));
  // topic文件夾數組
  File[] fileTopicList = dirLogic.listFiles();
  if (fileTopicList != null) {
      // 遍歷topic	
      for (File fileTopic : fileTopicList) {
          // 獲取topic名稱
          String topic = fileTopic.getName();
          // 獲取queueId文件夾數組
          File[] fileQueueIdList = fileTopic.listFiles();
          // 遍歷queueId
          if (fileQueueIdList != null) {
              for (File fileQueueId : fileQueueIdList) {
                  int queueId;
                  // 文件夾名稱就是queueId
                  queueId = Integer.parseInt(fileQueueId.getName());
                  // 構建consumeQueue
                  ConsumeQueue logic = new ConsumeQueue(/* ... */);
                  this.putConsumeQueue(topic, queueId, logic);
                  // ConsumeQueue加載
                  if (!logic.load()) {
                      return false;
                  }
              }
          }
      }
  }
  return true;
}

IndexFile加載

IndexFile文件加載過程調用的是IndexService#load,首先獲取${user.home}/store/index目錄下的所有文件,遍歷所有文件,如果IndexFile最後存儲時間大於StoreCheckPoint中indexMsgTimestamp,則會先刪除IndexFile

// org.apache.rocketmq.store.index.IndexService#load
public boolean load(final boolean lastExitOK) {
    // indexFile文件目錄
    File dir = new File(this.storePath);
    // indexFile文件列表
    File[] files = dir.listFiles();
    if (files != null) {
        // 文件排序
        Arrays.sort(files);
        for (File file : files) {
            try {
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();
                if (!lastExitOK) {
                    // 文件最後存儲時間戳大於刷盤點,則摧毀indexFile,重建
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲點時間*/
                        .getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
                this.indexFileList.add(f);
            } 
        }
    }
    return true;
}

ConsumeQueue與IndexFile恢復

如果是正常退出,數據都已經正常刷盤,前面我們說到CommitLog在加載時的wrotePosition,flushedPosition,committedPosition都設置為mappedFileSize,

因此即使是正常退出,也會調用CommitLog#recoverNormally找到最後一條消息的位置,更新這三個屬性。

// org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
    // consumeQueue中最大物理偏移量
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (lastExitOK) {
        // 正常退出文件恢復
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // 異常退出文件恢復
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    // 恢復topicQueueTable
    this.recoverTopicQueueTable();
}

正常恢復的源碼如下,由於Broker是正常關閉,因此CommitLog,ConsumeQueue與IndexFile都已經正確刷盤,並且三者的消息是一致的。正常恢復的主要目的是找到找到最後一條消息的偏移量,然後更新CommitLog的MappedFileQueue中的刷盤點(flushWhere)和提交點(committedWhere),

  • 從最後3個mappedFile開始恢復,如果mappedFile總數不足3個,則從第0個mappedFile開始恢復
  • 逐個遍歷mappedFile,找到每個MappedFile的最後一條消息的偏移量,並將其更新到CommitLog中MappedFileQueue的刷盤點和提交點中
  • 清除ConsumeQueue冗餘數據
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        // 確認消息是否完整,默認是true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // 默認從最後3個mappedFile開始恢復
            int index = mappedFiles.size() - 3;
            // 如果commitLog不足三個,則從第一個文件開始恢復
            if (index < 0)
                index = 0;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 最後一個MappedFile的文件起始偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // mappedFileOffset偏移量
            long mappedFileOffset = 0;
            // 遍歷CommitLog文件
            while (true) {
                // 校驗消息完整性
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                // 獲取消息size
                int size = dispatchRequest.getMsgSize();
                // 返回結果為true並且消息size>0,說明消息是完整的
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
            }
            // 最大物理偏移量
            processOffset += mappedFileOffset;
            // 更新flushedWhere和committedPosition指針
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            // 清除ConsumeQueue冗餘數據
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/);
            }
        } 
    }

異常恢復源碼如下,由於上次Broker沒有正常關閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異常恢復時可能需要恢復ConsumeQueue和IndexFile,異常恢復核心邏輯主要包括

  • 倒序查CommitLog的mappedFile文件,找到第一條消息存儲的時間戳比StoreCheckPoint裡的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉發,正常存儲,正常刷盤的
  • 從該mappedFile開始逐條轉發消息,重新恢復ConsumeQueue和IndexFile
  • 當遍歷到最後一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點和提交點中
  • 清除ConsumeQueue冗餘數據
// org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // 是否CRC校驗
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 最後一個mappedFile的index
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        // 倒序遍歷mappedFile數組,
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // 1. 如果第一條消息的時間戳小於存儲點時間戳
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                break;
            }
        }
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    // 2. 轉發消息
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復,默認是false*/) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
        }
				// 3. 更新MappedFileQueue中的刷盤位置和提交位置
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // 清除ConsumeQueue中的冗餘數據
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
}

總結

Broker啟動時會分別加載CommitLog、ConsumeQueue與IndexFile。加載完成後,如果Broker上次是正常退出,隻需要找到CommitLog的最後一條消息,並更新刷盤點和提交點。如果Broker上次是異常退出,就有可能出現ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據StoreCheckPoint存儲的時間戳從CommitLog找到消息,逐條恢復ConsumeQueue與IndexFile。

以上就是RocketMQ | 源碼分析】消息存儲文件的加載與恢復機制的詳細內容,更多關於RocketMQ 消息存儲文件加載恢復的資料請關註WalkonNet其它相關文章!

推薦閱讀: