基於rabbitmq延遲插件實現分佈式延遲任務
之前給大傢介紹過SpringBoot集成Redisson實現延遲隊列的場景分析,今天介紹下基於rabbitmq延遲插件rabbitmq_delayed_message_exchange實現延遲任務。
一、延遲任務的使用場景
1、下單成功,30分鐘未支付。支付超時,自動取消訂單
2、訂單簽收,簽收後7天未進行評價。訂單超時未評價,系統默認好評
3、下單成功,商傢5分鐘未接單,訂單取消
4、配送超時,推送短信提醒
5、三天會員試用期,三天到期後準時準點通知用戶,試用產品到期瞭
……
對於延時比較長的場景、實時性不高的場景,我們可以采用任務調度的方式定時輪詢處理。如:xxl-job。
今天我們講解延遲隊列的實現方式,而延遲隊列有很多種實現方式,普遍會采用如下等方式,如:
1.如基於RabbitMQ的隊列ttl+死信路由策略:通過設置一個隊列的超時未消費時間,配合死信路由策略,到達時間未消費後,回會將此消息路由到指定隊列
2.基於RabbitMQ延遲隊列插件(rabbitmq-delayed-message-exchange):發送消息時通過在請求頭添加延時參數(headers.put("x-delay",5000))即可達到延遲隊列的效果。(順便說一句阿裡雲的收費版rabbitMQ當前可支持一天以內的延遲消息),局限性:目前該插件的當前設計並不真正適合包含大量延遲消息(例如數十萬或數百萬)的場景,詳情參見#/issues/72另外該插件的一個可變性來源是依賴於 Erlang 計時器,在系統中使用瞭一定數量的長時間計時器之後,它們開始爭用調度程序資源。
3.使用redis的zset有序性,輪詢zset中的每個元素,到點後將內容遷移至待消費的隊列,(redisson已有實現)
4.使用redis的key的過期通知策略,設置一個key的過期時間為延遲時間,過期後通知客戶端(此方式依賴redis過期檢查機制key多後延遲會比較嚴重;Redis的pubsub不會被持久化,服務器宕機就會被丟棄)。
二、組件安裝
安裝rabbitMQ需要依賴erlang語言環境,所以需要我們下載erlang的環境安裝程序。網上有很多安裝教程,這裡不再貼圖累述,需要註意的是:該延遲插件支持的版本匹配。
插件Git官方地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
當你成功安裝好插件後運行起rabbitmq管理後臺,在新建exchange裡就可以看到type類型中多出瞭這個選項
三、RabbitMQ延遲隊列插件的延遲隊列實現
1、基本原理
通過 x-delayed-message 聲明的交換機,它的消息在發佈之後不會立即進入隊列,先將消息保存至 Mnesia(一個分佈式數據庫管理系統,適合於電信和其它需要持續運行和具備軟實時特性的 Erlang 應用。目前資料介紹的不是很多)
這個插件將會嘗試確認消息是否過期,首先要確保消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設置的范圍為 (2^32)-1 毫秒),如果消息過期通過 x-delayed-type 類型標記的交換機投遞至目標隊列,整個消息的投遞過程也就完成瞭。
2、核心組件開發走起
引入maven依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml簡單配置
rabbitmq: host: localhost port: 5672 virtual-host: /
RabbitMqConfig配置文件
package com.example.code.bot_monomer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author: shf description: date: 2022/1/5 15:00 */ @Configuration public class RabbitMQConfig { /** * 普通 */ public static final String EXCHANGE_NAME = "test_exchange"; public static final String QUEUE_NAME = "test001_queue"; public static final String NEW_QUEUE_NAME = "test002_queue"; /** * 延遲 */ public static final String DELAY_EXCHANGE_NAME = "delay_exchange"; public static final String DELAY_QUEUE_NAME = "delay001_queue"; public static final String DELAY_QUEUE_ROUT_KEY = "key001_delay"; //由於阿裡rabbitmq增加隊列要額外收費,現改為各業務延遲任務共同使用一個queue:delay001_queue //public static final String NEW_DELAY_QUEUE_NAME = "delay002_queue"; @Bean public CustomExchange delayMessageExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); //自定義交換機 return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Queue delayMessageQueue() { return new Queue(DELAY_QUEUE_NAME, true, false, false); } @Bean public Binding bindingDelayExchangeAndQueue(Queue delayMessageQueue, Exchange delayMessageExchange) { return new Binding(DELAY_QUEUE_NAME, Binding.DestinationType.QUEUE, DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUT_KEY, null); //return BindingBuilder.bind(delayMessageQueue).to(delayMessageExchange).with("key001_delay").noargs(); } /** * 交換機 */ @Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); //return new TopicExchange(EXCHANGE_NAME, true, false); } /** * 隊列 */ @Bean public Queue orderQueue() { //return QueueBuilder.durable(QUEUE_NAME).build(); return new Queue(QUEUE_NAME, true, false, false, null); } /** * 隊列 */ @Bean public Queue orderQueue1() { //return QueueBuilder.durable(NEW_QUEUE_NAME).build(); return new Queue(NEW_QUEUE_NAME, true, false, false, null); } /** * 交換機和隊列綁定關系 */ @Bean public Binding orderBinding(Queue orderQueue, Exchange orderExchange) { //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs(); return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null); } /** * 交換機和隊列綁定關系 */ @Bean public Binding orderBinding1(Queue orderQueue1, Exchange orderExchange) { //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs(); return new Binding(NEW_QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null); } }
MqDelayQueueEnum枚舉類
package com.example.code.bot_monomer.enums; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * @author: shf description: 延遲隊列業務枚舉類 * date: 2021/8/27 14:03 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum MqDelayQueueEnum { /** * 業務0001 */ YW0001("yw0001", "測試0001", "yw0001"), /** * 業務0002 */ YW0002("yw0002", "測試0002", "yw0002"); /** * 延遲隊列業務區分唯一Key */ private String code; /** * 中文描述 */ private String name; /** * 延遲隊列具體業務實現的 Bean 可通過 Spring 的上下文獲取 */ private String beanId; public static String getBeanIdByCode(String code) { for (MqDelayQueueEnum queueEnum : MqDelayQueueEnum.values()) { if (queueEnum.code.equals(code)) { return queueEnum.beanId; } } return null; } }
模板接口處理類:MqDelayQueueHandle
package com.example.code.bot_monomer.service.mqDelayQueue; /** * @author: shf description: RabbitMQ延遲隊列方案處理接口 * date: 2022/1/10 10:46 */ public interface MqDelayQueueHandle<T> { void execute(T t); }
具體業務實現處理類
@Slf4j @Component("yw0001") public class MqTaskHandle01 implements MqDelayQueueHandle<String> { @Override public void execute(String s) { log.info("MqTaskHandle01.param=[{}]",s); //TODO } }
註意:@Component("yw0001") 要和業務枚舉類MqDelayQueueEnum中對應的beanId保持一致。
統一消息體封裝類
/** * @author: shf description: date: 2022/1/10 10:51 */ @Data @NoArgsConstructor @AllArgsConstructor @Builder public class MqDelayMsg<T> { /** * 業務區分唯一key */ @NonNull String businessCode; /** * 消息內容 */ @NonNull T content; }
統一消費分發處理Consumer
package com.example.code.bot_monomer.service.mqConsumer; import com.alibaba.fastjson.JSONObject; import com.example.code.bot_monomer.config.common.MqDelayMsg; import com.example.code.bot_monomer.enums.MqDelayQueueEnum; import com.example.code.bot_monomer.service.mqDelayQueue.MqDelayQueueHandle; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; /** * @author: shf description: date: 2022/1/5 15:12 */ @Slf4j @Component //@RabbitListener(queues = "test001_queue") @RabbitListener(queues = "delay001_queue") public class TestConsumer { @Autowired ApplicationContext context; /** * RabbitHandler 會自動匹配 消息類型(消息自動確認) * * @param msgStr * @param message */ @RabbitHandler public void taskHandle(String msgStr, Message message) { try { MqDelayMsg msg = JSONObject.parseObject(msgStr, MqDelayMsg.class); log.info("TestConsumer.taskHandle:businessCode=[{}],deliveryTag=[{}]", msg.getBusinessCode(), message.getMessageProperties().getDeliveryTag()); String beanId = MqDelayQueueEnum.getBeanIdByCode(msg.getBusinessCode()); if (StringUtils.isNotBlank(beanId)) { MqDelayQueueHandle<Object> handle = (MqDelayQueueHandle<Object>) context.getBean(beanId); handle.execute(msg.getContent()); } else { log.warn("TestConsumer.taskHandle:MQ延遲任務不存在的beanId,businessCode=[{}]", msg.getBusinessCode()); } } catch (Exception e) { log.error("TestConsumer.taskHandle:MQ延遲任務Handle異常:", e); } } }
最後簡單封裝個工具類
package com.example.code.bot_monomer.utils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.example.code.bot_monomer.config.RabbitMQConfig; import com.example.code.bot_monomer.config.common.MqDelayMsg; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.lang.NonNull; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.Objects; import lombok.extern.slf4j.Slf4j; /** * @author: shf description: MQ分佈式延遲隊列工具類 date: 2022/1/10 15:20 */ @Slf4j @Component public class MqDelayQueueUtil { @Autowired private RabbitTemplate template; @Value("${mqdelaytask.limit.days:2}") private Integer mqDelayLimitDays; /** * 添加延遲任務 * * @param bindId 業務綁定ID,用於關聯具體消息 * @param businessCode 業務區分唯一標識 * @param content 消息內容 * @param delayTime 設置的延遲時間 單位毫秒 * @return 成功true;失敗false */ public boolean addDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) { log.info("MqDelayQueueUtil.addDelayQueueTask:bindId={},businessCode={},delayTime={},content={}", bindId, businessCode, delayTime, JSON.toJSONString(content)); if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) { return false; } try { //TODO 延時時間大於2天的先加入數據庫表記錄,後由定時任務每天拉取2次將低於2天的延遲記錄放入MQ中等待到期執行 if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) { //TODO } else { this.template.convertAndSend( RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUT_KEY, JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()), message -> { //註意這裡時間可使用long類型,毫秒單位,設置header message.getMessageProperties().setHeader("x-delay", delayTime); return message; } ); } } catch (Exception e) { log.error("MqDelayQueueUtil.addDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e); return false; } return true; } /** * 撤銷延遲消息 * @param bindId 業務綁定ID,用於關聯具體消息 * @param businessCode 業務區分唯一標識 * @return 成功true;失敗false */ public boolean cancelDelayQueueTask(@NonNull String bindId, @NonNull String businessCode) { if (StringUtils.isAnyBlank(bindId,businessCode)) { return false; } try { //TODO 查詢DB,如果消息還存在即可刪除 } catch (Exception e) { log.error("MqDelayQueueUtil.cancelDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e); return false; } return true; } /** * 修改延遲消息 * @param bindId 業務綁定ID,用於關聯具體消息 * @param businessCode 業務區分唯一標識 * @param content 消息內容 * @param delayTime 設置的延遲時間 單位毫秒 * @return 成功true;失敗false */ public boolean updateDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) { if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) { return false; } try { //TODO 查詢DB,消息不存在返回false,存在判斷延遲時長入庫或入mq //TODO 延時時間大於2天的先加入數據庫表記錄,後由定時任務每天拉取2次將低於2天的延遲記錄放入MQ中等待到期執行 if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) { //TODO } else { this.template.convertAndSend( RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUT_KEY, JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()), message -> { //註意這裡時間可使用long類型,毫秒單位,設置header message.getMessageProperties().setHeader("x-delay", delayTime); return message; } ); } } catch (Exception e) { log.error("MqDelayQueueUtil.updateDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e); return false; } return true; } }
附上測試類:
/** * description: 延遲隊列測試 * * @author: shf date: 2021/8/27 14:18 */ @RestController @RequestMapping("/mq") @Slf4j public class MqQueueController { @Autowired private MqDelayQueueUtil mqDelayUtil; @PostMapping("/addQueue") public String addQueue() { mqDelayUtil.addDelayQueueTask("00001",MqDelayQueueEnum.YW0001.getCode(),"delay0001測試",3000L); return "SUCCESS"; } }
貼下DB記錄表的字段設置
配合xxl-job定時任務即可。
由於投遞後的消息無法修改,設置延遲消息需謹慎!並需要與業務方配合,如:延遲時間在2天以內(該時間天數可調整,你也可以設置閾值單位為小時,看業務需求)的消息不支持修改與撤銷。2天之外的延遲消息支持撤銷與修改,需要註意的是,需要綁定關聯具體操作業務唯一標識ID以對應關聯操作撤銷或修改。(PS:延遲時間設置在2天以外的會先保存到DB記錄表,由定時任務每天拉取到時2天內的投放到延遲對列)。
再穩妥點,為瞭防止進入DB記錄的消息有操作時間誤差導致的不一致問題,可在消費統一Consumer消費分發前,查詢DB記錄表,該消息是否已被撤銷刪除(增加個刪除標記字段記錄),並且當前時間大於等於DB表中記錄的到期執行時間才能分發出去執行,否則棄用。
此外,利用rabbitmq的死信隊列機制也可以實現延遲任務,有時間再附上實現案例。
到此這篇關於基於rabbitmq延遲插件實現分佈式延遲任務的文章就介紹到這瞭,更多相關rabbitmq分佈式延遲任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- RabbitMQ 實現延遲隊列的兩種方式詳解
- Springboot整合RabbitMq測試TTL的方法詳解
- Spring Boot+RabbitMQ 通過fanout模式實現消息接收功能(支持消費者多實例部署)
- PHP實現RabbitMQ消息列隊的示例代碼
- SpringBoot+RabbitMQ實現消息可靠傳輸詳解