Go並發控制WaitGroup的使用場景分析

1. 前言

上一篇介紹瞭 Go並發控制–Channel

使用channel來控制子協程的優點是實現簡單,缺點是當需要大量創建協程時就需要有相同數量的channel,而且對於子協程繼續派生出來的協程不方便控制。

2. 使用WaitGroup控制

WaitGroup,可理解為Wait-Goroutine-Group,即等待一組goroutine結束。比如某個goroutine需要等待其他幾個goroutine全部完成,那麼使用WaitGroup可以輕松實現。

2.1 使用場景

下面程序展示瞭一個goroutine等待另外兩個goroutine結束的例子:

package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(2) //設置計數器,數值即為goroutine的個數
    go func() {
        //Do some work
        time.Sleep(1*time.Second)

        fmt.Println("Goroutine 1 finished!")
        wg.Done() //goroutine執行結束後將計數器減1
    }()

    go func() {
        //Do some work
        time.Sleep(2*time.Second)

        fmt.Println("Goroutine 2 finished!")
        wg.Done() //goroutine執行結束後將計數器減1
    }()

    wg.Wait() //主goroutine阻塞等待計數器變為0
    fmt.Printf("All Goroutine finished!")
}

簡單的說,上面程序中wg內部維護瞭一個計數器:

  • 啟動goroutine前將計數器通過Add(2)將計數器設置為待啟動的goroutine個數。
  • 啟動goroutine後,使用Wait()方法阻塞自己,等待計數器變為0。
  • 每個goroutine執行結束通過Done()方法將計數器減1。
  • 計數器變為0後,阻塞的goroutine被喚醒

其實WaitGroup也可以實現一組goroutine等待另一組goroutine,這有點像玩雜技,很容出錯,如果不瞭解其實現原理更是如此。實際上,WaitGroup的實現源碼非常簡單。

2.2 信號量

信號量是Unix系統提供的一種保護共享資源的機制,用於防止多個線程同時訪問某個資源

可簡單理解為信號量為一個數值:

  • 當信號量>0時,表示資源可用,獲取信號量時系統自動將信號量減1;
  • 當信號量==0時,表示資源暫不可用,獲取信號量時,當前線程會進入睡眠,當信號量為正時被喚醒;

1.3 WaitGroup 數據結構

源碼包中src/sync/waitgroup.go:WaitGroup定義瞭其數據結構:

type WaitGroup struct {
    state1 [3]uint32
}

state1是個長度為3的數組,其中包含瞭state和一個信號量,而state實際上是兩個計數器:

  • counter: 當前還未執行結束的goroutine計數器
  • waiter count: 等待goroutine-group結束的goroutine數量,即有多少個等候者
  • semaphore: 信號量

考慮到字節是否對齊,三者出現的位置不同,為簡單起見,依照字節已對齊情況下,三者在內存中的位置如下所示:

WaitGroup對外提供三個接口:

  • Add(delta int): 將delta值加到counter中
  • Wait(): waiter遞增1,並阻塞等待信號量semaphore
  • Done(): counter遞減1,按照waiter數值釋放相應次數信號量

下面分別介紹這三個函數的實現細節。

2.3.1 Add () 方法

Add()做瞭兩件事,一是把delta值累加到counter中,因為delta可以為負值,也就是說counter有可能變成0或負值,所以第二件事就是當counter值變為0時,根據waiter數值釋放等量的信號量,把等待的goroutine全部喚醒,如果counter變為負值,則panic.

Add()偽代碼如下:

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state() //獲取state和semaphore地址指針

    state := atomic.AddUint64(statep, uint64(delta)<<32) //把delta左移32位累加到state,即累加到counter中
    v := int32(state >> 32) //獲取counter值
    w := uint32(state)      //獲取waiter值

    if v < 0 {              //經過累加後counter值變為負值,panic
        panic("sync: negative WaitGroup counter")
    }

    //經過累加後,此時,counter >= 0
    //如果counter為正,說明不需要釋放信號量,直接退出
    //如果waiter為零,說明沒有等待者,也不需要釋放信號量,直接退出
    if v > 0 || w == 0 {
        return
    }

    //此時,counter一定等於0,而waiter一定大於0(內部維護waiter,不會出現小於0的情況),
    //先把counter置為0,再釋放waiter個數的信號量
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false) //釋放信號量,執行一次釋放一個,喚醒一個等待者
    }
}

2.3.2 Wait()

Wait()方法也做瞭兩件事,一是累加waiter, 二是阻塞等待信號量

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state() //獲取state和semaphore地址指針
    for {
        state := atomic.LoadUint64(statep) //獲取state值
        v := int32(state >> 32)            //獲取counter值
        w := uint32(state)                 //獲取waiter值
        if v == 0 {                        //如果counter值為0,說明所有goroutine都退出瞭,不需要待待,直接返回
            return
        }

        // 使用CAS(比較交換算法)累加waiter,累加可能會失敗,失敗後通過for loop下次重試
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap) //累加成功後,等待信號量喚醒自己
            return
        }
    }
}

這裡用到瞭CAS算法保證有多個goroutine同時執行Wait()時也能正確累加waiter。

2.3.3 Done()

Done()隻做一件事,即把counter減1,我們知道Add()可以接受負值,所以Done實際上隻是調用瞭Add(-1)。

源碼如下:

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Done()的執行邏輯就轉到瞭Add(),實際上也正是最後一個完成的goroutine把等待者喚醒的。

2.4 總結

簡單說來,WaitGroup通常用於等待一組“工作協程”結束的場景,其內部維護兩個計數器,這裡把它們稱為“工作協程”計數器和“坐等協程”計數器,
WaitGroup對外提供的三個方法分工非常明確:

  • Add(delta int)方法用於增加“工作協程”計數,通常在啟動新的“工作協程”之前調用;
  • Done()方法用於減少“工作協程”計數,每次調用遞減1,通常在“工作協程”內部且在臨近返回之前調用;
  • Wait()方法用於增加“坐等協程”計數,通常在所有”工作協

Done()方法除瞭負責遞減“工作協程”計數以外,還會在“工作協程”計數變為0時檢查“坐等協程”計數器並把“坐等協程”喚醒。

需要註意

  • Done()方法遞減“工作協程”計數後,如果“工作協程”計數變成負數時,將會觸發panic,這就要求Add()方法調用要早於Done()方法。
  • 也就是說代碼中,如果調用Done的次數多於Add的次數會產生painc
  • 當“工作協程”計數多於實際需要等待的“工作協程”數量時,“坐等協程”可能會永遠無法被喚醒而產生列鎖,此時,Go運行時檢測到死鎖會觸發panic
  • Add的添加的工作協程的數量,多於Done調用的次數,則會出現panic
  • 當“工作協程”計數小於實際需要等待的“工作協程”數量時,Done()會在“工作協程”計數變為負數時觸發panic。
  • Add()添加的工作協程個數小於Done調用的次數,會出現panic

3. 總結

WaitGroup控制子協程的方式很簡單,且目的很明確,等待一組子協程執行完畢再執行主線程,但是當子協程裡面有子協程,子協程裡面有其他的子協程時,這種並不知道有多少個子協程的情況下使用WaitGroup就很難,所以就需要****Context**上場瞭

到此這篇關於Go並發控制–WaitGroup篇的文章就介紹到這瞭,更多相關Go並發控制WaitGroup內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: