一文帶你深入理解Go語言中的sync.Cond

在 go 的標準庫中,提供瞭 sync.Cond 這個並發原語,讓我們可以實現多個 goroutine 等待某一條件滿足之後再繼續執行。 它需要配合 sync.Mutex 一起使用,因為 CondWait 方法需要在 Mutex 的保護下才能正常工作。 對於條件變量,可能大多數人隻是知道它的存在,但是用到它的估計寥寥無幾,因為很多並發場景的處理都能使用 chan 來實現, 而且 chan 的使用也更加簡單。 但是在某些場景下,Cond 可能是最好的選擇,本文就來探討一下 Cond 的使用場景,基本用法,以及它的實現原理。

sync.Cond 是什麼

sync.Cond 表示的是條件變量,它是一種同步機制,用來協調多個 goroutine 之間的同步,當共享資源的狀態發生變化的時候, 可以通過條件變量來通知所有等待的 goroutine 去重新獲取共享資源。

適用場景

在實際使用中,我們可能會有多個 goroutine 在執行的過程中,由於某一條件不滿足而阻塞的情況。 這個時候,我們就可以使用條件變量來實現 goroutine 之間的同步。比如,我們有一個 goroutine 用來獲取數據, 但是可能會比較耗時,這個時候,我們就可以使用條件變量來實現 goroutine 之間的同步, 當數據準備好之後,就可以通過條件變量來通知所有等待的 goroutine 去重新獲取共享資源。

sync.Cond 條件變量用來協調想要訪問共享資源的那些 goroutine,當共享資源的狀態發生變化的時候, 它可以用來通知所有等待的 goroutine 去重新獲取共享資源。

sync.Cond 的基本用法

sync.Cond 的基本用法非常簡單,我們隻需要通過 sync.NewCond 方法來創建一個 Cond 實例, 然後通過 Wait 方法來等待條件滿足,通過 Signal 或者 Broadcast 方法來通知所有等待的 goroutine 去重新獲取共享資源。

NewCond 創建實例

sync.NewCond 方法用來創建一個 Cond 實例,它的參數是一個 Locker 接口,我們可以傳入一個 Mutex 或者 RWMutex 實例。 這個條件變量的 Locker 接口就是用來保護共享資源的。

Wait 等待條件滿足

Wait 方法用來等待條件滿足,它會先釋放 Cond 的鎖(Cond.L),然後阻塞當前 goroutine(實際調用的是 goparkunlock),直到被 Signal 或者 Broadcast 喚醒。

它做瞭如下幾件事情:

  • 釋放 Cond 的鎖(Cond.L),然後阻塞當前 goroutine。(所以,使用之前需要先鎖定)
  • Signal 或者 Broadcast 喚醒之後,會重新獲取 Cond 的鎖(Cond.L)。
  • 之後,就返回到 goroutine 阻塞的地方繼續執行。

Signal 通知一個等待的 goroutine

Signal 方法用來通知一個等待的 goroutine,它會喚醒一個等待的 goroutine,然後繼續執行當前 goroutine。 如果沒有等待的 goroutine,則不會有任何操作。

Broadcast 通知所有等待的 goroutine

Broadcast 方法用來通知所有等待的 goroutine,它會喚醒所有等待的 goroutine,然後繼續執行當前 goroutine。 如果沒有等待的 goroutine,則不會有任何操作。

sync.Cond 使用實例

下面我們通過一個實例來看一下 sync.Cond 的使用方法。

package cond

import (
   "fmt"
   "sync"
   "testing"
   "time"
)

var done bool
var data string

func write(c *sync.Cond) {
   fmt.Println("writing.")
   // 讓 reader 先獲取鎖,模擬條件不滿足然後 wait 的情況
   time.Sleep(time.Millisecond * 10)
   c.L.Lock()
   // 模擬耗時的寫操作
   time.Sleep(time.Millisecond * 50)
   data = "hello world"
   done = true
   fmt.Println("writing done.")
   c.L.Unlock()
   c.Broadcast()
}

func read(c *sync.Cond) {
   fmt.Println("reading")
   c.L.Lock()
   for !done {
      fmt.Println("reader wait.")
      c.Wait()
   }
   fmt.Println("read done.")
   fmt.Println("data:", data)
   defer c.L.Unlock()
}

func TestCond(t *testing.T) {
   var c = sync.NewCond(&sync.Mutex{})

   go read(c)  // 讀操作
   go read(c)  // 讀操作
   go write(c) // 寫操作

   time.Sleep(time.Millisecond * 100) // 等待操作完成
}

輸出:

reading
reader wait. // 還沒獲取完數據,需要等待
writing.
reading
reader wait.
writing done. // 獲取完數據瞭,通知所有等待的 reader
read done. // 讀取到數據瞭
data: hello world // 輸出讀取到的數據
read done.
data: hello world

這個例子可以粗略地用下圖來表示:

說明:

  • read1reader2 表示兩個 goroutine,它們都會調用 read 函數。
  • donefalse 的時候,reader1reader2 都會調用 c.Wait() 函數,然後阻塞等待。
  • write 表示一個 goroutine,它會調用 write 函數。
  • write 函數中,獲取完數據之後,會將 done 設置為 true,然後調用 c.Broadcast() 函數,通知所有等待的 reader 去重新獲取共享資源。
  • reader1reader2 在解除阻塞狀態後,都會重新獲取共享資源,然後輸出讀取到的數據。

在這個例子中,done 的功能是標記,用來表示共享資源是否已經獲取完畢,如果沒有獲取完畢,那麼 reader 就會阻塞等待。

為什麼要用 sync.Cond

在文章開頭,我們說瞭,很多並發編程的問題都可以通過 channel 來解決。 同樣的,在上面提到的 sync.Cond 的使用場景,使用 channel 也是可以實現的, 我們隻要 close(ch) 來關閉 channel 就可以實現通知多個等待的協程瞭。

那麼為什麼還要用 sync.Cond 呢? 主要原因是,sync.Cond 可以重復地進行 Wait()Signal()Broadcast() 操作, 但是,如果想通過關閉 chan 來實現這個功能的話,那就隻能通知一次瞭。 因為 channel 隻能關閉一次,關閉一個已經關閉的 channel 會導致程序 panic。

使用 channel 的另外一種方式是,記錄 reader 的數量,然後通過往 channel 中發送多次數據來實現通知多個 reader。 但是這樣一來代碼就會復雜很多,從另一個角度說,出錯的概率大瞭很多。

close channel 廣播實例

下面的例子模擬瞭使用 close(chan) 來實現 sync.Cond 中那種廣播功能,但是隻能通知一次。

package close_chan

import (
   "fmt"
   "testing"
   "time"
)

var data string

func read(c <-chan struct{}) {
   fmt.Println("reading.")

   // 從 chan 接收數據,如果 chan 中沒有數據,會阻塞。
   // 如果能接收到數據,或者 chan 被關閉,會解除阻塞狀態。
   <-c

   fmt.Println("data:", data)
}

func write(c chan struct{}) {
   fmt.Println("writing.")
   // 模擬耗時的寫操作
   time.Sleep(time.Millisecond * 10)
   data = "hello world"
   fmt.Println("write done.")

   // 關閉 chan 的時候,會通知所有的 reader
   // 所有等待從 chan 接收數據的 goroutine 都會被喚醒
   close(c)
}

func TestCloseChan(t *testing.T) {
   ch := make(chan struct{})

   go read(ch)
   go read(ch)
   go write(ch)

   // 不能關閉已經關閉的 chan
   time.Sleep(time.Millisecond * 20)
   // panic: close of closed channel
   // 下面這行代碼會導致 panic
   //go write(ch)

   time.Sleep(time.Millisecond * 100)
}

輸出:

writing.
reading. // 會阻塞直到寫完
reading. // 會阻塞直到寫完
write done. // 寫完之後,才能讀
data: hello world
data: hello world

上面例子的 write 不能多次調用,否則會導致 panic。

sync.Cond 基本原理

go 的 sync.Cond 中維護瞭一個鏈表,這個鏈表記錄瞭所有阻塞的 goroutine,也就是由於調用瞭 Wait 而阻塞的 goroutine。 而 SignalBroadcast 方法就是用來喚醒這個鏈表中的 goroutine 的。 Signal 方法隻會喚醒鏈表中的第一個 goroutine,而 Broadcast 方法會喚醒鏈表中的所有 goroutine

下圖是 Signal 方法的效果,可以看到,Signal 方法隻會喚醒鏈表中的第一個 goroutine

說明:

  • notifyListsync.Cond 中維護的一個鏈表,這個鏈表記錄瞭所有阻塞的 goroutine
  • head 是鏈表的頭節點,tail 是鏈表的尾節點。
  • Signal 方法隻會喚醒鏈表中的第一個 goroutine

Broadcast 方法會喚醒 notifyList 中的所有 goroutine

sync.Cond 的設計與實現

最後,我們來看一下 sync.Cond 的設計與實現。

sync.Cond 模型

sync.Cond 的模型如下所示:

type Cond struct {
   noCopy noCopy

   // L is held while observing or changing the condition
   L Locker // L 在觀察或改變條件時被持有

   notify  notifyList
   checker copyChecker
}

屬性說明:

  • noCopy 是一個空結構體,用來檢查 sync.Cond 是否被復制。(在編譯前通過 go vet 命令來檢查)
  • L 是一個 Locker 接口,用來保護條件變量。
  • notify 是一個 notifyList 類型,用來記錄所有阻塞的 goroutine
  • checker 是一個 copyChecker 類型,用來檢查 sync.Cond 是否被復制。(如果在運行時被復制,會導致 panic

notifyList 結構體

notifyListsync.Cond 中維護的一個鏈表,這個鏈表記錄瞭所有因為共享資源還沒準備好而阻塞的 goroutine。它的定義如下所示:

type notifyList struct {
   wait atomic.Uint32
   notify uint32

   // 阻塞的 waiter 名單。
   lock mutex // 鎖
   head *sudog // 阻塞的 goroutine 鏈表(鏈表頭)
   tail *sudog // 阻塞的 goroutine 鏈表(鏈表尾)
}

屬性說明:

  • wait 是下一個 waiter 的編號。它在鎖外自動遞增。
  • notify 是下一個要通知的 waiter 的編號。它可以在鎖外讀取,但隻能在持有鎖的情況下寫入。
  • lock 是一個 mutex 類型,用來保護 notifyList
  • head 是一個 sudog 類型,用來記錄阻塞的 goroutine 鏈表的頭節點。
  • tail 是一個 sudog 類型,用來記錄阻塞的 goroutine 鏈表的尾節點。

notifyList 的方法說明:

notifyList 中包含瞭幾個操作阻塞的 goroutine 鏈表的方法。

  • notifyListAdd 方法將 waiter 的編號加 1。
  • notifyListWait 方法將當前的 goroutine 加入到 notifyList 中。(也就是將當前協程掛起)
  • notifyListNotifyOne 方法將 notifyList 中的第一個 goroutine 喚醒。
  • notifyListNotifyAll 方法將 notifyList 中的所有 goroutine 喚醒。
  • notifyListCheck 方法檢查 notifyList 的大小是否正確。

sync.Cond 的方法

notifyList 就不細說瞭,本文重點講解一下 sync.Cond 的實現。

Wait 方法

Wait 方法用在當條件不滿足的時候,將當前運行的協程掛起。

func (c *Cond) Wait() {
   // 檢查是否被復制
   c.checker.check()
   // 更新 notifyList 中需要等待的 waiter 的數量
   // 返回當前需要插入 notifyList 的編號
   t := runtime_notifyListAdd(&c.notify)
   // 解鎖
   c.L.Unlock()
   // 掛起當前 g,直到被喚醒
   runtime_notifyListWait(&c.notify, t)
   // 喚醒之後,重新加鎖。
   // 因為阻塞之前解鎖瞭。
   c.L.Lock()
}

對於 Wait 方法,我們需要註意的是,使用之前,我們需要先調用 L.Lock() 方法加鎖,然後再調用 Wait 方法,否則會報錯。

文檔裡面的例子:

c.L.Lock()
for !condition() {
    c.Wait()
}
// ...使用條件...
// 這裡是我們在條件滿足之後,需要執行的代碼。
c.L.Unlock()

好瞭,問題來瞭,調用 Wait 方法之前為什麼要先加鎖呢?

這是因為在我們使用共享資源的時候,可能一些代碼是互斥的,所以我們需要加鎖。 這樣我們就可以保證在我們使用共享資源的時候,不會被其他協程修改。 但是如果因為條件不滿足,我們需要等待的話,我們不可能在持有鎖的情況下等待, 因為在修改條件的時候,可能也需要加鎖,這樣就會造成死鎖。

另外一個問題是,為什麼要使用 for 來檢查條件是否滿足,而不是使用 if 呢?

這是因為在我們調用 Wait 方法之後,可能會有其他協程喚醒我們,但是條件並沒有滿足, 這個時候依然是需要繼續 Wait 的。

Signal 方法

Signal 方法用在當條件滿足的時候,將 notifyList 中的第一個 goroutine 喚醒。

func (c *Cond) Signal() {
   // 檢查 sync.Cond 是否被復制瞭
   c.checker.check()
   // 喚醒 notifyList 中的第一個 goroutine
   runtime_notifyListNotifyOne(&c.notify)
}

Broadcast 方法

Broadcast 方法用在當條件滿足的時候,將 notifyList 中的所有 goroutine 喚醒。

func (c *Cond) Broadcast() {
   // 檢查 sync.Cond 是否被復制瞭
   c.checker.check()
   // 喚醒 notifyList 中的所有 goroutine
   runtime_notifyListNotifyAll(&c.notify)
}

copyChecker 結構體

copyChecker 結構體用來檢查 sync.Cond 是否被復制。它實際上隻是一個 uintptr 類型的值。

type copyChecker uintptr

// check 方法檢查 copyChecker 是否被復制瞭。
func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}

copyChecker 的值隻有兩種可能:

  • 0,表示還沒有調用過 Wait, SignalBroadcast 方法。
  • uintptr(unsafe.Pointer(&copyChecker)),表示已經調用過 Wait, SignalBroadcast 方法。在這幾個方法裡面會調用 check 方法,所以 copyChecker 的值會被修改。

所以如果 copyChecker 的值不是 0,也不是 uintptr(unsafe.Pointer(&copyChecker))(也就是最初的 copyChecker 的內存地址),則表示 copyChecker 被復制瞭。

需要註意的是,這個方法在調用 CompareAndSwapUintptr 還會檢查一下,這是因為有可能會並發調用 CompareAndSwapUintptr, 如果另外一個協程調用瞭 CompareAndSwapUintptr 並且成功瞭,那麼當前協程的這個 CompareAndSwapUintptr 調用會返回 false, 這個時候就需要檢查是否是因為另外一個協程調用瞭 CompareAndSwapUintptr 而導致的,如果是的話,就不會 panic

為什麼 sync.Cond 不能被復制

從上一小節中我們可以看到,sync.Cond 其實是不允許被復制的,但是如果是在調用 Wait, SignalBroadcast 方法之前復制,那倒是沒關系。

這是因為 sync.Cond 中維護瞭一個阻塞的 goroutine 列表。如果 sync.Cond 被復制瞭,那麼這個列表就會被復制,這樣就會導致兩個 sync.Cond 都包含瞭這個列表;但是我們喚醒的時候,隻會有其中一個 sync.Cond 被喚醒,另外一個 sync.Cond 就會一直阻塞。 所以 go 直接從語言層面限制瞭這種情況,不允許 sync.Cond 被復制。

總結

sync.Cond 是一個條件變量,它可以用來協調多個 goroutine 之間的同步,當條件滿足的時候,去通知那些因為條件不滿足被阻塞的 goroutine 繼續執行。

sync.Cond 的接口比較簡單,隻有 Wait, SignalBroadcast 三個方法。

  • Wait 方法用來阻塞當前 goroutine,直到條件滿足。調用 Wait 方法之前,需要先調用 L.Lock 方法加鎖。
  • Signal 方法用來喚醒 notifyList 中的第一個 goroutine
  • Broadcast 方法用來喚醒 notifyList 中的所有 goroutine

sync.Cond 的實現也比較簡單,它的核心就是 notifyList,它是一個鏈表,用來保存所有因為條件不滿足而被阻塞的 goroutine

用關閉 channel 的方式也可以實現類似的廣播功能,但是有個問題是 channel 不能被重復關閉,所以這種方式無法被多次使用。也就是說使用這種方式無法多次廣播。

使用 channel 發送通知的方式也是可以的,但是這樣實現起來就復雜很多瞭,就更容易出錯瞭。

sync.Cond 中使用 copyChecker 來檢查 sync.Cond 是否被復制,如果被復制瞭,就會 panic。需要註意的是,這裡的復制是指調用瞭 WaitSignalBroadcast 方法之後,sync.Cond 被復制瞭。在調用這幾個方法之前進行復制是沒有影響的。

以上就是一文帶你深入理解Go語言中的sync.Cond的詳細內容,更多關於Go語言 sync.Cond的資料請關註WalkonNet其它相關文章!

推薦閱讀: