基於rocketmq的有序消費模式和並發消費模式的區別說明

rocketmq消費者註冊監聽有兩種模式

有序消費MessageListenerOrderly和並發消費MessageListenerConcurrently,這兩種模式返回值不同。

MessageListenerOrderly

正確消費返回

ConsumeOrderlyStatus.SUCCESS

稍後消費返回

ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently

正確消費返回

ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍後消費返回

ConsumeConcurrentlyStatus.RECONSUME_LATER

顧名思義,有序消費模式是按照消息的順序進行消費,但是除此之外,在實踐過程中我發現和並發消費模式還有很大的區別的。

第一,速度,下面我打算用實驗來探究一下。

使用mq發送消息,消費者使用有序消費模式消費,具體的業務是阻塞100ms

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() { 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        logger.info("==========CONSUME_START===========");  
		logger.info(Thread.currentThread().getName()  
                            + " Receive New Messages: " + msgs.size());  
        try {
        	if(date1 == null)
        		date1 = new Date();//在第一次消費時初始化
        	Thread.sleep(100);
       		logger.info("total:"+(++total));
        	date2 = new Date();
       		totalTime = (date2.getTime() - date1.getTime());
       		logger.info("totalTime:"+totalTime);
            logger.info("==========CONSUME_SUCCESS===========");  
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
            logger.info("==========RECONSUME_LATER===========");  
            logger.error(e.getMessage(),e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}
}

消費100條消息

速度挺快的,為瞭讓結果更準確,將消息加到1000條

消費1000條消息

可以看到每一條消息平均耗時25ms,然而業務是阻塞100ms,這說明有序消費模式和同步消費可能並不是一回事,那如果不阻塞代碼我們再來看一下結果

不阻塞過後速度明顯提高瞭,那麼我阻塞300ms會怎麼樣呢?

時間相比阻塞100ms多瞭2倍

接下來我們測試並發消費模式

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(  
                       List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  
 
    		logger.info(Thread.currentThread().getName()  
                                 + " Receive New Messages: " + msgs.size()); 
    		try {
    			if(date1 == null)
    				date1 = new Date();
            	Thread.sleep(100);
           		logger.info("total:"+(++total));
           		date2 = new Date();
           		totalTime = (date2.getTime() - date1.getTime());
           		logger.info("totalTime:"+totalTime);
                logger.info("==========CONSUME_SUCCESS===========");  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            } catch (Exception e) {
                logger.info("==========RECONSUME_LATER===========");  
                logger.error(e.getMessage(),e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }  
}

基於上次的經驗,同樣測試三種情況,消費1000條不阻塞,消費1000條阻塞100ms,消費1000條阻塞300ms

消費1000條不阻塞的情況

和有序消費模式差不多,快個一兩秒。

消費1000條阻塞100ms

竟然比不阻塞的情況更快,可能是誤差把

消費1000條阻塞300ms

速度稍慢,但是還是比有序消費快得多。

結論是並發消費的消費速度要比有序消費更快。

另一個區別是消費失敗時的處理不同,有序消費模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT後,消費者會立馬消費這條消息,而使用並發消費模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER後,要過好幾秒甚至十幾秒才會再次消費。

我是在隻有一條消息的情況下測試的。更重要的區別是,

返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT並不會增加消息的消費次數,mq消息有個默認最大消費次數16,消費次數到瞭以後,這條消息會進入死信隊列,這個最大消費次數是可以在mqadmin中設置的。

mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3

我測試後發現,並發模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個消息到達最大消費次數之後就不會再出現瞭。這說明有序消費模式可能並沒有這個機制,這意味著你再有序消費模式下拋出固定異常,那麼這條異常信息將會被永遠消費,並且很可能會影響之後正常的消息。下面依然做個試驗

Map<String, Integer> map = new HashMap<>();//保存消息錯誤消費次數
new MessageListenerOrderly() {
 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        try {
        	if(1 == 1)
        			throw new Exception();
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
        	MessageExt msg = msgs.get(0);
			if(map.containsKey(msg.getKeys())) {//消息每消費一次,加1
			    map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
			}else {
			    map.put(msg.getKeys(), 1);
			}
			logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}	
}

發送瞭十條消息

可以看到雖然我發瞭十條消息,但是一直在消費同樣四條消息,這可能跟消息broker有默認四條隊列有關系。同時從時間可以看到,消費失敗後,會馬上拉這條信息。

至於並發消費模式則不會無限消費,而且消費失敗後不會馬上再消費。具體的就不嘗試瞭。

結論是有序消費模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯誤消費次數,隻要消費次數達到一定次數,那麼就直接返回ConsumeOrderlyStatus.SUCCESS。

突然想到之前測試有序消費模式MessageListenerOrderly的時候為什麼1000條消息阻塞100ms耗時25000ms瞭,因為有序消費模式是同時拉取四條隊列消息的,這就對上瞭。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: