Java RabbitMQ的工作隊列與消息應答詳解
Work Queues
工作隊列(任務隊列)主要思想是避免立即執行資源密集型任務,而不得不等待它完成,相反我們安排任務在之後執行。我們把任務封裝為消息並將其發送到隊列。在後臺運行的工作進程將彈出任務並最終執行作業。當有多個工作線程時,這些工作線程將一起處理這些任務。
其實就是生產者發送大量的消息,發送到隊列之後,由多個消費者(工作線程)來處理消息,並且每個消息隻能被處理一次。
1. 輪詢分發消息
多個工作線程按照次序每來一個消息執行一次。
1.1 抽取工具類
直接通過信息獲取信道
/** * @Description RabbitMQ工具類 * @date 2022/3/5 10:02 */ public class RabbitMQUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection.createChannel(); } }
1.2 編寫兩個工作線程
Work2和Work1代碼沒有區別,隻需要對它做出區分即可。
public class Worker1 { // 指定隊列名稱 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 獲取信道 Channel channel = RabbitMQUtils.getChannel(); // 聲明:接收消息回調 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("工作線程01:"+ new String(message.getBody())); }; // 聲明:取消消費回調 CancelCallback cancelCallback = consumerTag -> { System.out.println("工作線程01取消接收:"+consumerTag); }; System.out.println("工作線程01啟動完成......"); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
1.3 編寫生產者
public class Producer { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生隊列 channel.queueDeclare(QUEUE_NAME,false,false,true,null); // 消息體 Scanner scanner = new Scanner(System.in); int i = 1; while (scanner.hasNext()){ String msg = scanner.next(); msg = msg + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("發送成功:" + msg); } System.out.println("----------==========發送完畢==========----------"); } }
1.4 運行測試
先啟動兩個工作線程,再啟動生產者。
出現404異常請參考下方1.6
生產者發送情況:
輪詢狀態下兩個工作隊列接收狀態:
1.5 異常情況
在先啟動兩個消費者線程時,會提示404找不到隊列。
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)
發生這個情況的原因很顯然是因為先啟動瞭消費者,但是在RabbitMQ中沒有創建相對應的隊列名稱,解決方法可以:
1.先啟動生產者創建隊列(也可以在RabbitMQ中創建隊列);
2.再啟動消費者就不會產生這個錯誤;
3.再在生產者中使用Scanner
類去發送消息測試。
2. 消息應答
消費者在接收到消息並且處理該消息之後,告訴RabbitMQ它已經處理瞭,RabbitMQ可以刪除消息。其目的就是為瞭保護消息在被處理之前不會消失。
2.1 自動應答
這種方式發送後就被認定為已經傳送成功,所以在消息接收到之前消費者的連接或者channel關閉,那麼這個消息就會丟失。其特點是消費者可以傳遞過載的消息,對傳遞的消息沒有限制,但如果因內存耗盡消費者線程被系統殺死,就會使得多條消息丟失。所以這個模式需要在數據安全性和吞吐量之間選擇,適合使用在消費者可以高效並以某種速率能夠處理這些消息的情況下使用。
所以自動應答的方式局限性很高。
2.2 手動應答
優點:可以批量應答和減少網絡擁擠。
1.channel.basicAck(long deliveryTag, boolean multiple);
:肯應應答,處理完消息之後提醒RabbitMQ可以刪除當前隊列,deliveryTag:當前隊列中選中的消息;multiple:是否批量應答。
2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:否定應答,
3.channel.basicReject(long deliveryTag, boolean requeue)
:否定並且拒絕應答。
2.3 消息自動重新入隊
如果消費者因為一些原因失去瞭對RabbitMQ的連接,導致沒有發送ACK確認,RabbitMQ就會對該消息進行重新排隊,並且分發給可以處理該消息的消費者,所以即使某個消費者死亡,也可以保證消息不會丟失。
2.4 手動應答測試
測試目的:在手動應答狀態下不會發生消息丟失的情況。
測試方法:
1.創建兩個消費者;
2.使用工具類使線程睡眠一定時間;
3.在睡眠時關閉線程,看能否自動重新入隊。
2.4.1 生產者代碼
/** * @Description 手動應答生產者 * @date 2022/3/5 19:03 */ public class Producer1 { // 指定隊列名 private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("發送消息:'" + msg + "'成功"); } } }
2.4.2 消費者代碼
/** * @Description 手動應答消費者1 * @date 2022/3/5 19:17 */ public class Worker1 { private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); System.out.println("線程A等待接收......"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模擬並發沉睡一秒 try { Thread.sleep(1000); System.out.println("線程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); /** * basicAck: * 1. 消息標記 * 2. 是否批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消費者取消消費"); }); } }
Worker2類和1區別不大,將名稱改成B再將睡眠事件改成30即可。
2.4.3 測試
測試方法:
1.先啟動生產者創建隊列;
2.啟動兩個消費者接收消息;
3.因為是輪詢方式,所以A線程接收之後肯定是B線程接收,在睡眠時關閉B線程,如果A線程接收到說明測試成功。
發送消息:
線程A接收:
再發送消息:
關閉線程B線程A接收到消息:
測試成功!
總結
本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!
推薦閱讀:
- Java RabbitMQ的持久化和發佈確認詳解
- RabbitMQ冪等性與優先級及惰性詳細全面講解
- SkyWalking 自定義插件(Spring RabbitMQ)具體分析過程
- RabbitMQ 延遲隊列實現訂單支付結果異步階梯性通知(實例代碼)
- rabbitmq中routingkey的作用說明