Java使用延時隊列搞定超時訂單處理的場景
1、延時隊列使用場景:
那麼什麼時候需要用延時隊列呢?常見的延時任務場景 舉栗子:
- 訂單在30分鐘之內未支付則自動取消。
- 重試機制實現,把調用失敗的接口放入一個固定延時的隊列,到期後再重試。
- 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
- 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
- 預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
- 關閉空閑連接,服務器中,有很多客戶端的連接,空閑一段時間之後需要關閉之。
- 清理過期數據業務。比如緩存中的對象,超過瞭空閑時間,需要從緩存中移出。
解決方案也非常多:
- 定期輪詢(數據庫等)
- JDK DelayQueue
- JDK Timer
- ScheduledExecutorService 周期性線程池
- 時間輪(kafka)
- 時間輪(Netty的HashedWheelTimer)
- Redis有序集合(zset)
- zookeeper之curator
- RabbitMQ
- Quartz,xxljob等定時任務框架
- Koala(考拉)
- JCronTab(仿crontab的java調度器)
SchedulerX(阿裡)
對於單機服務優選DelayQueue,對於分佈式環境,可以使用mq、zk、redis之類的。接下來,介紹DelayQueue的使用。
一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed
2、示例:
實戰以訂單下單後三十分鐘內未支付則自動取消 為業務場景,該場景的代碼邏輯分析如下:
- 下單後將訂單直接放入未支付的延時隊列中
- 如果超時未支付,則從隊列中取出,進行修改為取消狀態的訂單
- 如果支付瞭,則不去進行取消,或者取消的時候做個狀態篩選,即可避免更新
- 或者支付完成後,做個主動出隊
還有就是用戶主動取消訂單,也做個主動出隊
1)先來寫個通用的Delayed
:
import lombok.Getter; import lombok.Setter; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @Setter @Getter public class ItemDelayed<T> implements Delayed { /**默認延遲30分鐘*/ private final static long DELAY = 30 * 60 * 1000L; /**數據id*/ private Long dataId; /**開始時間*/ private long startTime; /**到期時間*/ private long expire; /**創建時間*/ private Date now; /**泛型data*/ private T data; public ItemDelayed(Long dataId, long startTime, long secondsDelay) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + (secondsDelay * 1000); this.now = new Date(); } public ItemDelayed(Long dataId, long startTime) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + DELAY; this.now = new Date(); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }
2)再寫個通用的接口,用於規范和方便統一實現 這樣任何類型的訂單都可以實現這個接口 進行延時任務的處理:
public interface DelayOrder<T> { /** * 添加延遲對象到延時隊列 * * @param itemDelayed 延遲對象 * @return boolean */ boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed); /** * 根據對象添加到指定延時隊列 * * @param data 數據對象 * @return boolean */ boolean addToDelayQueue(T data); /** * 移除指定的延遲對象從延時隊列中 * * @param data */ void removeToOrderDelayQueue(T data); }
具體業務邏輯實現:
@Slf4j @Lazy(false) @Component public class DelayOwnOrderImpl implements DelayOrder<Order> { @Autowired private OrderService orderService; @Autowired private ExecutorService delayOrderExecutor; private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>(); /** * 初始化時加載數據庫中需處理超時的訂單 * 系統啟動:掃描數據庫中未支付(要在更新時:加上已支付就不用更新瞭),未過期的的訂單 */ @PostConstruct public void init() { log.info("系統啟動:掃描數據庫中未支付,未過期的的訂單"); List<Order> orderList = orderService.selectFutureOverTimeOrder(); for (Order order : orderList) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); this.addToOrderDelayQueue(orderDelayed); } log.info("系統啟動:掃描數據庫中未支付的訂單,總共掃描瞭" + orderList.size() + "個訂單,推入檢查隊列,準備到期檢查..."); /*啟動一個線程,去取延遲訂單*/ delayOrderExecutor.execute(() -> { log.info("啟動處理的訂單線程:" + Thread.currentThread().getName()); ItemDelayed<Order> orderDelayed; while (true) { try { orderDelayed = DELAY_QUEUE.take(); //處理超時訂單 orderService.updateCloseOverTimeOrder(orderDelayed.getDataId()); } catch (Exception e) { log.error("執行自營超時訂單的_延遲隊列_異常:" + e); } } }); } /** * 加入延遲消息隊列 **/ @Override public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) { return DELAY_QUEUE.add(orderDelayed); } /** * 加入延遲消息隊列 **/ @Override public boolean addToDelayQueue(Order order) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); return DELAY_QUEUE.add(orderDelayed); } /** * 從延遲隊列中移除 主動取消就主動從隊列中取出 **/ @Override public void removeToOrderDelayQueue(Order order) { if (order == null) { return; } for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayed<Order> queue = iterator.next(); if (queue.getDataId().equals(order.getId())) { DELAY_QUEUE.remove(queue); } } } }
分析:
- delayOrderExecutor是註入的一個專門處理出隊的一個線程
- @PostConstruct是啥呢,是在容器啟動後隻進行一次初始化動作的一個註解,相當實用
- 啟動後呢,我們去數據庫掃描一遍,防止有漏網之魚,因為單機版嗎,隊列的數據是在內存中的,重啟後肯定原先的數據會丟失,所以為保證服務質量,我們可能會錄音…..所以為保證重啟後數據的恢復,我們需要重新掃描數據庫把未支付的數據重新裝載到內存的隊列中
- 接下來就是用這個線程去一直不停的訪問隊列的take()方法,當隊列無數據就一直阻塞,或者數據沒到期繼續阻塞著,直到到期出隊,然後獲取訂單的信息,去處理訂單的更新操作
到此這篇關於Java使用延時隊列搞定超時訂單處理的文章就介紹到這瞭,更多相關java延時隊列超時訂單處理內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- java中DelayQueue實例用法詳解
- java DelayQueue的原理淺析
- java並發中DelayQueue延遲隊列原理剖析
- Java線程池隊列LinkedTransferQueue示例詳解
- Java 阻塞隊列BlockingQueue詳解