Golang分佈式應用定時任務示例詳解

正文

在系統開發中,有一類任務不是立即執行,而是在未來某個時間點或者按照一定間隔去執行,比如日志定期壓縮、報表制作、過期數據清理等,這就是定時任務。

在單機中,定時任務通常需要實現一個類似crontab的系統,一般有兩種方式:

  • 最小堆,按照任務執行時間建堆,每次取最近的任務執行
  • 時間輪,將任務放到時間輪列表中,每次轉動取對應的任務列表執行

最小堆

最小堆是一種特殊的完全二叉樹,任意非葉子節點的值不大於其子節點,如圖

通過最小堆,根據任務最近執行時間鍵堆,每次取堆頂元素即最近需要執行的任務,設置timer定時器,到期後觸發任務執行。由於堆的特性每次調整的時間復雜度為O(lgN),相較於普通隊列性能更快。

container/heap中已經實現操作堆的相關函數,我們隻需要實現定期任務核心邏輯即可。

// 運行
func (c *Cron) Run() error {
    // 設置cron已啟動,atomic.Bool來保證並發安全
	c.started.Store(true)
    // 主循環
	for {
        // 如果停止則退出
		if !c.started.Load() {
			break
		}
		c.runTask()
	}
	return nil
}
// 核心邏輯
func (c *Cron) runTask() {
	now := time.Now()
	duration := infTime
	// 獲取堆頂元素
	task, ok := c.tasks.Peek()
	if ok {
		// 如果已刪除則彈出
		if !c.set.Has(task.Name()) {
			c.tasks.Pop()
			return
		}
		// 計算於當前時間查找,設置定時器
		if task.next.After(now) {
			duration = task.next.Sub(now)
		} else {
			duration = 0
		}
	}
	timer := time.NewTimer(duration)
	defer timer.Stop()
	// 當有新元素插入直接返回,防止新元素執行時間小於當前堆頂元素
	select {
	case <-c.new:
		return
	case <-timer.C:
	}
	// 彈出任務,執行
	go task.Exec()
	// 計算下次執行時間,如果為0說明任務已結束,否則重新入堆
	task.next = task.Next(time.Now())
	if task.next.IsZero() {
		c.set.Delete(task.Name())
	} else {
		c.tasks.Push(task)
	}
}

主要邏輯可總結為:

  • 將任務按照下次執行時間建最小堆
  • 每次取堆頂任務,設置定時器
  • 如果中間有新加入任務,轉入步驟2
  • 定時器到期後執行任務
  • 再次取下個任務,轉入步驟2,依次執行

時間輪

另一種實現Cron的方式是時間輪,時間輪通過一個環形隊列,每個插槽放入需要到期執行的任務,按照固定間隔轉動時間輪,取插槽中任務列表執行,如圖所示:

時間輪可看作一個表盤,如圖中時間間隔為1秒,總共60個格子,如果任務在3秒後執行則放為插槽3,每秒轉動次取插槽上所有任務執行。

如果執行時間超過最大插槽,比如有個任務需要63秒後執行(超過瞭最大格子刻度),一般可以通過多層時間輪,或者設置一個額外變量圈數,隻執行圈數為0的任務。

時間輪插入的時間復雜度為O(1),獲取任務列表復雜度為O(1),執行列表最差為O(n)。對比最小堆,時間輪插入刪除元素更快。

核心代碼如下:

// 定義
type TimeWheel struct {
	interval    time.Duration // 觸發間隔
	slots       int // 總插槽數
	currentSlot int // 當前插槽數
	tasks       []*list.List // 環形列表,每個元素為對應插槽的任務列表
	set         containerx.Set[string] // 記錄所有任務key值,用來檢查任務是否被刪除
	tricker *time.Ticker // 定時觸發器
	logger logr.Logger
}
func (tw *TimeWheel) Run() error {
	tw.tricker = time.NewTicker(tw.interval)
	for {
		// 通過定時器模擬時間輪轉動
		now, ok := <-tw.tricker.C
		if !ok {
			break
		}
		// 轉動一次,執行任務列表
		tw.RunTask(now, tw.currentSlot)
		tw.currentSlot = (tw.currentSlot + 1) % tw.slots
	}
	return nil
}
func (tw *TimeWheel) RunTask(now time.Time, slot int) {
	// 一次執行任務列表
	for item := taskList.Front(); item != nil; {
		task, ok := item.Value.(*TimeWheelTask)
		// 任務圈數大於0,不需要執行,將圈數減一
		if task.circle > 0 {
			task.circle--
			item = item.Next()
			continue
		}
		// 運行任務
		go task.Exec()
		// 計算任務下次運行時間
		next := item.Next()
		taskList.Remove(item)
		item = next
		task.next = task.Next(now)
		if !task.next.IsZero() {
			tw.add(now, task)
		} else {
			tw.Remove(task.Name())
		}
	}
}
// 添加任務,計算下一次任務執行的插槽與圈數
func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) {
	if !task.initialized {
		task.next = task.Next(now)
		task.initialized = true
	}
	duration := task.next.Sub(now)
	if duration <= 0 {
		task.slot = tw.currentSlot + 1
		task.circle = 0
	} else {
		mult := int(duration / tw.interval)
		task.slot = (tw.currentSlot + mult) % tw.slots
		task.circle = mult / tw.slots
	}
	tw.tasks[task.slot].PushBack(task)
	tw.set.Insert(task.Name())
}

時間輪的主要邏輯如下:

  • 將任務存在對應插槽的時間
  • 通過定時間模擬時間輪轉動
  • 每次到期後遍歷當前插槽的任務列表,若任務圈數為0則執行
  • 如果任務未結束,計算下次執行的插槽與圈數
  • 轉入步驟2,依次執行

總結

本文主要總結瞭定時任務的兩種實現方式,最小堆與時間輪,並分析其核心實現邏輯。

對於執行分佈式定時任務,可以借助延時消息隊列或者直接使用Kubernetes的CronJob。

自己開發的話可以借助Etcd:

  • 中心節點Coordinator將任務按照一定算法(Hash、輪詢、或者更復雜的分配算法)將任務與工作節點Worker綁定
  • 每個Worker添加到有綁定到自己的任務則取出放到本地的Cron中
  • 如果Worker掛掉,執行將其上任務重新綁定即可

本文所有代碼見github.com/qingwave/go…

以上就是Golang分佈式應用定時任務示例詳解的詳細內容,更多關於Golang分佈式定時的資料請關註WalkonNet其它相關文章!

推薦閱讀: