Golang分佈式應用定時任務示例詳解
正文
在系統開發中,有一類任務不是立即執行,而是在未來某個時間點或者按照一定間隔去執行,比如日志定期壓縮、報表制作、過期數據清理等,這就是定時任務。
在單機中,定時任務通常需要實現一個類似crontab的系統,一般有兩種方式:
- 最小堆,按照任務執行時間建堆,每次取最近的任務執行
- 時間輪,將任務放到時間輪列表中,每次轉動取對應的任務列表執行
最小堆
最小堆是一種特殊的完全二叉樹,任意非葉子節點的值不大於其子節點,如圖
通過最小堆,根據任務最近執行時間鍵堆,每次取堆頂元素即最近需要執行的任務,設置timer定時器,到期後觸發任務執行。由於堆的特性每次調整的時間復雜度為O(lgN),相較於普通隊列性能更快。
在container/heap
中已經實現操作堆的相關函數,我們隻需要實現定期任務核心邏輯即可。
// 運行 func (c *Cron) Run() error { // 設置cron已啟動,atomic.Bool來保證並發安全 c.started.Store(true) // 主循環 for { // 如果停止則退出 if !c.started.Load() { break } c.runTask() } return nil } // 核心邏輯 func (c *Cron) runTask() { now := time.Now() duration := infTime // 獲取堆頂元素 task, ok := c.tasks.Peek() if ok { // 如果已刪除則彈出 if !c.set.Has(task.Name()) { c.tasks.Pop() return } // 計算於當前時間查找,設置定時器 if task.next.After(now) { duration = task.next.Sub(now) } else { duration = 0 } } timer := time.NewTimer(duration) defer timer.Stop() // 當有新元素插入直接返回,防止新元素執行時間小於當前堆頂元素 select { case <-c.new: return case <-timer.C: } // 彈出任務,執行 go task.Exec() // 計算下次執行時間,如果為0說明任務已結束,否則重新入堆 task.next = task.Next(time.Now()) if task.next.IsZero() { c.set.Delete(task.Name()) } else { c.tasks.Push(task) } }
主要邏輯可總結為:
- 將任務按照下次執行時間建最小堆
- 每次取堆頂任務,設置定時器
- 如果中間有新加入任務,轉入步驟2
- 定時器到期後執行任務
- 再次取下個任務,轉入步驟2,依次執行
時間輪
另一種實現Cron的方式是時間輪,時間輪通過一個環形隊列,每個插槽放入需要到期執行的任務,按照固定間隔轉動時間輪,取插槽中任務列表執行,如圖所示:
時間輪可看作一個表盤,如圖中時間間隔為1秒,總共60個格子,如果任務在3秒後執行則放為插槽3,每秒轉動次取插槽上所有任務執行。
如果執行時間超過最大插槽,比如有個任務需要63秒後執行(超過瞭最大格子刻度),一般可以通過多層時間輪,或者設置一個額外變量圈數,隻執行圈數為0的任務。
時間輪插入的時間復雜度為O(1),獲取任務列表復雜度為O(1),執行列表最差為O(n)。對比最小堆,時間輪插入刪除元素更快。
核心代碼如下:
// 定義 type TimeWheel struct { interval time.Duration // 觸發間隔 slots int // 總插槽數 currentSlot int // 當前插槽數 tasks []*list.List // 環形列表,每個元素為對應插槽的任務列表 set containerx.Set[string] // 記錄所有任務key值,用來檢查任務是否被刪除 tricker *time.Ticker // 定時觸發器 logger logr.Logger } func (tw *TimeWheel) Run() error { tw.tricker = time.NewTicker(tw.interval) for { // 通過定時器模擬時間輪轉動 now, ok := <-tw.tricker.C if !ok { break } // 轉動一次,執行任務列表 tw.RunTask(now, tw.currentSlot) tw.currentSlot = (tw.currentSlot + 1) % tw.slots } return nil } func (tw *TimeWheel) RunTask(now time.Time, slot int) { // 一次執行任務列表 for item := taskList.Front(); item != nil; { task, ok := item.Value.(*TimeWheelTask) // 任務圈數大於0,不需要執行,將圈數減一 if task.circle > 0 { task.circle-- item = item.Next() continue } // 運行任務 go task.Exec() // 計算任務下次運行時間 next := item.Next() taskList.Remove(item) item = next task.next = task.Next(now) if !task.next.IsZero() { tw.add(now, task) } else { tw.Remove(task.Name()) } } } // 添加任務,計算下一次任務執行的插槽與圈數 func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) { if !task.initialized { task.next = task.Next(now) task.initialized = true } duration := task.next.Sub(now) if duration <= 0 { task.slot = tw.currentSlot + 1 task.circle = 0 } else { mult := int(duration / tw.interval) task.slot = (tw.currentSlot + mult) % tw.slots task.circle = mult / tw.slots } tw.tasks[task.slot].PushBack(task) tw.set.Insert(task.Name()) }
時間輪的主要邏輯如下:
- 將任務存在對應插槽的時間
- 通過定時間模擬時間輪轉動
- 每次到期後遍歷當前插槽的任務列表,若任務圈數為0則執行
- 如果任務未結束,計算下次執行的插槽與圈數
- 轉入步驟2,依次執行
總結
本文主要總結瞭定時任務的兩種實現方式,最小堆與時間輪,並分析其核心實現邏輯。
對於執行分佈式定時任務,可以借助延時消息隊列或者直接使用Kubernetes的CronJob。
自己開發的話可以借助Etcd:
- 中心節點Coordinator將任務按照一定算法(Hash、輪詢、或者更復雜的分配算法)將任務與工作節點Worker綁定
- 每個Worker添加到有綁定到自己的任務則取出放到本地的Cron中
- 如果Worker掛掉,執行將其上任務重新綁定即可
本文所有代碼見github.com/qingwave/go…
以上就是Golang分佈式應用定時任務示例詳解的詳細內容,更多關於Golang分佈式定時的資料請關註WalkonNet其它相關文章!