一文詳解Golang 定時任務庫 gron 設計和原理

 cron 簡介

在 Unix-like 操作系統中,有一個大傢都很熟悉的 cli 工具,它能夠來處理定時任務,周期性任務,這就是: cron。 你隻需要簡單的語法控制就能實現任意【定時】的語義。用法上可以參考一下這個 Crontab Guru Editor,做的非常精巧。

簡單說,每一個位都代表瞭一個時間維度,* 代表全集,所以,上面的語義是:在每天早上的4點05分觸發任務。

但 cron 畢竟隻是一個操作系統級別的工具,如果定時任務失敗瞭,或者壓根沒啟動,cron 是沒法提醒開發者這一點的。並且,cron 和 正則表達式都有一種魔力,不知道大傢是否感同身受,這裡引用同事的一句名言:

這世界上有些語言非常相似: shell腳本, es查詢的那個dsl語言, 定時任務的crontab, 正則表達式. 他們相似就相似在每次要寫的時候基本都得重新現學一遍。

正巧,最近看到瞭 gron 這個開源項目,它是用 Golang 實現一個並發安全的定時任務庫。實現非常簡單精巧,代碼量也不多。今天我們就來一起結合源碼看一下,怎樣基於 Golang 的能力做出來一個【定時任務庫】。

gron

Gron provides a clear syntax for writing and deploying cron jobs.

gron 是一個泰國小哥在 2016 年開源的作品,它的特點就在於非常簡單和清晰的語義來定義【定時任務】,你不用再去記 cron 的語法。我們來看下作為使用者怎樣上手。

首先,我們還是一個 go get 安裝依賴:

$ go get github.com/roylee0704/gron

假設我們期望在【時機】到瞭以後,要做的工作是打印一個字符串,每一個小時執行一次,我們就可以這樣:

package main

import (
	"fmt"
	"time"
	"github.com/roylee0704/gron"
)
func main() {
	c := gron.New()
	c.AddFunc(gron.Every(1*time.Hour), func() {
		fmt.Println("runs every hour.")
	})
	c.Start()
}

非常簡單,而且即便是在 c.Start 之後我們依然可以添加新的定時任務進去。支持瞭很好的擴展性。

定時參數

註意到我們調用 gron.New().AddFunc() 時傳入瞭一個 gron.Every(1*time.Hour)

這裡其實你可以傳入任何一個 time.Duration,從而把調度間隔從 1 小時調整到 1 分鐘甚至 1 秒。

除此之外,gron 還很貼心地封裝瞭一個 xtime 包用來把常見的 time.Duration 封裝起來,這裡我們開箱即用。

import "github.com/roylee0704/gron/xtime"

gron.Every(1 * xtime.Day)
gron.Every(1 * xtime.Week)

很多時候我們不僅僅某個任務在當天運行,還希望是我們指定的時刻,而不是依賴程序啟動時間,機械地加 24 hour。gron 對此也做瞭很好的支持:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

我們隻需指定 At("hh:mm") 就可以實現在指定時間執行。

源碼解析

這一節我們來看看 gron 的實現原理。

所謂定時任務,其實包含兩個層面:

  • 觸發器。即我們希望這個任務在什麼時間點,什麼周期被觸發;
  • 任務。即我們在觸發之後,希望執行的任務,類比到我們上面示例的 fmt.Println。

對這兩個概念的封裝和擴展是一個定時任務庫必須考慮的。

而同時,我們是在 Golang 的協程上跑程序的,意味著這會是一個長期運行的協程,否則你即便指定瞭【一個月後幹XXX】這個任務,程序兩天後掛瞭,也就無法實現你的訴求瞭。

所以,我們還希望有一個 manager 的角色,來管理我們的一組【定時任務】,如何調度,什麼時候啟動,怎麼停止,啟動瞭以後還想加新任務是否支持。

Cron

在 gron 的體系裡,Cron 對象(我們上面通過 gron.New 創建出來的)就是我們的 manager,而底層的一個個【定時任務】則對應到 Cron 對象中的一個個 Entry:

// Cron provides a convenient interface for scheduling job such as to clean-up
// database entry every month.
//
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may also be started, stopped and the entries
// may be inspected.
type Cron struct {
	entries []*Entry
	running bool
	add     chan *Entry
	stop    chan struct{}
}

// New instantiates new Cron instant c.
func New() *Cron {
	return &Cron{
		stop: make(chan struct{}),
		add:  make(chan *Entry),
	}
}
  • entries 就是定時任務的核心能力,它記錄瞭一組【定時任務】;
  • running 用來標識這個 Cron 是否已經啟動;
  • add 是一個channel,用來支持在 Cron 啟動後,新增的【定時任務】;
  • stop 同樣是個channel,註意到是空結構體,用來控制 Cron 的停止。這個其實是經典寫法瞭,對日常開發也有借鑒意義,我們待會兒會好好看一下。

我們觀察到,當調用 gron.New() 方法後,得到的是一個指向 Cron 對象的指針。此時隻是初始化瞭 stop 和 add 兩個 channel,沒有啟動調度。

Entry

重頭戲來瞭,Cron 裡面的 []*Entry 其實就代表瞭一組【定時任務】,每個【定時任務】可以簡化理解為 <觸發器,任務> 組成的一個 tuple。

// Entry consists of a schedule and the job to be executed on that schedule.
type Entry struct {
	Schedule Schedule
	Job      Job

	// the next time the job will run. This is zero time if Cron has not been
	// started or invalid schedule.
	Next time.Time

	// the last time the job was run. This is zero time if the job has not been
	// run.
	Prev time.Time
}

// Schedule is the interface that wraps the basic Next method.
//
// Next deduces next occurring time based on t and underlying states.
type Schedule interface {
	Next(t time.Time) time.Time
}

// Job is the interface that wraps the basic Run method.
//
// Run executes the underlying func.
type Job interface {
	Run()
}
  • Schedule 代表瞭一個【觸發器】,或者說一個定時策略。它隻包含一個 Next 方法,接受一個時間點,業務要返回下一次觸發調動的時間點。
  • Job 則是對【任務】的抽象,隻需要實現一個 Run 方法,沒有入參出參。

除瞭這兩個核心依賴外,Entry 結構還包含瞭【前一次執行時間點】和【下一次執行時間點】,這個目前可以忽略,隻是為瞭輔助代碼用。

按照時間排序

// byTime is a handy wrapper to chronologically sort entries.
type byTime []*Entry

func (b byTime) Len() int      { return len(b) }
func (b byTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

// Less reports `earliest` time i should sort before j.
// zero time is not `earliest` time.
func (b byTime) Less(i, j int) bool {

	if b[i].Next.IsZero() {
		return false
	}
	if b[j].Next.IsZero() {
		return true
	}

	return b[i].Next.Before(b[j].Next)
}

這裡是對 Entry 列表的簡單封裝,因為我們可能同時有多個 Entry 需要調度,處理的順序很重要。這裡實現瞭 sort 的接口, 有瞭 Len()Swap()Less() 我們就可以用 sort.Sort() 來排序瞭。

此處的排序策略是按照時間大小。

新增定時任務

我們在示例裡面出現過調用 AddFunc() 來加入一個 gron.Every(xxx) 這樣一個【定時任務】。其實這是給用戶提供的簡單封裝。

// JobFunc is an adapter to allow the use of ordinary functions as gron.Job
// If f is a function with the appropriate signature, JobFunc(f) is a handler
// that calls f.
//
// todo: possibly func with params? maybe not needed.
type JobFunc func()

// Run calls j()
func (j JobFunc) Run() {
	j()
}


// AddFunc registers the Job function for the given Schedule.
func (c *Cron) AddFunc(s Schedule, j func()) {
	c.Add(s, JobFunc(j))
}

// Add appends schedule, job to entries.
//
// if cron instant is not running, adding to entries is trivial.
// otherwise, to prevent data-race, adds through channel.
func (c *Cron) Add(s Schedule, j Job) {

	entry := &Entry{
		Schedule: s,
		Job:      j,
	}

	if !c.running {
		c.entries = append(c.entries, entry)
		return
	}
	c.add <- entry
}

JobFunc 實現瞭我們上一節提到的 Job 接口,基於此,我們就可以讓用戶直接傳入一個 func() 就ok,內部轉成 JobFunc,再利用通用的 Add 方法將其加入到 Cron 中即可。

註意,這裡的 Add 方法就是新增定時任務的核心能力瞭,我們需要觸發器 Schedule,任務 Job。並以此來構造出一個定時任務 Entry。

若 Cron 實例還沒啟動,加入到 Cron 的 entries 列表裡就ok,隨後啟動的時候會處理。但如果已經啟動瞭,就直接往 add 這個 channel 中塞,走額外的新增調度路徑。

啟動和停止

// Start signals cron instant c to get up and running.
func (c *Cron) Start() {
	c.running = true
	go c.run()
}


// Stop halts cron instant c from running.
func (c *Cron) Stop() {

	if !c.running {
		return
	}
	c.running = false
	c.stop <- struct{}{}
}

我們先 high level 地看一下一個 Cron 的啟動和停止。

  • Start 方法執行的時候會先將 running 變量置為 true,用來標識實例已經啟動(啟動前後加入的定時任務 Entry 處理策略是不同的,所以這裡需要標識),然後啟動一個 goroutine 來實際跑啟動的邏輯。
  • Stop 方法則會將 running 置為 false,然後直接往 stop channel 塞一個空結構體即可。

ok,有瞭這個心裡預期,我們來看看 c.run() 裡面幹瞭什麼事:

var after = time.After


// run the scheduler...
//
// It needs to be private as it's responsible of synchronizing a critical
// shared state: `running`.
func (c *Cron) run() {

	var effective time.Time
	now := time.Now().Local()

	// to figure next trig time for entries, referenced from now
	for _, e := range c.entries {
		e.Next = e.Schedule.Next(now)
	}

	for {
		sort.Sort(byTime(c.entries))
		if len(c.entries) > 0 {
			effective = c.entries[0].Next
		} else {
			effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
		}

		select {
		case now = <-after(effective.Sub(now)):
			// entries with same time gets run.
			for _, entry := range c.entries {
				if entry.Next != effective {
					break
				}
				entry.Prev = now
				entry.Next = entry.Schedule.Next(now)
				go entry.Job.Run()
			}
		case e := <-c.add:
			e.Next = e.Schedule.Next(time.Now())
			c.entries = append(c.entries, e)
		case <-c.stop:
			return // terminate go-routine.
		}
	}
}

重點來瞭,看看我們是如何把上面 Cron, Entry, Schedule, Job 串起來的。

  • 首先拿到 local 的時間 now;
  • 遍歷所有 Entry,調用 Next 方法拿到各個【定時任務】下一次運行的時間點;
  • 對所有 Entry 按照時間排序(我們上面提過的 byTime);
  • 拿到第一個要到期的時間點,在 select 裡面通過 time.After 來監聽。到點瞭就起動新的 goroutine 跑對應 entry 裡的 Job,並回到 for 循環,繼續重新 sort,再走同樣的流程;
  • 若 add channel 裡有新的 Entry 被加進來,就加入到 Cron 的 entries 裡,觸發新的 sort;
  • 若 stop channel 收到瞭信號,就直接 return,結束執行。

整體實現還是非常簡潔的,大傢可以感受一下。

Schedule

前面其實我們暫時將觸發器的復雜性封裝在 Schedule 接口中瞭,但怎麼樣實現一個 Schedule 呢?

尤其是註意,我們還支持 At 操作,也就是指定 Day,和具體的小時,分鐘。回憶一下:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

這一節我們就來看看,gron.Every 幹瞭什麼事,又是如何支持 At 方法的。

// Every returns a Schedule reoccurs every period p, p must be at least
// time.Second.
func Every(p time.Duration) AtSchedule {

	if p < time.Second {
		p = xtime.Second
	}

	p = p - time.Duration(p.Nanoseconds())%time.Second // truncates up to seconds

	return &periodicSchedule{
		period: p,
	}
}

gron 的 Every 函數接受一個 time.Duration,返回瞭一個 AtSchedule 接口。我待會兒會看,這裡註意,Every 裡面是會把【秒】級以下給截掉。

我們先來看下,最後返回的這個 periodicSchedule 是什麼:

type periodicSchedule struct {
	period time.Duration
}

// Next adds time t to underlying period, truncates up to unit of seconds.
func (ps periodicSchedule) Next(t time.Time) time.Time {
	return t.Truncate(time.Second).Add(ps.period)
}

// At returns a schedule which reoccurs every period p, at time t(hh:ss).
//
// Note: At panics when period p is less than xtime.Day, and error hh:ss format.
func (ps periodicSchedule) At(t string) Schedule {
	if ps.period < xtime.Day {
		panic("period must be at least in days")
	}

	// parse t naively
	h, m, err := parse(t)

	if err != nil {
		panic(err.Error())
	}

	return &atSchedule{
		period: ps.period,
		hh:     h,
		mm:     m,
	}
}

// parse naively tokenises hours and minutes.
//
// returns error when input format was incorrect.
func parse(hhmm string) (hh int, mm int, err error) {

	hh = int(hhmm[0]-'0')*10 + int(hhmm[1]-'0')
	mm = int(hhmm[3]-'0')*10 + int(hhmm[4]-'0')

	if hh < 0 || hh > 24 {
		hh, mm = 0, 0
		err = errors.New("invalid hh format")
	}
	if mm < 0 || mm > 59 {
		hh, mm = 0, 0
		err = errors.New("invalid mm format")
	}

	return
}

可以看到,所謂 periodicSchedule 就是一個【周期性觸發器】,隻維護一個 time.Duration 作為【周期】。

periodicSchedule 實現 Next 的方式也很簡單,把秒以下的截掉之後,直接 Add(period),把周期加到當前的 time.Time 上,返回新的時間點。這個大傢都能想到。

重點在於,對 At 能力的支持。我們來關註下 func (ps periodicSchedule) At(t string) Schedule 這個方法

  • 若周期連 1 天都不到,不支持 At 能力,因為 At 本質是在選定的一天內,指定小時,分鐘,作為輔助。連一天都不到的周期,是要精準處理的;
  • 將用戶輸入的形如 "23:59" 時間字符串解析出來【小時】和【分鐘】;
  • 構建出一個 atSchedule 對象,包含瞭【周期時長】,【小時】,【分鐘】。

ok,這一步隻是拿到瞭材料,那具體怎樣處理呢?這個還是得繼續往下走,看看 atSchedule 結構幹瞭什麼:

type atSchedule struct {
	period time.Duration
	hh     int
	mm     int
}

// reset returns new Date based on time instant t, and reconfigure its hh:ss
// according to atSchedule's hh:ss.
func (as atSchedule) reset(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
}

// Next returns **next** time.
// if t passed its supposed schedule: reset(t), returns reset(t) + period,
// else returns reset(t).
func (as atSchedule) Next(t time.Time) time.Time {
	next := as.reset(t)
	if t.After(next) {
		return next.Add(as.period)
	}
	return next
}

其實隻看這個 Next 的實現即可。我們從 periodSchedule 那裡獲取瞭三個屬性。

在調用 Next 方法時,先做 reset,根據原有 time.Time 的年,月,日,以及用戶輸入的 At 中的小時,分鐘,來構建出來一個 time.Time 作為新的時間點。

此後判斷是在哪個周期,如果當前周期已經過瞭,那就按照下個周期的時間點返回。

到這裡,一切就都清楚瞭,如果我們不用 At 能力,直接 gron.Every(xxx),那麼直接就會調用

t.Truncate(time.Second).Add(ps.period)

拿到一個新的時間點返回。

而如果我們要用 At 能力,指定當天的小時,分鐘。那就會走到 periodicSchedule.At 這裡,解析出【小時】和【分鐘】,最後走 Next 返回 reset 之後的時間點。

這個和 gron.Every 方法返回的 AtSchedule 接口其實是完全對應的:

// AtSchedule extends Schedule by enabling periodic-interval & time-specific setup
type AtSchedule interface {
	At(t string) Schedule
	Schedule
}

直接就有一個 Schedule 可以用,但如果你想針對天級以上的 duration 指定時間,也可以走 At 方法,也會返回一個 Schedule 供我們使用。

擴展性

gron 裡面對於所有的依賴也都做成瞭【依賴接口而不是實現】。Cron 的 Add 函數的入參也是兩個接口,這裡可以隨意替換:func (c *Cron) Add(s Schedule, j Job)

最核心的兩個實體依賴 Schedule, Job 都可以用你自定義的實現來替換掉。

如實現一個新的 Job:

type Reminder struct {
	Msg string
}

func (r Reminder) Run() {
  fmt.Println(r.Msg)
}

事實上,我們上面提到的 periodicSchedule 以及 atSchedule 就是 Schedule 接口的具體實現。我們也完全可以不用 gron.Every,而是自己寫一套新的 Schedule 實現。隻要實現 Next(p time.Duration) time.Time 即可。

我們來看一個完整用法案例:

package main

import (
	"fmt"
	"github.com/roylee0704/gron"
	"github.com/roylee0704/gron/xtime"
)
type PrintJob struct{ Msg string }
func (p PrintJob) Run() {
	fmt.Println(p.Msg)
}

func main() {

	var (
		// schedules
		daily     = gron.Every(1 * xtime.Day)
		weekly    = gron.Every(1 * xtime.Week)
		monthly   = gron.Every(30 * xtime.Day)
		yearly    = gron.Every(365 * xtime.Day)

		// contrived jobs
		purgeTask = func() { fmt.Println("purge aged records") }
		printFoo  = printJob{"Foo"}
		printBar  = printJob{"Bar"}
	)

	c := gron.New()

	c.Add(daily.At("12:30"), printFoo)
	c.AddFunc(weekly, func() { fmt.Println("Every week") })
	c.Start()

	// Jobs may also be added to a running Gron
	c.Add(monthly, printBar)
	c.AddFunc(yearly, purgeTask)

	// Stop Gron (running jobs are not halted).
	c.Stop()
}

經典寫法-控制退出

這裡我們還是要聊一下 Cron 裡控制退出的經典寫法。我們把其他不相關的部分清理掉,隻留下核心代碼:

type Cron struct {
	stop    chan struct{}
}

func (c *Cron) Stop() {
	c.stop <- struct{}{}
}

func (c *Cron) run() {

	for {
		select {
		case <-c.stop:
			return // terminate go-routine.
		}
	}
}

空結構體能夠最大限度節省內存,畢竟我們隻是需要一個信號。核心邏輯用 for + select 的配合,這樣當我們需要結束時可以立刻響應。非常經典,建議大傢日常有需要的時候采用。

結語

gron 整體代碼其實隻在 cron.go 和 schedule.go 兩個文件,合起來代碼不過 300 行,非常精巧,基本沒有冗餘,擴展性很好,是非常好的入門材料。

不過,作為一個 cron 的替代品,其實 gron 還是有自己的問題的。簡單講就是,如果我重啟瞭一個EC2實例,那麼我的 cron job 其實也還會繼續執行,這是落盤的,操作系統級別的支持。

但如果我執行 gron 的進程掛掉瞭,不好意思,那就完全涼瞭。你隻有重啟,然後再把所有任務加回來才行。而我們既然要用 gron,是很有可能定一個幾天後,幾個星期後,幾個月後這樣的觸發器的。誰能保證進程一直活著呢?連機子本身都可能重啟。

所以,我們需要一定的機制來保證 gron 任務的可恢復性,將任務落盤,持久化狀態信息,算是個思考題,這裡大傢可以考慮一下怎麼做。

到此這篇關於一文詳解Golang 定時任務庫 gron 設計和原理的文章就介紹到這瞭,更多相關Golang   gron內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: