Java RabbitMQ的持久化和發佈確認詳解
1. 持久化
當RabbitMQ服務停掉以後消息生產者發送過的消息不丟失。默認情況下RabbitMQ退出或者崩潰時,會忽視掉隊列和消息。為瞭保證消息不丟失需要將隊列和消息都標記為持久化。
1.1 實現持久化
1.隊列持久化:在創建隊列時將channel.queueDeclare();
第二個參數改為true。
2.消息持久化:在使用信道發送消息時channel.basicPublish();
將第三個參數改為:MessageProperties.PERSISTENT_TEXT_PLAIN
表示持久化消息。
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化隊列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化消息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("發送消息:'" + msg + "'成功"); } } }
但是存儲消息還有存在一個緩存的間隔點,沒有真正的寫入磁盤,持久性保證不夠強,但是對於簡單隊列而言也綽綽有餘。
1.2 不公平分發
輪詢分發的方式在消費者處理效率不同的情況下並不適用。所以真正的公平應該是遵循能者多勞的前提。
在消費者處修改channel.basicQos(1);
表示開啟不公平分發
/** * @Description 不公平分發消費者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模擬並發沉睡三十秒 try { Thread.sleep(30000); System.out.println("線程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 設置不公平分發 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消費者取消消費"); }); } }
1.3 測試不公平分發
測試目的:是否能實現能者多勞。
測試方法:兩個消費者睡眠不同的事件來模擬處理事件不同,如果處理時間(睡眠時間)短的能夠處理多個消息就代表目的達成。
先啟動生產者創建隊列,再分別啟動兩個消費者。
生產者按照順序發四條消息:
睡眠時間短的線程A接收到瞭三條消息
而睡眠時間長的線程B隻接收到的第二條消息:
因為線程B在處理消息時消耗的時間較長,所以就將其他消息分配給瞭線程A。
實驗成功!
1.4 預取值
消息的發送和手動確認都是異步完成的,因此就存在一個未確認消息的緩沖區,開發人員希望能夠限制緩沖區的大小,用來避免緩沖區裡面無限制的未確認消息問題。
這裡的預期值就值得是上述方法channel.basicQos();
裡面的參數,如果在當前信道上存在等於參數的消息就不會在安排當前信道進行消費消息。
1.4.1 代碼測試
測試方法:
1.新建兩個不同的消費者分別給定預期值5個2。
2.給睡眠時間長的指定為5,時間短的指定為2。
3.假如按照指定的預期值獲取消息則表示測試成功,但並不是代表一定會按照5和2分配,這個類似於權重的判別。
代碼根據上述代碼修改預期值即可。
2. 發佈確認
發佈確認就是生產者發佈消息到隊列之後,隊列確認進行持久化完畢再通知給生產者的過程。這樣才能保證消息不會丟失。
需要註意的是需要開啟隊列持久化才能使用確認發佈。
開啟方法:channel.confirmSelect();
2.1 單個確認發佈
是一種同步發佈的方式,即發送完一個消息之後隻有確認它確認發佈後,後續的消息才會繼續發佈,在指定的時間內沒有確認就會拋出異常。缺點就是特別慢。
/** * @Description 確認發佈——單個確認 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生隊列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認發佈 channel.confirmSelect(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 單個發佈確認 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("發送消息:" + i); } } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("發送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒"); } }
2.2 批量確認發佈
一批一批的確認發佈可以提高系統的吞吐量。但是缺點是發生故障導致發佈出現問題時,需要將整個批處理保存在內存中,後面再重新發佈。
/** * @Description 確認發佈——批量確認 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生隊列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認發佈 channel.confirmSelect(); // 設置一個多少一批確認一次。 int batchSize = MESSAGE_COUNT / 10; // 記錄開始時間 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量發佈確認 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("發送消息:" + i); } } } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("發送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒"); } }
顯然效率要比單個確認發佈的高很多。
2.3 異步確認發佈
在編程上比上述兩個要復雜,但是性價比很高,無論是可靠性還行效率的都好很多,利用回調函數來達到消息可靠性傳遞的。
/** * @Description 確認發佈——異步確認 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生隊列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認發佈 channel.confirmSelect(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); // 確認成功回調 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("確認成功消息:" + deliveryTab); }; // 確認失敗回調 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未確認的消息:" + deliveryTab); }; // 消息監聽器 /** * addConfirmListener: * 1. 確認成功的消息; * 2. 確認失敗的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("發送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒"); } }
2.4 處理未確認的消息
最好的處理方式把未確認的消息放到一個基於內存的能被發佈線程訪問的隊列。
例如:ConcurrentLinkedQueue
可以在確認隊列confirm callbacks
與發佈線程之間進行消息的傳遞。
處理方式:
1.記錄要發送的全部消息;
2.在發佈成功確認處刪除;
3.打印未確認的消息。
使用一個哈希表存儲消息,它的優點:
可以將需要和消息進行關聯;輕松批量刪除條目;支持高並發。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 異步發佈確認,處理未發佈成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生隊列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認發佈 channel.confirmSelect(); // 線程安全有序的一個hash表,適用與高並發 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); // 確認成功回調 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在發佈成功確認處刪除; // 批量刪除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 單獨刪除 map.remove(deliveryTab); } System.out.println("確認成功消息:" + deliveryTab); }; // 確認失敗回調 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未確認的消息。 System.out.println("未確認的消息:" + map.get(deliveryTab) + ",標記:" + deliveryTab); }; // 消息監聽器 /** * addConfirmListener: * 1. 確認成功的消息; * 2. 確認失敗的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 記錄要發送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("發送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒"); } }
總結
顯然來說,異步處理除瞭在編碼處有些麻煩,在處理時間效率和可用性上都是比單處理和批處理好很多。
本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!
推薦閱讀:
- Java RabbitMQ的工作隊列與消息應答詳解
- RabbitMQ冪等性與優先級及惰性詳細全面講解
- Java 線程的優先級(setPriority)案例詳解
- rabbitmq中routingkey的作用說明
- Java找出兩個大數據量List集合中的不同元素的方法總結