從log4j2到Disruptor詳解

  • log4j2實現原理可查看://www.jb51.net/article/232602.htm
  • 文章同樣基於log4j-2.7版本,disruptor-3.3.6

相信看過log4j2的源碼後大傢應該明白為什麼第二代日志性能會提升那麼多,這其中最大的功臣莫過於Disruptor並發編程框架。

下面我們就跟著log4j2來走進Disruptor這個神奇的框(wang)架(zhan)

log4j2異步日志簡要回顧

從日志工廠(Log4jLoggerFactory)中獲取日志Logger實例

從日志上下文工廠(Log4jContextFactory)獲取日志上下文

啟用日志上下文(AsyncLoggerContext)

啟動Disruptor(AsyncLoggerDisruptor)

序列號屏障(ProcessingSequenceBarrier)等待序列號發佈

等待策略(WaitStrategy)等待序列號

返回Logger等待序列號(即等待日志寫入)

異步日志(AsyncLogger)寫入

日志內容與轉化者(RingBufferLogEventTranslator)綁定

Disruptor嘗試發佈轉化者tryPublish

RingBuffer嘗試發佈事件tryPublishEvent

獲取下一個可用序號

轉化並發佈序號,日志與序號對應的事件綁定,並發佈序號

RingBuffer發佈序號,MultiProducerSequencer發佈

等待策略(waitStrategy)喚醒阻塞

Disruptor在log4j2中的應用

AsyncLoggerDisruptor

異步日志Disruptor啟動

創建事件工廠EventFactory

計算ringBufferSize:AsyncLogger.RingBufferSize屬性

創建等待策略:AsyncLogger.WaitStrategy屬性

創建守護線程執行器executor

創建異步隊列滿時處理策略AsyncQueueFullPolicy(非Disruptor步驟)

創建Disruptor

  • 創建RingBuffer與Disruptor綁定
  • RingBuffer根據生產者類型創建對應的實例,例如多生產者:MultiProducerSequencer
  • 創建多生產者序號(bufferSize,waitStrategy)

綁定異常句柄(Disruptor.handleExceptionsWith)

綁定事件處理句柄(Disruptor.handleEventsWith)

  • 根據handle列表創建事件處理器createEventProcessors
  • RingBuffer為Sequence(MultiProducerSequencer)序列創建序列屏障ProcessingSequenceBarrier
  • 創建事件批處理器BatchEventProcessor
  • 為事件批處理器綁定異常處理句柄
  • 消費者倉庫(consumerRepository)添加消費者,創建事件處理信息EventProcessorInfo添加至消費者信息列表consumerInfos
  • RingBuffer添加處理序列號列表processorSequences為序列號閘
  • 如果存在序列號屏障,從閘門中移除屏障序列號並標識endOfChain為false

啟動Disruptor

  • 遍歷消費者倉庫放入執行器中執行消費者EventProcessorInfo
  • 啟動事件批處理器BatchEventProcessor
  • 事件批處理器序列號自增1
  • 死循環
  • 序列號屏障ProcessingSequenceBarrier等待下個有效序列號,默認為超時等待策略,超時會繼續下輪循環
  • 事件批處理器序列號如果小於等於有效序列號
  • 從RingBuffer中按照序列號獲取event事件
  • 通知回調事件句柄eventHandler.onEvent如果當前消費下標等於有效序列號availableSequence說明是當前批次的最後一個消息,endOfBatch為true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
  • 事件批處理器序列號設置為有效序列號

異步日志Disruptor寫入

嘗試發佈tryPublish事件轉化器EventTranslator:RingBufferLogEventTranslator

Disruptor獲取RingBuffer嘗試發佈事件tryPublishEvent

序列號獲取下個有效序號,步進為1,例如:MultiProducerSequencer.tryNext

遊標按照步進移動

判斷是否有足夠的空間,沒有則拋出InsufficientCapacityException異常

返回有效序列號

轉化器轉化消息為對應有效序列號的事件放入entries

發佈序列號

  • 設置有效序列號至緩存availableBuffer
  • 等待策略喚醒阻塞waitStrategy.signalAllWhenBlocking

架構及流程

紅色數字標識流程為獲取logger時Disruptor創建消費者流程

黑色數字標識流程為logger寫入日志時Disruptor創建事件並通知消費者流程

RingBuffer對於所有消費者、生產者是同一個實例

  • 環形隊列,dataProvide,數據的存儲與提供者

Sequencer:生產者

  • 對於所有消費者、生產者(可能是多生產者序列類型對於Multi類型)是同一個實例,包含一個遊標序列號Sequence

SequenceBarrier:序列號屏障

  • 對於所有消費者、生產者也是同一個實例,序列號屏障包含一個等待策略、一個RingBuffer引用、一個遊標序列號、一個依賴序列號(可能是組序列號類型)

BatchEventProcessor:消費者

  • 消費者包含一個RingBuffer引用
  • 一個序列號屏障,可以包含多個屏障序列號,默認為0個則使用RingBuffer的MultiProducerSequencer的遊標序列號Sequence
  • 一個EventHandler:RingBufferLogEventHandler
  • 遍歷EventHandler列表將其封裝為BatchEventProcessor,將其與原始eventHandler、barrier屏障註冊至消費者資源庫consumerRepository。
  • 獲取batchEventProcessor序列號默認為-1,將其緩存至processorSequences標識正在處理,並將processorSequences、disruptor、consumerRepository綁定至EventHandlerGroup。Disruptor啟動遍歷消費者資源庫啟動消費者:BatchEventProcessor

消費者入口

  • 消費者消費前先自增本地序列號(即-1+1=0序號),向序列號屏障申請該序列號的消費,默認為Timeout策略申請。
  • 屏障收到申請waitFor序列號,當前屏障遊標序列號小於申請的消費序列號,等待生產者生產至當前序列號,如果超時則拋出異常(本地序列號不更新繼續重試);如果沒有超時,將屏障的dependentSequence序列號(如果不是非多序列號屏障類型,log4j2使用的是非多序列號屏障,則是屏障的本地遊標)賦值為availableSequence返回。
  • 如果availableSequence有效的序列號(即屏障的遊標序列號)小於申請要消費的序列號直接返回availableSequence(即消費超出的生產的速度,消費者申請的序列號向後回移至有效序列號)。否則getHighestPublishedSequence判斷申請的序列號至availableSequence序列號之間的每個序列號對應的消息事件均是有效的則返回有效序列號(即生產者生產很快,消費者申請消費的序列號很小,向前移動至有效的,可能是本身也可能會跳躍多個下標),根據生產者的availableBuffer判斷是否有效,因為生產者先發佈序列號再寫入數據,此處避免瞭讀取數據異常,如果數據沒有寫入,有效序列號緩存標識沒有寫入(即無效),消費者會進行剛剛所說的“重試”,如果之間存在無效序列號則返回申請序列號-1(即回滾一個值,進入邏輯時增加瞭一個值,也就是回滾至申請前的點,可以理解為與超時相同,即重試)
  • 如果申請的序列號小於等於有效的序列號,則消費序列號對應的消息事件並更新本地BatchEventProcessor的序列號,按照下標去dataProvide(RingBuffer.entries)中提取對應位置的數據消費
  • 如果申請的序列號大於有效的序列號,則將消費者本地序列號設置為有效序列號(即消費超出的生產的速度,消費的序列號向後回移)
  • 如果期間出現任何未catch住的異常則會跳過當前下標,異常出現時的下標及對應的事件會交由exceptionHandler處理,默認為AsyncLoggerDefaultExceptionHandler異步處理,會將異常事件輸出至系統的標準錯誤管道,雖然是異步也是會占用消費者線程池資源

Disruptor:生產者入口

  • 獲取RingBuffer嘗試發佈消息,生產者(例如:MultiProducerSequencer)
  • 生產者遊標序列號嘗試自增,判斷當前是否有足夠的空間,當前遊標+步進-bufferSize是否大於最小的閘門序列號(gatingSequences,即:所有消費者的本地遊標序列號processorSequences列表),最小序列號會緩存至本地gatingSequenceCache用於下次判斷減少進行所有閘門序列號的遍歷次數,如果是說明已經沒有空間(因為生產者生產申請的序列號已經追上瞭消費者消費序列號的最小值。RingBuffer是一個環形隊列結構。上面已經講到消費者序列號會與生產者序列號同步,同步指消費者申請序列號小於有效序列號時會前進至有效序列號,即使有延遲也保證瞭有大於等於buffer值的緩沖空間供生產者生產),如果沒有空間返回false進入下一輪生產

在這裡插入圖片描述

  • 自增成功後,將消息轉化為對應序列號下標位置的事件數據
  • Sequencer發佈序列號,將當前序列號設置為有效(availableBuffer),並根據等待策略喚醒等待的消費者,被喚醒的消費者根據發佈的序列號獲取相應下標處事件數據進行處理

disruptor_new.jpg

Disruptor為什麼這麼快?

Disruptor采用無鎖並發編程,框架中主要使用CAS與volatile關鍵字保證並發安全

使用環形數據結構(另一個典型的應用是時鐘算法也是使用的環形數據結構),為環形結構添加序列號屏障來控制對環形隊列讀寫操作,保證存儲數據的並發安全

另一個點便是神奇的緩沖行填充瞭

Log4j2為什麼這麼快?

使用Disruptor並發編程框架

使用NIO寫入日志數據

當然log4j2中有很多細節,如果我們想要獲取線程棧信息,可以同樣學習一下這樣的寫法

// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace().
final StackTraceElement[] stackTrace = new Throwable().getStackTrace();
StackTraceElement last = null;
for (int i = stackTrace.length - 1; i > 0; i--) {
    final String className = stackTrace[i].getClassName();
    if (fqcnOfLogger.equals(className)) {
        return last;
    }
    last = stackTrace[i];
}

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: