基於rocketmq的有序消費模式和並發消費模式的區別說明
rocketmq消費者註冊監聽有兩種模式
有序消費MessageListenerOrderly和並發消費MessageListenerConcurrently,這兩種模式返回值不同。
MessageListenerOrderly
正確消費返回
ConsumeOrderlyStatus.SUCCESS
稍後消費返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently
正確消費返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍後消費返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
顧名思義,有序消費模式是按照消息的順序進行消費,但是除此之外,在實踐過程中我發現和並發消費模式還有很大的區別的。
第一,速度,下面我打算用實驗來探究一下。
使用mq發送消息,消費者使用有序消費模式消費,具體的業務是阻塞100ms
Long totalTime = 0L; Date date1 = null; Date date2 = new Date(); new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { logger.info("==========CONSUME_START==========="); logger.info(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); try { if(date1 == null) date1 = new Date();//在第一次消費時初始化 Thread.sleep(100); logger.info("total:"+(++total)); date2 = new Date(); totalTime = (date2.getTime() - date1.getTime()); logger.info("totalTime:"+totalTime); logger.info("==========CONSUME_SUCCESS==========="); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { logger.info("==========RECONSUME_LATER==========="); logger.error(e.getMessage(),e); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
消費100條消息
速度挺快的,為瞭讓結果更準確,將消息加到1000條
消費1000條消息
可以看到每一條消息平均耗時25ms,然而業務是阻塞100ms,這說明有序消費模式和同步消費可能並不是一回事,那如果不阻塞代碼我們再來看一下結果
不阻塞過後速度明顯提高瞭,那麼我阻塞300ms會怎麼樣呢?
時間相比阻塞100ms多瞭2倍
接下來我們測試並發消費模式
Long totalTime = 0L; Date date1 = null; Date date2 = new Date(); new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List< MessageExt > msgs, ConsumeConcurrentlyContext context) { logger.info(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); try { if(date1 == null) date1 = new Date(); Thread.sleep(100); logger.info("total:"+(++total)); date2 = new Date(); totalTime = (date2.getTime() - date1.getTime()); logger.info("totalTime:"+totalTime); logger.info("==========CONSUME_SUCCESS==========="); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { logger.info("==========RECONSUME_LATER==========="); logger.error(e.getMessage(),e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
基於上次的經驗,同樣測試三種情況,消費1000條不阻塞,消費1000條阻塞100ms,消費1000條阻塞300ms
消費1000條不阻塞的情況
和有序消費模式差不多,快個一兩秒。
消費1000條阻塞100ms
竟然比不阻塞的情況更快,可能是誤差把
消費1000條阻塞300ms
速度稍慢,但是還是比有序消費快得多。
結論是並發消費的消費速度要比有序消費更快。
另一個區別是消費失敗時的處理不同,有序消費模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT後,消費者會立馬消費這條消息,而使用並發消費模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER後,要過好幾秒甚至十幾秒才會再次消費。
我是在隻有一條消息的情況下測試的。更重要的區別是,
返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT並不會增加消息的消費次數,mq消息有個默認最大消費次數16,消費次數到瞭以後,這條消息會進入死信隊列,這個最大消費次數是可以在mqadmin中設置的。
mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3
我測試後發現,並發模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個消息到達最大消費次數之後就不會再出現瞭。這說明有序消費模式可能並沒有這個機制,這意味著你再有序消費模式下拋出固定異常,那麼這條異常信息將會被永遠消費,並且很可能會影響之後正常的消息。下面依然做個試驗
Map<String, Integer> map = new HashMap<>();//保存消息錯誤消費次數 new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { if(1 == 1) throw new Exception(); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { MessageExt msg = msgs.get(0); if(map.containsKey(msg.getKeys())) {//消息每消費一次,加1 map.put(msg.getKeys(), map.get(msg.getKeys()) + 1); }else { map.put(msg.getKeys(), 1); } logger.info(msg.getKeys()+":"+map.get(msg.getKeys())); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
發送瞭十條消息
可以看到雖然我發瞭十條消息,但是一直在消費同樣四條消息,這可能跟消息broker有默認四條隊列有關系。同時從時間可以看到,消費失敗後,會馬上拉這條信息。
至於並發消費模式則不會無限消費,而且消費失敗後不會馬上再消費。具體的就不嘗試瞭。
結論是有序消費模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯誤消費次數,隻要消費次數達到一定次數,那麼就直接返回ConsumeOrderlyStatus.SUCCESS。
突然想到之前測試有序消費模式MessageListenerOrderly的時候為什麼1000條消息阻塞100ms耗時25000ms瞭,因為有序消費模式是同時拉取四條隊列消息的,這就對上瞭。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- RocketMQ普通消息實戰演練詳解
- RocketMQ實現隨緣分BUG小功能示例詳解
- 使用RocketMQTemplate發送帶tags的消息
- RocketMQ消息隊列實現隨機消息發送當做七夕禮物
- docker安裝RocketMQ的實現步驟