GoLang中的timer定時器實現原理分析

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}
// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
// A Timer must be created with NewTimer or AfterFunc.
type Timer struct {
	C <-chan Time
	r runtimeTimer
}
func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	// Give the channel a 1-element time buffer.
	// If the client falls behind while reading, we drop ticks
	// on the floor until the client catches up.
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}
type Ticker struct {
	C <-chan Time // The channel on which the ticks are delivered.
	r runtimeTimer
}

ticker 跟 timer 的初始化過程差不多,但是 ticker 比 timer 多瞭一個 period 參數,意為間隔的意思。

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
	pp       uintptr
	when     int64 //觸發時間
	period   int64 //執行周期性任務的時間間隔
	f        func(any, uintptr) // 執行的回調函數,NOTE: must not be closure
	arg      any //執行任務的參數
	seq      uintptr //回調函數的參數,該參數僅在 netpoll 的應用場景下使用
	nextwhen int64 //如果是周期性任務,下次執行任務時間
	status   uint32 //狀態
}
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

sendTime 采用非阻塞的形式,意為,不管是否存在接收方,此定時器一旦到時間瞭就要觸發掉。

// runtime/runtime2.go
type p struct {
    .....
    // The when field of the first entry on the timer heap.
	// This is updated using atomic functions.
	// This is 0 if the timer heap is empty.
    // 堆頂元素什麼時候執行
	timer0When uint64
    // The earliest known nextwhen field of a timer with
	// timerModifiedEarlier status. Because the timer may have been
	// modified again, there need not be any timer with this value.
	// This is updated using atomic functions.
	// This is 0 if there are no timerModifiedEarlier timers.
    // 如果有timer修改為更早執行時間瞭,將會將執行時間更新到更早時間
	timerModifiedEarliest uint64
    // Lock for timers. We normally access the timers while running
	// on this P, but the scheduler can also do it from a different P.
    // 操作timer的互斥鎖
	timersLock mutex
    // Actions to take at some time. This is used to implement the
	// standard library's time package.
	// Must hold timersLock to access.
    //該p 上的所有timer,必須加鎖去操作這個字段,因為不同的p 操作這個字段會有競爭關系
	timers []*timer
	// Number of timers in P's heap.
	// Modified using atomic instructions.
    //p 堆上所有的timer數
	numTimers uint32
    // Number of timerDeleted timers in P's heap.
	// Modified using atomic instructions.
    //被標記為刪除的timer,要麼是我們調用stop,要麼是timer 自己觸發後過期導致的刪除
	deletedTimers uint32
}
// runtime/time.go
type timer struct {
	// If this timer is on a heap, which P's heap it is on.
	// puintptr rather than *p to match uintptr in the versions
	// of this struct defined in other packages.
	pp puintptr
	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
	// a well-behaved function and not block.
	//
	// when must be positive on an active timer.
	when   int64
	period int64
	f      func(any, uintptr)
	arg    any
	seq    uintptr
	// What to set the when field to in timerModifiedXX status.
	nextwhen int64
	// The status field holds one of the values below.
	status uint32
}
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	addtimer(t)
}
// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
	return deltimer(t)
}
// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some P's heap,
// which could cause the heap to become unsorted.
func addtimer(t *timer) {
	// when must be positive. A negative value will cause runtimer to
	// overflow during its delta calculation and never expire other runtime
	// timers. Zero will cause checkTimers to fail to notice the timer.
	if t.when <= 0 {
		throw("timer when must be positive")
	}
	if t.period < 0 {
		throw("timer period must be non-negative")
	}
	if t.status != timerNoStatus {
		throw("addtimer called with initialized timer")
	}
	t.status = timerWaiting
	when := t.when
	// Disable preemption while using pp to avoid changing another P's heap.
    // 如果M在此之後被別的P搶占瞭,那麼後續操作的就是別的P上的timers,這是不允許的
	mp := acquirem()
	pp := getg().m.p.ptr()
	lock(&pp.timersLock)
	cleantimers(pp) // 清理掉已經過期的timer,以提高添加和刪除timer的效率。
	doaddtimer(pp, t) // 執行添加操作
	unlock(&pp.timersLock)
    // 調用 wakeNetPoller 方法,喚醒網絡輪詢器,檢查計時器被喚醒的時間(when)是
    // 否在當前輪詢預期運行的時間(pollerPollUntil)內,若是喚醒。
    // 有的定時器是伴隨著網絡輪訓器的,比如設置的 i/o timeout
    // This can have a spurious wakeup but should never miss a wakeup
    // 寧願出現錯誤的喚醒,也不能漏掉一個喚醒
	wakeNetPoller(when)
	releasem(mp)
}
// 將0位置的timer與下面的子節點比較,如果比子節點大則下移。子節點i*4 + 1,i*4 + 2,i*4 + 3,i*4 + 4
siftdownTimer(pp.timers, 0) 
// 將i位置的timer與上面的父節點比較,如果比父節點小則上移。父節點是(i - 1) / 4
siftupTimer(pp.timers, i) 

timer 存儲在P中的 timers []*timer成員屬性上。timers看起來是一個切片,但是它是按照runtimeTimer.when這個數值排序的小頂堆四叉樹,觸發時間越早越排在前面。

整體來講就是父節點一定比其子節點小,子節點之間沒有任何關系和大小的要求。

關於acquiremreleasem

//go:nosplit
func acquirem() *m {
	_g_ := getg()
	_g_.m.locks++
	return _g_.m
}
//go:nosplit
func releasem(mp *m) {
	_g_ := getg()
	mp.locks--
	if mp.locks == 0 && _g_.preempt {
		// restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}

acquirem函數獲取當前M,並禁止M被搶占,因為M被搶占時的判斷如下

//C:\Go\src\runtime\preempt.go +287
func canPreemptM(mp *m) bool {
   return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
  • 運行時沒有禁止搶占(m.locks == 0
  • 運行時沒有在執行內存分配(m.mallocing == 0
  • 運行時沒有關閉搶占機制(m.preemptoff == ""
  • M 與 P 綁定且沒有進入系統調用(p.status == _Prunning

timers的觸發

// runtime/proc.go
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool)
// runtime/time.go
func runtimer(pp *p, now int64) int64
func runOneTimer(pp *p, t *timer, now int64)

runtime/time.go文件中提供瞭checkTimers/runtimer/runOneTimer三個方法。checkTimers方法中,如果當前p的timers長度不為0,就不斷地調用runtimers。runtimes會根據堆頂的timer的狀態判斷其能否執行,如果可以執行就調用runOneTimer實際執行。

觸發定時器的途徑有兩個

  • 通過調度器在調度時進行計時器的觸發,findrunnable, schedule, stealWork。
  • 通過系統監控檢查並觸發計時器(到期未執行),sysmon。

調度器的觸發一共分兩種情況,一種是在調度循環的時候調用 checkTimers 方法進行計時器的觸發。另外一種是當前處理器 P 沒有可執行的 Timer,且沒有可執行的 G。那麼按照調度模型,就會去竊取其他計時器和 G。

即使是通過每次調度器調度和竊取的時候觸發,但畢竟是具有一定的隨機和不確定性,因此系統監控觸發依然是一個兜底保障,在 Go 語言中 runtime.sysmon 方法承擔瞭這一個責任,存在觸發計時器的邏輯,在每次進行系統監控時,都會在流程上調用 timeSleepUntil 方法去獲取下一個計時器應觸發的時間,以及保存該計時器已打開的計時器堆的 P。

在獲取完畢後會馬上檢查當前是否存在 GC,若是正在 STW 則獲取調度互斥鎖。若發現下一個計時器的觸發時間已經過去,則重新調用 timeSleepUntil 獲取下一個計時器的時間和相應 P 的地址。檢查 sched.sysmonlock 所花費的時間是否超過 50μs。若是,則有可能前面所獲取的下一個計時器觸發時間已過期,因此重新調用 timeSleepUntil 方法再次獲取。如果發現超過 10ms 的時間沒有進行 netpoll 網絡輪詢,則主動調用 netpoll 方法觸發輪詢。同時如果存在不可搶占的處理器 P,則調用 startm 方法來運行那些應該運行,但沒有在運行的計時器。

到此這篇關於GoLang中的timer定時器實現原理分析的文章就介紹到這瞭,更多相關Go timer定時器內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: