Go語言學習之WaitGroup用法詳解
前言
在前面的文章中,我們使用過 WaitGroup
進行任務編排,Go語言中的 WaitGroup
和 Java 中的 CyclicBarrier
、CountDownLatch
非常類似。比如我們有一個主任務在執行,執行到某一點時需要並行執行三個子任務,並且需要等到三個子任務都執行完後,再繼續執行主任務。那我們就需要設置一個檢查點,使主任務一直阻塞在這,等三個子任務執行完後再放行。
說明:本文中的示例,均是基於Go1.17 64位機器
小試牛刀
我們先來個簡單的例子,看下 WaitGroup
是怎麼使用的。示例中使用 Add(5)
表示我們有 5個 子任務,然後起瞭 5個 協程去完成任務,主協程使用 Wait()
方法等待 子協程執行完畢,輸出一共等待的時間。
func main() { var waitGroup sync.WaitGroup start := time.Now() waitGroup.Add(5) for i := 0; i < 5; i++ { go func() { defer waitGroup.Done() time.Sleep(time.Second) fmt.Println("done") }() } waitGroup.Wait() fmt.Println(time.Now().Sub(start).Seconds()) } /* done done done done done 1.000306089 */
總覽
WaitGroup 一共有三個方法:
(wg *WaitGroup) Add(delta int) (wg *WaitGroup) Done() (wg *WaitGroup) Wait()
Add
方法用於設置 WaitGroup 的計數值,可以理解為子任務的數量Done
方法用於將 WaitGroup 的計數值減一,可以理解為完成一個子任務Wait
方法用於阻塞調用者,直到 WaitGroup 的計數值為0,即所有子任務都完成
正常來說,我們使用的時候,需要先確定子任務的數量,然後調用 Add() 方法傳入相應的數量,在每個子任務的協程中,調用 Done(),需要等待的協程調用 Wait() 方法,狀態流轉如下圖:
底層實現
結構體
type WaitGroup struct { noCopy noCopy // noCopy 字段標識,由於 WaitGroup 不能復制,方便工具檢測 state1 [3]uint32 // 12個字節,8個字節標識 計數值和等待數量,4個字節用於標識信號量 }
state1
是個復合字段,會拆分為兩部分: 64位(8個字節)的 statep
作為一個整體用於原子操作, 其中前面4個字節表示計數值,後面四個字節表示等待數量;剩餘 32位(4個字節)semap
用於標識信號量。
Go語言中對於64位的變量進行原子操作,需要保證該變量是64位對齊的,也就是要保證這 8個字節 的首地址是 8 的整數倍。因此當 state1
的首地址是 8 的整數倍時,取前8個字節作為 statep
,後4個字節作為 semap
;當 state1
的首地址不是 8 的整數倍時,取後8個字節作為 statep
,前4個字節作為 semap
。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { // 首地址是8的倍數時,前8個字節為 statep, 後四個字節為 semap if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 後8個字節為 statep, 前四個字節為 semap return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
Add
Add
方法用於添加一個計數值(負數相當於減),當計數值變為0後,Wait
方法阻塞的所有等待者都會被釋放- 計數值變為負數是非法操作,產生
panic
- 當計數值為0時(初始狀態),
Add
方法不能和Wait
方法並發調用,需要保證Add
方法在Wait
方法之前
調用,否則會panic
func (wg *WaitGroup) Add(delta int) { // 拿到計數值等待者變量 statep 和 信號量 semap statep, semap := wg.state() // 計數值加上 delta: statep 的前四個字節是計數值,因此將 delta 前移 32位 state := atomic.AddUint64(statep, uint64(delta)<<32) // 計數值 v := int32(state >> 32) // 等待者數量 w := uint32(state) // 如果加上 delta 之後,計數值變為負數,不合法,panic if v < 0 { panic("sync: negative WaitGroup counter") } // delta > 0 && v == int32(delta) : 表示從 0 開始添加計數值 // w!=0 :表示已經有瞭等待者 // 說明在添加計數值的時候,同時添加瞭等待者,非法操作。添加等待者需要在添加計數值之後 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v>0 : 計數值不等於0,不需要喚醒等待者,直接返回 // w==0: 沒有等待者,不需要喚醒,直接返回 if v > 0 || w == 0 { return } // 再次檢查數據是否一致 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 到這裡說明計數值為0,且等待者大於0,需要喚醒所有的等待者,並把系統置為初始狀態(0狀態) // 將計數值和等待者數量都置為0 *statep = 0 // 喚醒等待者 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
Done
// 完成一個任務,將計數值減一,當計數值減為0時,需要喚醒所有的等待者 func (wg *WaitGroup) Done() { wg.Add(-1) }
Wait
// 調用 Wait 方法會被阻塞,直到 計數值 變為0 func (wg *WaitGroup) Wait() { // 獲取計數、等待數和信號量 statep, semap := wg.state() for { state := atomic.LoadUint64(statep) // 計數值 v := int32(state >> 32) // 等待者數量 w := uint32(state) // 計數值數量為0,直接返回,無需等待 if v == 0 { return } // 到這裡說明計數值數量大於0 // 增加等待者數量:這裡會有競爭,比如多個 Wait 調用,或者在同時調用 Add 方法,增加不成功會繼續 for 循環 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 增加成功後,阻塞在信號量這裡,等待被喚醒 runtime_Semacquire(semap) // 被喚醒的時候,應該是0狀態。如果重用 WaitGroup,需要等 Wait 返回 if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
易錯點
上面分析源碼可以看到幾個會產生 panic
的點,這也是我們使用 WaitGroup
需要註意的地方
1.計數值變為負數
調用 Add 時參數值傳負數
func main() { var wg sync.WaitGroup wg.Add(1) wg.Add(-1) wg.Add(-1) }
多次調用 Done 方法
func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("test") wg.Done() wg.Done() }() time.Sleep(time.Second) wg.Wait() }
2.Add 和 Wait 並發調用
Add
和 Wait
並發調用,有可能達不到我們預期的效果,甚至 panic
。如下示例中,我們想要等待 3 個子任務都執行完後再執行主任務,但實際情況可能是子任務還沒起來,主任務就繼續往下執行瞭。
func doSomething(wg *sync.WaitGroup) { wg.Add(1) fmt.Println("do something") defer wg.Done() } func main() { var wg sync.WaitGroup for i := 0; i < 3; i++ { go doSomething(&wg) } wg.Wait() fmt.Println("main") } //main //do something //do something
正確的使用方式,應該是在調用 Wait
前先調用 Add
func doSomething(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("do something") } func main() { var wg sync.WaitGroup wg.Add(3) for i := 0; i < 3; i++ { go doSomething(&wg) } wg.Wait() fmt.Println("main") } //do something //do something //do something //main
3.沒有等 Wait 返回,就重用 WaitGroup
func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("do something") wg.Done() wg.Add(1) }() wg.Wait() }
4.復制使用
我們知道 Go 語言中的參數傳遞,都是值傳遞,就會產生復制操作。因此在向函數傳遞 WaitGroup 時,使用指針進行操作。
// 錯誤使用方式,沒有使用指針 func doSomething(wg sync.WaitGroup) { fmt.Println("do something") defer wg.Done() } func main() { var wg sync.WaitGroup wg.Add(3) for i := 0; i < 3; i++ { // 這裡沒使用指針,wg狀態一直不會改變,導致 Wait 一直阻塞 go doSomething(wg) } wg.Wait() fmt.Println("main") }
總結
我們通過源碼+示例的方式,一起學習瞭 sync.WaitGroup
實現邏輯,同時也給出瞭一些註意點,隻要做到如下操作,就不會出現問題:
- 保證 Add 在 Wait 前調用
- Add 中不傳遞負數
- 任務完成後不要忘記調用 Done 方法,建議使用 defer wg.Done()
- 不要復制使用 WaitGroup,函數傳遞時使用指針傳遞
- 盡量不復用 WaigGroup,減少出問題的風險
到此這篇關於Go語言學習之WaitGroup用法詳解的文章就介紹到這瞭,更多相關Go語言 WaitGroup內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Go並發控制WaitGroup的使用場景分析
- Go WaitGroup及Cond底層實現原理
- 詳解Go語言中Goroutine退出機制的原理及使用
- 在golang中使用Sync.WaitGroup解決等待的問題
- Go語言同步等待組sync.WaitGroup結構體對象方法詳解