Springboot整合RabbitMq測試TTL的方法詳解
什麼是TTL?
在RabbitMq
中,存在一種高級特性
TTL
。
TTL
即Time To Live
的縮寫,含義為存活時間
或者過期時間
。即:
設定消息在隊列中存活的時間。
當指定時間內
,消息依舊未被消費
,則由隊列自動將其刪除。
如何設置TTL?
既然涉及到設定消息的存活時間
,在RabbitMq
中,存在兩種設置方式:
- 設置整個隊列的過期時間。
- 設置單個消息的過期時間。
設定整個隊列的過期時間
按照上一篇文章的依賴導入和配置編寫方式進行。
Springboot——整合Rabbitmq之Confirm和Return詳解
配置類編寫
在原有基礎之上,新創建幾個配置的bean
類,申明bean對象,並進行交換機
與隊列
的關聯,如下所示:、
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfiguration { // =========================== Direct 直連模式 ================================== //隊列名稱 public static final String QUEUQ_NAME = "xiangjiao.queue"; //交換器名稱 public static final String EXCHANGE = "xiangjiao.exchange"; //路由key public static final String ROUTING_KEY = "xiangjiao.routingKey"; // =========================== Direct 普通隊列申明 和 交換機綁定 =================== //創建隊列 @Bean(value = "getQueue") public Queue getQueue(){ //QueueBuilder.durable(QUEUQ_NAME).build(); return new Queue(QUEUQ_NAME); } //實例化交換機 @Bean(value = "getDirectExchange") public DirectExchange getDirectExchange(){ //DirectExchange(String name, boolean durable, boolean autoDelete) /** * 參數一:交換機名稱;<br> * 參數二:是否永久;<br> * 參數三:是否自動刪除;<br> */ //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); return new DirectExchange(EXCHANGE, true, false); //綁定消息隊列和交換機 @Bean public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange, @Qualifier(value = "getQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); // =========================== TTL ================================ public static final String ttl_queue_name = "xiangjiao.ttl.queue"; public static final String ttl_exchange_name = "xiangjiao.ttl.exchange"; public static final String ttl_routing_key = "xiangjiao.ttl.routingKey"; @Bean(value = "getTtlQueue") public Queue getTtlQueue(){ // 設置 ttl 隊列,並設定 x-message-ttl 參數,表示 消息存活最大時間,單位 ms //return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build(); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl",10000); return new Queue(ttl_queue_name,true,false,false,arguments); @Bean(value = "getTTlExchange") public DirectExchange getTTlExchange(){ // 設置交換機屬性,並保證交換機持久化 return new DirectExchange(ttl_exchange_name, true, false); public Binding bindExchangeAndQueueTTL(@Qualifier(value = "getTTlExchange") DirectExchange getTTlExchange, @Qualifier(value = "getTtlQueue") Queue queue){ return BindingBuilder.bind(queue).to(getTTlExchange).with(ttl_routing_key); }
對比原有的配置類,不難發現區別:
對
隊列
設置過期屬性
,隻需要傳遞一個x-message-ttl
的屬性值即可。(單位:ms)
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl",10000); return new Queue(ttl_queue_name,true,false,false,arguments);
然後定義交換機類型,並將指定的交換機和隊列進行綁定。
為瞭測試效果,暫未定義任何該隊列的消費者信息。
測試
為瞭便於測試,需要定義一個接口,生產新的數據信息,並將數據向對應的Exchange
中傳遞。
/** * 發送消息,指定ttl參數信息(隊列) * @return */ @RequestMapping("/sendQueueTtl") @ResponseBody public String sendQueueTtl(){ //發送10條消息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("發送消息 msg:"+msg); rabbitmqService.sendMessage(MQConfiguration.ttl_exchange_name,MQConfiguration.ttl_routing_key,msg); //每兩秒發送一次 try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; }
兩條消息之間的過期時間為
8s
。
請求鏈接進行測試,查看Rabbitmq web
視圖信息:
http://localhost/sendQueueTtl
查看控制臺輸出日志:
消息正常發送到瞭Exchange
,同時Exchange 也將消息推送到瞭指定的隊列 !
設置有
Confirm
和Return
監聽。
【說明:】
給隊列設定時間後,單位時間內的消息如果未被消費,則隊列會將其中的數據進行刪除處理。
對單個消息設定過期時間
上面的操作和測試,已經驗證對隊列
設定過期時間
,會導致所有的消息過期時間都是一樣
的現象。
但實際開發中,可能一個隊列
需要存放不同過期時間
的消息信息,如果需要進行實現,就不能再設定隊列的過期時間信息瞭,需要采取下面要說到的針對單個消息,設置不同過期時間
。
配置
既然是針對單個消息
設定不同的過期時間
操作,則需要去掉隊列過期
設置。
為瞭測試的簡單化,此處采取直連 Direct 交換機
類型,進行交換機和隊列數據的綁定方式。如下所示:
// =========================== Direct 直連模式 ================================== //隊列名稱 public static final String QUEUQ_NAME = "xiangjiao.queue"; //交換器名稱 public static final String EXCHANGE = "xiangjiao.exchange"; //路由key public static final String ROUTING_KEY = "xiangjiao.routingKey"; // =========================== Direct 普通隊列申明 和 交換機綁定 =================== //創建隊列 @Bean(value = "getQueue") public Queue getQueue(){ //QueueBuilder.durable(QUEUQ_NAME).build(); return new Queue(QUEUQ_NAME); } //實例化交換機 @Bean(value = "getDirectExchange") public DirectExchange getDirectExchange(){ //DirectExchange(String name, boolean durable, boolean autoDelete) /** * 參數一:交換機名稱;<br> * 參數二:是否永久;<br> * 參數三:是否自動刪除;<br> */ //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); return new DirectExchange(EXCHANGE, true, false); } //綁定消息隊列和交換機 @Bean public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange, @Qualifier(value = "getQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
對於消息的發送,依舊沿用之前寫的發送處理方式
設定
confirm
和return
監聽,保證消息能夠正常到達指定的隊列中。
測試
編寫一個測試的接口,設定單個消息的過期時間屬性
,保證不同消息具備不同的過期時間
。
在之前博客中,針對消息的持久化
設置,需要保證消息向隊列設定屬性時,傳遞一個deliveryMode
參數值信息。
同理,設定每個消息的過期時間,也需要設定對應的屬性信息。如下所示:
/** * 發送消息,指定ttl參數信息(單個消息); * 測試需要將消息消費者關閉監聽 * @return */ @RequestMapping("/sendTtl") @ResponseBody public String sendTtl(){ //發送10條消息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("發送消息 msg:"+msg); MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); // 針對消息設定時限 // 將消息數據和設置屬性進行封裝,采取消息發送模板,將消息數據推送至指定的交換機 exchange 中 Message message = new Message(msg.getBytes(), messageProperties); rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY,message); //每兩秒發送一次 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; }
上面代碼的編寫核心為將消息內容體和消息對象屬性進行封裝
。
MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); // 針對消息設定時限 // 將消息數據和設置屬性進行封裝,采取消息發送模板,將消息數據推送至指定的交換機 exchange 中 Message message = new Message(msg.getBytes(), messageProperties);
引申一點:消息的持久化
Springboot 2.x ——RabbitTemplate為什麼會默認消息持久化?
請求連接進行測試:
http://localhost/sendTtl
查看控制臺打印日志情況:
總結
1、設置隊列過期
時間使用參數:x-message-ttl
,單位:ms
(毫秒),會對整個隊列消息統一過期
。
2、設置消息過期
時間使用參數:expiration
。單位:ms
(毫秒),當該消息在隊列頭部
時(消費時),會單獨判斷
這一消息是否過期
。
3、如果兩者都進行瞭設置,以時間短的為準。
代碼下載
gitee 代碼下載
到此這篇關於Springboot整合RabbitMq測試TTL的文章就介紹到這瞭,更多相關Springboot整合RabbitMq內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- RabbitMQ 實現延遲隊列的兩種方式詳解
- Springboot整合Rabbitmq之Confirm和Return機制
- 聊聊RabbitMQ發佈確認高級問題
- SpringBoot整合RabbitMQ實戰教程附死信交換機
- SpringBoot整合RabbitMQ的5種模式實戰