Java實現Redis延時消息隊列
什麼是延時任務
延時任務,顧名思義,就是延遲一段時間後才執行的任務。舉個例子,假設我們有個發佈資訊的功能,運營需要在每天早上7點準時發佈資訊,但是早上7點大傢都還沒上班,這個時候就可以使用延時任務來實現資訊的延時發佈瞭。隻要在前一天下班前指定第二天要發送資訊的時間,到瞭第二天指定的時間點資訊就能準時發出去瞭。如果大傢有運營過公眾號,就會知道公眾號後臺也有文章定時發送的功能。總而言之,延時任務的使用還是很廣泛的。
延時任務的特點
- 時間有序性
- 時間具體性
- 任務中攜帶詳細的信息 ,通常包括 任務ID, 任務的類型 ,時間點。
實現思路:
將整個Redis當做消息池,以kv形式存儲消息,key為id,value為具體的消息body
使用ZSET做優先隊列,按照score維持優先級(用當前時間+需要延時的時間作為score)
輪詢ZSET,拿出score比當前時間戳大的數據(已過期的)
根據id拿到消息池的具體消息進行消費
消費成功,刪除改隊列和消息
消費失敗,讓該消息重新回到隊列
代碼實現
1.消息模型
import lombok.Data; import lombok.experimental.Accessors; import javax.validation.constraints.NotNull; import java.io.Serializable; /** * Redis 消息隊列中的消息體 * @author shikanatsu */ @Data @Accessors(chain = true) public class RedisMessage implements Serializable { /** 消息隊列組 **/ private String group; /** * 消息id */ private String id; /** * 消息延遲/ 秒 */ @NotNull(message = "消息延時時間不能為空") private long delay; /** * 消息存活時間 單位:秒 */ @NotNull(message = "消息存活時間不能為空") private int ttl; /** * 消息體,對應業務內容 */ private Object body; /** * 創建時間,如果隻有優先級沒有延遲,可以設置創建時間為0 * 用來消除時間的影響 */ private long createTime; }
2.RedisMq 消息隊列實現類
package com.shixun.base.redisMq; import com.shixun.base.jedis.service.RedisService; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * Redis消息隊列 * * @author shikanatsu */ @Component public class RedisMq { /** * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL} * 的消息體body作為值存儲 */ public static final String MSG_POOL = "Message:Pool:"; /** * zset隊列 名稱 queue */ public static final String QUEUE_NAME = "Message:Queue:"; // private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) { if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true; } return false; } /** * 從消息池中刪除消息 * * @param id * @return */ public void deMsgPool(String group, String id) { redisService.remove(MSG_POOL + group + id); } /** * 向隊列中添加消息 * * @param key * @param score 優先級 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) { redisService.zsset(key, val, score); } /** * 從隊列刪除消息 * * @param id * @return */ public boolean deMessage(String key, String id) { return redisService.zdel(key, id); } }
3.消息生產者
import cn.hutool.core.convert.Convert; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.IdUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 消息生產者 * * @author shikanatsu */ @Component public class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public boolean sendMessage(@Validated RedisMessage message) { Assert.notNull(message); //The priority is if there is no creation time // message.setCreateTime(System.currentTimeMillis()); message.setId(IdUtil.fastUUID()); Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS); try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info("RedisMq發送消費信息{},當前時間:{},消費時間預計{}",message.toString(),new Date(),sdf.format(delayTime)); }catch (Exception e){ e.printStackTrace(); logger.error("RedisMq 消息發送失敗,當前時間:{}",new Date()); return false; } return true; } }
4.消息消費者
/** * Redis消息消費者 * @author shikanatsu */ @Component public class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //@Scheduled(cron = "*/1 * * * * ? ") /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){ String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName(); //The query is currently expired Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis()); if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) { long score = redisService.getScore(queueName, id.toString()).longValue(); //Once again the guarantee has expired , And then perform the consumption if (current >= score) { String str = ""; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try { message = (RedisMessage)redisService.get(msgPool + id.toString()); log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis())); if(null==message){ return; } //Do something ; You can add a judgment here and if it fails you can add it to the queue again mqExecute.execute(message); } catch (Exception e) { e.printStackTrace(); //If an exception occurs, it is put back into the queue // todo: If repeated, this can lead to repeated cycles log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date()); provider.sendMessage(message); } finally { redisMq.deMessage(queueName, id.toString()); redisMq.deMsgPool(message.getGroup(),id.toString()); } } } } } }
5. 消息執接口
/** * @author shikanatsu */ public interface RedisMqExecute { /** * 獲取隊列名稱 * @return */ public String getQueueName(); /** * 統一的通過執行期執行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling(); }
6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求
/** * 訂單執行 * * @author shikanatsu */ @Service public class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = "orderPoll:"; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() { return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) { logger.info("Do orderMqPoll ; Time:{}",new Date()); //Do return true; } @Override /** 通過線程去執行輪詢的過程,時間上可以自由控制 **/ public void threadPolling() { ThreadUtil.execute(() -> { while (true) { redisMqConsumer.baseMonitor(mqExecute); ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); } }); } }
使用事例
1. 實現RedisMqExecute 接口 創建對應的輪詢或者采取定時器的方式執行 和實現具體的任務。
2. 通過MessageProvider 實現相對應的消息服務和綁定隊列組,通過隊列組的方式執行。
3. 提示: 采取線程的方式需要在項目啟動過程中執行,采取定時器或者調度的方式可以更加動態的調整。
到此這篇關於Java實現Redis延時消息隊列的文章就介紹到這瞭,更多相關Java Redis延時消息隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Redis 使用 List 實現消息隊列的優缺點
- Java工作中常見的並發問題處理方法總結
- Java處理延時任務的常用幾種解決方案
- redis實現隊列的阻塞、延時、發佈和訂閱
- Java利用Redis實現高並發計數器的示例代碼