Java利用Netty時間輪實現延時任務

一、時間輪算法簡介

為瞭大傢能夠理解下文中的代碼,我們先來簡單瞭解一下netty時間輪算法的核心原理

時間輪算法名副其實,時間輪就是一個環形的數據結構,類似於表盤,將時間輪分成多個bucket(比如:0-8)。假設每個時間輪輪片的分隔時間段tickDuration=1s(即:指針經過每個格子花費時間是 1 s),當前的時間bucket=3,那麼在18秒後需要被執行的任務需要落到((3+18)%8=5取餘運算)的5號bucket上。假如有多個需要在該時間段內執行的任務,就會組成一個雙向鏈表。另外針對時間輪我們要有下面的幾個認知:

時間輪指針是一個Worker線程,在時間輪整點的時候執行雙向鏈表中的任務。

時間輪算法的並不是精準的延時,它的執行精度取決於每個時間輪輪片的分隔時間段tickDuration

Worker線程是單線程,一個bucket、一個bucket的順序處理任務。「所以我們的延時任務一定要做成異步任務,否則會影響時間輪後續任務的執行時間。」

二、時間輪hello-world

實現一個延時任務的例子,需求仍然十分的簡單:你買瞭一張火車票,必須在30分鐘之內付款,否則該訂單被自動取消。「訂單30分鐘不付款自動取消,這個任務就是一個延時任務。」 我們的火車票訂單取消任務,從需求上看並不需要非常精準的延時,所以是可以使用時間輪算法來完成這個任務的。

首先通過maven坐標引入netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>

然後我們創建一個時間輪,如果是Spring的開發環境,我們可以這麼做。下文中我們new瞭一個包含512個bucket的時間輪,每個時間輪的輪片時間間隔是100毫秒。

@Bean("hashedWheelTimer")
public HashedWheelTimer hashedWheelTimer(){
    return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
}

舉例:當用戶買火車票下單的時候,向時間輪中添加一個30分鐘的延時任務。延時任務將在30分鐘之後被執行,下文的lambda表達式部分實現瞭一個TimerTask(task)延時任務。這個延時任務的函數體內,請一定使用異步任務,即:單獨起一個線程或者使用SpringBoot異步任務線程池。因為Worker線程是單線程的,你的任務處理時間長於tickDuration會妨礙後續時間輪輪片上的任務的執行。

//訂單下單操作
void order(String orderInfo) {
  //下單的時候,向時間輪中添加一個30分鐘的延時任務
  hashedWheelTimer.newTimeout(task -> {
    //註意這裡使用異步任務線程池或者開啟線程進行訂單取消任務的處理
    cancelOrder(orderInfo);
  }, 30, TimeUnit.MINUTES);
}

三、異步任務線程池

我們在上文中已經多次強調,時間輪的任務TimerTask的執行內容要做成異步的。最簡單的做法就是接到一個任務之後啟動一個線程處理該任務。在Spring環境下其實我們有更好的選擇,就是使用Spring的線程池,這個線程池是可以自定義的。比如:下文中的用法是我事先定義瞭一個名字為test的線程池,然後通過@Async使用即可。

@Async("test")
public void cancelOrder(String orderInfo){
  //查詢訂單支付信息,如果用戶未支付,關閉訂單
}

可能有的朋友,還不知道該如何自定義一個Spring線程池,可以參考:我之前寫過一個SpringBoot的**「可觀測、易配置」**的線程池開源項目,源代碼地址:https://gitee.com/hanxt/zimug-monitor-threadpool。我的這個zimug-monitor-threadpool開源項目,可以做到對線程池使用情況的監控,我自己平時用的效果還不錯,向大傢推薦一下!

四、時間輪優缺點

時間輪算法實現延時任務的優點就是,相對於使用JDK的DelayQueue,其算法上具有優勢,執行性能相對好一些。其缺點就是所有的延時任務以及延時觸發的管理,都是在單個應用服務的內存中進行的,一旦該應用服務發生故障重啟服務,時間輪任務數據將全部丟失。這一缺點和DelayQueue是一樣的。為瞭解決這個問題,我們可以使用redis、RocketMQ等分佈式中間件來管理延時任務消息的方式來實現延時任務,這個我會在後續的文章中為大傢介紹。

知識點補充

下面主要和大傢一起來分析下Netty時間輪調度算法的原理

時間輪狀態

時間輪有以下三種狀態:

  • WORKER_STATE_INIT:初始化狀態,此時時間輪內的工作線程還沒有開啟
  • WORKER_STATE_STARTED:運行狀態,時間輪內的工作線程已經開啟
  • WORKER_STATE_SHUTDOWN:終止狀態,時間輪停止工作

狀態轉換如下:

構造函數

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 初始化時間輪數組,時間輪大小為大於等於 ticksPerWheel 的第一個 2 的冪,和 HashMap 類似
        wheel = createWheel(ticksPerWheel);
        // 取模用,用來定位數組中的槽
        mask = wheel.length - 1;

        // 為瞭保證精度,時間輪內的時間單位為納秒
        long duration = unit.toNanos(tickDuration);

        // 時間輪內的時鐘撥動頻率不宜太大也不宜太小
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }

        // 創建工作線程
        workerThread = threadFactory.newThread(worker);


        // 非守護線程且 leakDetection 為 true 時檢測內存是否泄漏
        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        // 初始化最大等待任務數
        this.maxPendingTimeouts = maxPendingTimeouts;

        // 如果創建的時間輪實例大於 64,打印日志,並且這個日志隻會打印一次
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

構造函數中的參數相當重要,當自定義時間輪時,我們應該根據業務的范圍設置合理的參數:

  • threadFactory:創建時間輪任務線程的工廠,通過這個工廠可以給我們的線程自定義一些屬性(線程名、異常處理等)
  • tickDuration:時鐘多長時間撥動一次,值越小,時間輪精度越高
  • unit:tickDuration 的單位
  • ticksPerWheel:時間輪數組大小
  • leakDetection:是否檢測內存泄漏
  • maxPendingTimeouts:時間輪內最大等待的任務數

時間輪的時鐘撥動時長應該根據業務設置恰當的值,如果設置的過大,可能導致任務觸發時間不準確。如果設置的過小,時間輪轉動頻繁,任務少的情況下加載不到任務,屬於一直空轉的狀態,會占用 CPU 線程資源。

為瞭防止時間輪占用過多的 CPU 資源,當創建的時間輪對象大於 64 時會以日志的方式提示。

構造函數中隻是初始化瞭輪線程,並沒有開啟,當第一次往時間輪內添加任務時,線程才會開啟。

到此這篇關於Java利用Netty時間輪實現延時任務的文章就介紹到這瞭,更多相關Java Netty時間輪內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: