redis實現延時隊列的兩種方式(小結)
背景
項目中的流程監控,有幾種節點,需要監控每一個節點是否超時。按傳統的做法,肯定是通過定時任務,去掃描然後判斷,但是定時任務有缺點:1,數據量大會慢;2,時間不好控制,太短,怕一次處理不完,太長狀態就會有延遲。所以就想到用延遲隊列的方式去實現。
一,redis的過期key監控
1,開啟過期key監聽
在redis的配置裡把這個註釋去掉
notify-keyspace-events Ex
然後重啟redis
2,使用redis過期監聽實現延遲隊列
繼承KeyExpirationEventMessageListener類,實現父類的方法,就可以監聽key過期時間瞭。當有key過期,就會執行這裡。這裡就把需要的key過濾出來,然後發送給kafka隊列。
@Component @Slf4j public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { @Autowired private KafkaProducerService kafkaProducerService; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 針對 redis 數據失效事件,進行數據處理 * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern){ if(message == null || StringUtils.isEmpty(message.toString())){ return; } String content = message.toString(); //key的格式為 flag:時效類型:運單號 示例如下 try { if(content.startsWith(AbnConstant.EMS)){ kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content); }else if(content.startsWith(AbnConstant.YUNDA)){ kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content); } } catch (Exception e) { log.error("監控過期key,發送kafka異常,",e); } } }
可以看的出來,這種方式其實是很簡單的,但是有幾個問題需要註意,一是,這個盡量單機運行,因為多臺機器都會執行,浪費cpu,增加數據庫負擔。二是,機器頻繁部署的時候,如果有時間間隔,會出現數據的漏處理。
二,redis的zset實現延遲隊列
1,生產者實現
可以看到生產者很簡單,其實就是利用zset的特性,給一個zset添加元素而已,而時間就是它的score。
public void produce(Integer taskId, long exeTime) { System.out.println("加入任務, taskId: " + taskId + ", exeTime: " + exeTime + ", 當前時間:" + LocalDateTime.now()); RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId)); }
2,消費者實現
消費者的代碼也不難,就是把已經過期的zset中的元素給刪除掉,然後處理數據。
public void consumer() { Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { while (true) { Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1); if (taskIdSet == null || taskIdSet.isEmpty()) { System.out.println("沒有任務"); } else { taskIdSet.forEach(id -> { long result = RedisOps.getJedis().zrem(RedisOps.key, id); if (result == 1L) { System.out.println("從延時隊列中獲取到任務,taskId:" + id + " , 當前時間:" + LocalDateTime.now()); } }); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }); }
可以看到這種方式其實是比上個方式要好的。因為,他的那兩個缺點都被克服掉瞭。多臺機器也沒事兒,也不用再擔心部署時間間隔長的問題。
總結
兩個方式都是不錯的,都能解決問題。碰到問題,多思考,多總結。
到此這篇關於redis實現延時隊列的兩種方式(小結)的文章就介紹到這瞭,更多相關redis 延時隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- None Found