淺析Disruptor高性能線程消息傳遞並發框架
前言碎語
Disruptor是英國LMAX公司開源的高性能的線程間傳遞消息的並發框架,和jdk中的BlockingQueue非常類似,但是性能卻是BlockingQueue不能比擬的,下面是官方給出的一分測試報告,可以直觀的看出兩者的性能區別:
Disruptor 項目地址:https://github.com/LMAX-Exchange/disruptor
核心概念?
這麼性能炸裂的框架肯定要把玩一番,試用前,我們先瞭解下disruptor的主要的概念,然後結合樓主的weblog項目(之前使用的BlockingQueue),來實踐下
RingBuffer
:環形的緩沖區,消息事件信息的載體。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。
Event
:定義生產者和消費者之間進行交換的數據類型。
EventFactory
:創建事件的工廠類接口,由用戶實現,提供具體的事件
EventHandler
:事件處理接口,由用戶實現,用於處理事件。
目前為止,我們瞭解以上核心內容即可,更多的詳情,可以移步wiki文檔:https://github.com/LMAX-Exchange/disruptor
核心架構圖:
實踐Disruptor
改造boot-websocket-log項目,這是一個典型的生產者消費者模式的實例。然後將BlockingQueue替換成Disruptor,完成功能,有興趣的可以對比下。
第一步,定義事件類型
/** * Created by kl on 2018/8/24. * Content :進程日志事件內容載體 */ public class LoggerEvent { private LoggerMessage log; public LoggerMessage getLog() { return log; } public void setLog(LoggerMessage log) { this.log = log; } }
第二步,定義事件工廠
/** * Created by kl on 2018/8/24. * Content :進程日志事件工廠類 */ public class LoggerEventFactory implements EventFactory{ @Override public LoggerEvent newInstance() { return new LoggerEvent(); } }
第三步,定義數據處理器
/** * Created by kl on 2018/8/24. * Content :進程日志事件處理器 */ @Component public class LoggerEventHandler implements EventHandler{ @Autowired private SimpMessagingTemplate messagingTemplate; @Override public void onEvent(LoggerEvent stringEvent, long l, boolean b) { messagingTemplate.convertAndSend("/topic/pullLogger",stringEvent.getLog()); } }
第四步,創建Disruptor實操類,定義事件發佈方法,發佈事件
/** * Created by kl on 2018/8/24. * Content :Disruptor 環形隊列 */ @Component public class LoggerDisruptorQueue { private Executor executor = Executors.newCachedThreadPool(); // The factory for the event private LoggerEventFactory factory = new LoggerEventFactory(); private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory(); // Specify the size of the ring buffer, must be power of 2. private int bufferSize = 2 * 1024; // Construct the Disruptor private Disruptordisruptor = new Disruptor<>(factory, bufferSize, executor);; private DisruptorfileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);; private static RingBufferringBuffer; private static RingBufferfileLoggerEventRingBuffer; @Autowired LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) { disruptor.handleEventsWith(eventHandler); fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler); this.ringBuffer = disruptor.getRingBuffer(); this.fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer(); disruptor.start(); fileLoggerEventDisruptor.start(); } public static void publishEvent(LoggerMessage log) { long sequence = ringBuffer.next(); // Grab the next sequence try { LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.setLog(log); // Fill with data } finally { ringBuffer.publish(sequence); } } public static void publishEvent(String log) { if(fileLoggerEventRingBuffer == null) return; long sequence = fileLoggerEventRingBuffer.next(); // Grab the next sequence try { FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.setLog(log); // Fill with data } finally { fileLoggerEventRingBuffer.publish(sequence); } } }
文末結語
以上四步已經完成瞭Disruptor的使用,啟動項目後就會不斷的發佈日志事件,處理器會將事件內容通過websocket傳送到前端頁面上展示,
boot-websocket-log項目地址:https://gitee.com/kailing/boot-websocket-log
Disruptor是高性能的進程內線程間的數據交換框架,特別適合日志類的處理。Disruptor也是從https://github.com/alipay/sofa-tracer瞭解到的,這是螞蟻金服 團隊開源的分佈式鏈路追蹤項目,其中日志處理部分就是使用瞭Disruptor。
以上就是淺析Disruptor高性能線程消息傳遞並發框架的詳細內容,更多關於Disruptor線程消息傳遞並發框架的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- SpringBoot disruptor高性能隊列使用
- 從log4j2到Disruptor詳解
- spring boot集成WebSocket日志實時輸出到web頁面
- Java多線程 CompletionService
- 分佈式利器redis及redisson的延遲隊列實踐