Java處理延時任務的常用幾種解決方案

前言

項目中經常會遇到如下的需求:

  • 創建訂單30分鐘未支付,訂單自動取消。
  • 訂單支付成功後,1分鐘後給用戶發送短信,提醒用戶評價。

針對延時任務需求,我們可以采用如下的解決方案:

數據庫輪詢

原理

通過一個線程定時的掃描數據庫當天創建的訂單,根據訂單的創建時間來判斷訂單是否超時,針對超時訂單進行相關的更新操作。
實現技術
采用Spring Boot結合quartz來實現,具體的實現可以參考之前的文章。

優缺點

優點:

此方案比較簡單,且quartz也支持集群操作。

缺點:

  • 系統訂單數據量比較大,每個幾分鐘輪詢數據庫,對服務器和數據庫的內存消耗比較大。
  • 存在延遲,即使1分鐘掃描一次數據庫,也會存在1分鐘的延遲。

Java延遲隊列

原理

采用JDK自帶的DelayQueue來實現,這是一個無界阻塞隊列,該隊列隻有在延遲期滿的時候才能從中獲取元素,放入DelayQueue中的對象。
實現技術
使用JDK的DelayQueue隊列進行相關操作即可。

優缺點

優點:

此方案是基於內存操作所以效率高,任務觸發時間延遲低.

缺點:

  • 消息隊列的信息都存放在內存中,一旦服務器重啟,則數據全部消失
  • 無法進行集群用擴展
  • 由於本機內存有限,一旦訂單數據量過大,很容易出現OOM異常。

Reids監聽失效key

原理

該方案使用Redis的Keyspace Notifications,利用key失效的提供的回調機制,處理相關的業務實現

實現技術

基於reids的方案,實現MessageListener接口。

實現步驟

修改Redis配置文件
打開redis.conf 文件,搜索 “notify-keyspace-events”找到原本的notify-keyspace-events " ",修改為 “notify-keyspace-events Ex”,至此Redis 就支持Key過期事件的監聽。

創建監聽類,實現MessageListener接口

@Component
public class RedisKeyExpirationListener implements MessageListener
{
    private static final Logger logger = LoggerFactory.getLogger(RedisKeyExpirationListener.class);
    
    public static final String KEY_PREX = "test::order:queue";
    
    @Override
    public void onMessage(Message message, byte[] pattern)
    {
        try
        {
            String expiredKey = message.toString();

            // 通過key來判斷
            if(!expiredKey.contains(KEY_PREX))
            {
                return;
            }
            
            //滿足條件處理具體的業務邏輯
        }
        catch (Exception e)
        {
            logger.error("失效事件失敗",e);
        }
    }
}

優缺點

優點:

基於Redis實現簡單

缺點:

  • 客戶端斷開後重連會導致所有事件丟失。
  • 高並發場景下,存在大量的失效key場景會導出失效時間存在延遲。
  • 此方案針對業務量較少且可靠性要求不高的場景使用。

RocketMq延遲消息

實現原理

基於RocketMQ設置消息的等級,發送延遲消息,RocketMQ延時消息會暫存在名為SCHEDULE_TOPIC_XXXX的Topic中,並根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue隻存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
其具體步驟如下:

  • 修改消息Topic名稱和隊列信息
  • 轉發消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中
  • 延遲服務消費SCHEDULE_TOPIC_XXXX消息
  • 將信息重新存儲到CommitLog中
  • 將消息投遞到目標Topic中
  • 消費者消費目標topic中的數據。
/**
    * 發送延遲消息
    * @param topic
    * @param msg
    */
   public void sendDelayMessage(String topic,Object msg)
   {
      Message msgMessage =new Message();
      //設置消息等級
      msgMessage.setDelayTimeLevel(2);
     rocketMQTemplate.convertAndSend(topic, msg);
   }

註意:RocketMQ延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的。默認支持18個等級的延遲消息,延時等級定義在RocketMQ服務端的MessageStoreConfig類中的如下變量中:

例如指定的延時等級為2,則表示延遲時長為5s,即延遲等級是從1開始計數的。

優缺點

優點:

支持高並發場景消息處理.

缺點:

  • 引入額外的消息隊列,增加項目的維護和復雜度。
  • 支持固定時長的消息延遲,針對任意時長的消息延遲需要進行擴展。

總結

本文講解瞭針對延時任務的處理的幾種方案和相關的優缺點,針對不同的業務場景,選擇合適的解決方案。更多相關Java 延時任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: