Java使用延時隊列搞定超時訂單處理的場景

1、延時隊列使用場景:

那麼什麼時候需要用延時隊列呢?常見的延時任務場景 舉栗子:

  1. 訂單在30分鐘之內未支付則自動取消。
  2. 重試機制實現,把調用失敗的接口放入一個固定延時的隊列,到期後再重試。
  3. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
  4. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
  5. 預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
  6. 關閉空閑連接,服務器中,有很多客戶端的連接,空閑一段時間之後需要關閉之。
  7. 清理過期數據業務。比如緩存中的對象,超過瞭空閑時間,需要從緩存中移出。

解決方案也非常多:

  • 定期輪詢(數據庫等)
  • JDK DelayQueue
  • JDK Timer
  • ScheduledExecutorService 周期性線程池
  • 時間輪(kafka)
  • 時間輪(Netty的HashedWheelTimer)
  • Redis有序集合(zset)
  • zookeeper之curator
  • RabbitMQ
  • Quartz,xxljob等定時任務框架
  • Koala(考拉)
  • JCronTab(仿crontab的java調度器)

SchedulerX(阿裡)

對於單機服務優選DelayQueue,對於分佈式環境,可以使用mq、zk、redis之類的。接下來,介紹DelayQueue的使用。

一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed

2、示例:

實戰以訂單下單後三十分鐘內未支付則自動取消 為業務場景,該場景的代碼邏輯分析如下:

  1. 下單後將訂單直接放入未支付的延時隊列中
  2. 如果超時未支付,則從隊列中取出,進行修改為取消狀態的訂單
  3. 如果支付瞭,則不去進行取消,或者取消的時候做個狀態篩選,即可避免更新
  4. 或者支付完成後,做個主動出隊

還有就是用戶主動取消訂單,也做個主動出隊

1)先來寫個通用的​​Delayed​​ :

import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /**默認延遲30分鐘*/
    private final static long DELAY = 30 * 60 * 1000L;
    /**數據id*/
    private Long dataId;
    /**開始時間*/
    private long startTime;
    /**到期時間*/
    private long expire;
    /**創建時間*/
    private Date now;
    /**泛型data*/
    private T data;
    
    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}

2)再寫個通用的接口,用於規范和方便統一實現 這樣任何類型的訂單都可以實現這個接口 進行延時任務的處理:

public interface DelayOrder<T> {

    /**
     * 添加延遲對象到延時隊列
     *
     * @param itemDelayed 延遲對象
     * @return boolean
     */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /**
     * 根據對象添加到指定延時隊列
     *
     * @param data 數據對象
     * @return boolean
     */
    boolean addToDelayQueue(T data);

    /**
     * 移除指定的延遲對象從延時隊列中
     *
     * @param data
     */
    void removeToOrderDelayQueue(T data);
}

具體業務邏輯實現:

@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /**
     * 初始化時加載數據庫中需處理超時的訂單
     * 系統啟動:掃描數據庫中未支付(要在更新時:加上已支付就不用更新瞭),未過期的的訂單
     */
    @PostConstruct
    public void init() {
        log.info("系統啟動:掃描數據庫中未支付,未過期的的訂單");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("系統啟動:掃描數據庫中未支付的訂單,總共掃描瞭" + orderList.size() + "個訂單,推入檢查隊列,準備到期檢查...");

        /*啟動一個線程,去取延遲訂單*/
        delayOrderExecutor.execute(() -> {
            log.info("啟動處理的訂單線程:" + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //處理超時訂單
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (Exception e) {
                    log.error("執行自營超時訂單的_延遲隊列_異常:" + e);
                }
            }
        });
    }

    /**
     * 加入延遲消息隊列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 加入延遲消息隊列
     **/
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 從延遲隊列中移除 主動取消就主動從隊列中取出
     **/
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }
}

分析:

  1. delayOrderExecutor是註入的一個專門處理出隊的一個線程
  2. @PostConstruct是啥呢,是在容器啟動後隻進行一次初始化動作的一個註解,相當實用
  3. 啟動後呢,我們去數據庫掃描一遍,防止有漏網之魚,因為單機版嗎,隊列的數據是在內存中的,重啟後肯定原先的數據會丟失,所以為保證服務質量,我們可能會錄音…..所以為保證重啟後數據的恢復,我們需要重新掃描數據庫把未支付的數據重新裝載到內存的隊列中
  4. 接下來就是用這個線程去一直不停的訪問隊列的take()方法,當隊列無數據就一直阻塞,或者數據沒到期繼續阻塞著,直到到期出隊,然後獲取訂單的信息,去處理訂單的更新操作

到此這篇關於Java使用延時隊列搞定超時訂單處理的文章就介紹到這瞭,更多相關java延時隊列超時訂單處理內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: