Golang實現CronJob(定時任務)的方法詳解

引言

最近做瞭一個需求,是定時任務相關的。以前定時任務都是通過 linux crontab 去實現的,現在服務上雲(k8s)瞭,嘗試瞭 k8s 的 CronJob,由於公司提供的是界面化工具,使用、查看起來很不方便。於是有瞭本文,通過一個單 pod 去實現一個常駐服務,去跑定時任務。

經過篩選,選用瞭cron這個庫,它支持 linux cronjob 語法取配置定時任務,還支持@every 10s、@hourly 等描述符去配置定時任務,完全滿足我們要求,比如下面的例子:

package main

import (
	"fmt"

	"github.com/natefinch/lumberjack"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"
)

type CronLogger struct {
	clog *logrus.Logger
}

func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {
	l.clog.WithFields(logrus.Fields{
		"data": keysAndValues,
	}).Info(msg)
}

func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {
	l.clog.WithFields(logrus.Fields{
		"msg":  msg,
		"data": keysAndValues,
	}).Warn(err.Error())
}

func main() {
	logger := logrus.New()
	_logger := &lumberjack.Logger{
		Filename:   "./test.log",
		MaxSize:    50,
		MaxAge:     15,
		MaxBackups: 5,
	}

	logger.SetOutput(_logger)
	logger.SetFormatter(&logrus.JSONFormatter{
		DisableHTMLEscape: true,
	})

	c := cron.New(cron.WithLogger(&CronLogger{
		clog: logger,
	}))
	c.AddFunc("*/5 * * * *", func() {
		fmt.Println("你的流量包即將過期瞭")
	})
	c.AddFunc("*/2 * * * *", func() {
		fmt.Println("你的轉碼包即將過期瞭")
	})
	c.Start()

	for {
		select {}
	}
}

使用瞭 cronjob、並結合瞭 golang 的 log 組建,輸出日志到文件,使用很方便。

但是,在使用過程中,發現還有些不足,缺少某些功能,比如我很想使用的查看任務列表。

類庫介紹

擴展性強

此類庫擴展性挺強,通過 JobWrapper 去包裝一個任務,NewChain(w1, w2, w3).Then(job),相關實現如下:

type JobWrapper func(Job) Job
type Chain struct {
    wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
    return Chain{c}
}
func (c Chain) Then(j Job) Job {
    for i := range c.wrappers {
        j = c.wrappers[len(c.wrappers)-i-1](j)
    }
    return j
}

比如當前腳本如果還沒有執行完,下次任務時間又到瞭,就可以通過如下默認提供的 wrapper 去避免繼續執行。可以看到最後執行的任務 j.Run() 被包裝在瞭一個函數閉包中,並且根據閉包中的 channel 去判斷是否執行,避免重復執行。首次執行的時候,容量為 1 的 channel 中已經有數據瞭,重復執行時,channel 無數據,默認跳過,等上次任務執行完成後,又像 channel 中寫入一條數據,下次 channel 可以讀出數據,又可以執行任務瞭:

func SkipIfStillRunning(j Job) Job {
    var ch = make(chan struct{}, 1)
    ch <- struct{}{}
    return FuncJob(func() {
        select {
        case v := <-ch:
            defer func() { ch <- v }()
            j.Run()
        default:
            // "skip"
        }
    })
}

主流程

cron 主流程是啟動一個協程,裡面有雙重 for 循環,下面我們來一起分析一下。

定時器

第一層循環,首先計算下次最早執行任務的時間跟當前時間間隔 gap,然後設置定時器為 gap,這裡很巧妙,定時器間隔不是 1s/次,而是跟下次任務的時間相關,這樣就避免瞭無用的定時器循環,也讓執行時間更精準,不存在設置小瞭浪費資源,設置大瞭誤差大的情況。接下來進入第二層循環。

sort.Sort(byTime(c.entries))
timer = time.NewTimer(c.entries[0].Next.Sub(now))

事件循環

事件循環中,包含瞭很多事件,比如 添加任務、停止、移除任務,當 cron 啟動之後,這些任務都是異步的。比如添加任務,不會直接將任務信息寫入內存中,而是進入事件循環,加入之後,重新計算第一二層循環,避免瞭正在修改任務信息,又執行任務信息,然後出錯的情況。

有人可能會問瞭,為何不在事件中加鎖,這樣也能避免內存競爭。我想說,我們執行的是腳本任務,有的事件可能很長,可能會阻塞有些事件,所以這些事件都放在循環中,避免瞭加鎖,也滿足瞭要求。

for {
    select {
    case now = <-timer.C:
        // 執行任務
    case newEntry := <-c.add:
        // 添加任務
    case replyChan := <-c.snapshot:
        // 獲取任務信息
    case <-c.stop:
        //  停止任務
    case id := <-c.remove:
        // 移除任務
    }
    break
}

類庫改造

在瞭解瞭項目的基本情況之後,對項目做瞭部分改造,方便使用。

打印任務列表信息

在主循環匯總加入瞭信號量監聽,當觸發信號量 SIGUSR1,將任務信息輸出到日志:

usrSig := make(chan os.Signal, 1)
signal.Notify(usrSig, syscall.SIGUSR1)

for {
	select {
	case <-usrSig:
		// 啟動單獨的協程去打印定時任務執行信息
		continue
	}
	break
}

根據名稱移除腳本

目前腳本隻能根據腳本 id 去移除要執行的任務,執行過程中,也不能通過命令去移除任務,不是太方便。比如有個腳本馬上要執行瞭,但是該腳本發現問題瞭,這時候生產環境的話,就需要更新代碼,然後重啟服務去下線腳本任務,這時候,黃花菜可能都涼瞭。

所以我也是通過信號量,來處理運行之後,運行中移除任務的問題,收到信號量之後,讀取文件中的內容,根據命令去處理 runing 中的內存:

usrSig2 := make(chan os.Signal, 1)
signal.Notify(usrSig2, syscall.SIGUSR2)

......
case <-usrSig2:
	actionByte, err := os.ReadFile("/tmp/cron.action")
	...... //校驗命令正確性
	action := strings.Fields(string(actionByte))
	switch action[0] {
	case "removeTag":
		timer.Stop()
		now = c.now()
		c.removeEntryByTag(action[1])
		c.logger.Info("removedByTag", "tag", action[1])
	}
......

改造效果

由於原項目已經 2 年多沒有個更新過瞭,就算發起 pr 估計也不會被處理,所以 fork 一份放在瞭這裡aizuyan/cron進行改造,下面是改進之後的代碼:

package main

import (
	// 加載配置文件

	"fmt"

	"github.com/aizuyan/cron/v3"
)

func main() {
	c := cron.New(cron.WithLogger(cron.DefaultLogger))
	c.AddFuncWithTag("流量包過期", "*/5 * * * *", func() {
		fmt.Println("你的流量包即將過期瞭")
	})
	c.AddFuncWithTag("轉碼包過期", "*/2 * * * *", func() {
		fmt.Println("你的轉碼包即將過期瞭")
	})
	c.Start()

	for {
		select {}
	}
}

對每個定時任務增加瞭一個名稱標識,當任務啟動後,當我們執行 kill -SIGUSR1 <pid> 的時候,會看到 stdout 輸出瞭運行的任務列表信息:

+—-+————+————-+———————+———————+
| ID |    TAG     |    SPEC     |        PREV         |        NEXT         |
+—-+————+————-+———————+———————+
|  2 | 轉碼包過期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 |
|  1 | 流量包過期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |
+—-+————+————-+———————+———————+

執行 kill -SIGUSR2 <pid>,移除轉碼包過期任務,避免瞭使用 ID 容易出錯的問題。

cat /tmp/cron.action 
removeTag 轉碼包過期
// {"data":["tag","轉碼包過期"],"level":"info","msg":"removedByTag","time":"2023-04-02T18:32:56+08:00"}

放目前為止,是不是更好用瞭,基本能滿足我們的需求瞭,也可以自己去再做各種擴展。

到此這篇關於Golang實現CronJob(定時任務)的方法詳解的文章就介紹到這瞭,更多相關Golang定時任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: