Go語言讀寫鎖RWMutex的源碼分析

前言

在前面兩篇文章中 初見 Go Mutex 、Go Mutex 源碼詳解,我們學習瞭 Go語言 中的 Mutex,它是一把互斥鎖,每次隻允許一個 goroutine 進入臨界區,可以保證臨界區資源的狀態正確性。但是有的情況下,並不是所有 goroutine 都會修改臨界區狀態,可能隻是讀取臨界區的數據,如果此時還是需要每個 goroutine 拿到鎖依次進入的話,效率就有些低下瞭。例如房間裡面有一幅畫,有人想修改,有人隻是想看一下,完全可以放要看的一部分人進去,等他們看完再讓修改的人進去修改,這樣既提高瞭效率,也保證瞭臨界區資源的安全。看和修改,對應的就是讀和寫,本篇文章我們就一起來學習下 Go語言 中的讀寫鎖sync.RWMutex

說明:本文中的示例,均是基於Go1.17 64位機器

RWMutex 總覽

RWMutex 是一個讀/寫互斥鎖,在某一時刻隻能由任意數量的 reader 持有 或者 一個 writer 持有。也就是說,要麼放行任意數量的 reader,多個 reader 可以並行讀;要麼放行一個 writer,多個 writer 需要串行寫。

RWMutex 對外暴露的方法有五個:

  • RLock():讀操作獲取鎖,如果鎖已經被 writer 占用,會一直阻塞直到 writer 釋放鎖;否則直接獲得鎖;
  • RUnlock():讀操作完畢之後釋放鎖;
  • Lock():寫操作獲取鎖,如果鎖已經被 reader 或者 writer 占用,會一直阻塞直到獲取到鎖;否則直接獲得鎖;
  • Unlock():寫操作完畢之後釋放鎖;
  • RLocker():返回讀操作的 Locker 對象,該對象的 Lock() 方法對應 RWMutex 的 RLock()Unlock() 方法對應 RWMutex 的 RUnlock() 方法。

一旦涉及到多個 reader 和 writer ,就需要考慮優先級問題,是 reader 優先還是 writer 優先:

  • reader優先:隻要有 reader 要進行讀操作,writer 就一直等待,直到沒有 reader 到來。這種方式做到瞭讀操作的並發,但是如果 reader 持續到來,會導致 writer 饑餓,一直不能進行寫操作;
  • writer優先:隻要有 writer 要進行寫操作,reader 就一直等待,直到沒有 writer 到來。這種方式提高瞭寫操作的優先級,但是如果 writer 持續到來,會導致 reader 饑餓,一直不能進行讀操作;
  • 沒有優先級:按照先來先到的順序,沒有誰比誰更優先,這種相對來說會更公平。

我們先來看下 RWMutex 的運行機制,就可以知道它的優先級是什麼瞭。

可以想象 RWMutex 有兩個隊伍,一個是包含 所有reader 和你獲得準入權writer 的 隊列A,一個是還沒有獲得準入權 writer 的 隊列B。

  • 隊列 A 最多隻允許有 一個writer,如果有其他 writer,需要在 隊列B 等待;
  • 當一個 writer 到瞭 隊列A 後,隻允許它 之前的reader 執行讀操作,新來的 reader 需要在 隊列A 後面排隊;
  • 當前面的 reader 執行完讀操作之後,writer 執行寫操作;
  • writer 執行完寫操作後,讓 後面的reader 執行讀操作,再喚醒隊列B 的一個 writer 到 隊列A 後面排隊。

初始時刻 隊列A 中 writer W1 前面有三個 reader,後面有兩個 reader,隊列B中有兩個 writer

RWMutex運行示例:初始時刻

並發讀 多個 reader 可以同時獲取到讀鎖,進入臨界區進行讀操作;writer W1 在 隊列A 中等待,同時又來瞭兩個 reader,直接在 隊列A 後面排隊

RWMutex運行示例:並發讀

寫操作 W1 前面所有的 reader 完成後,W1 獲得鎖,進入臨界區操作

RWMutex運行示例:寫操作

獲得準入權 W1 完成寫操作退出,先讓後面排隊的 reader 進行讀操作,然後從 隊列B 中喚醒 W2 到 隊列A 排隊。W2 從 隊列B 到 隊列A 的過程中,R8 先到瞭 隊列A,因此 R8 可以執行讀操作。R9R10R11 在 W2 之後到的,所以在後面排隊;新來的 W4 直接在隊列B 排隊。

RWMutex運行示例:獲得準入權

從上面的示例可以看出,RWMutex 可以看作是沒有優先級,按照先來先到的順序去執行,隻不過是 多個reader 可以 並行 去執行罷瞭。

深入源碼

數據結構

type RWMutex struct {
 w           Mutex  // 控制 writer 在 隊列B 排隊
 writerSem   uint32 // 寫信號量,用於等待前面的 reader 完成讀操作
 readerSem   uint32 // 讀信號量,用於等待前面的 writer 完成寫操作
 readerCount int32  // reader 的總數量,同時也指示是否有 writer 在隊列A 中等待
 readerWait  int32  // 隊列A 中 writer 前面 reader 的數量
}

// 允許最大的 reader 數量
const rwmutexMaxReaders = 1 << 30

上述中的幾個變量,比較特殊的是 readerCount ,不僅表示當前 所有reader 的數量,同時表示是否有 writer 在隊列A中等待。當 readerCount 變為 負數 時,就代表有 writer 在隊列A 中等待瞭。

  • 當有 writer 進入 隊列A 後,會將 readerCount 變為負數,即 readerCount = readerCount - rwmutexMaxReaders,同時利用 readerWait 變量記錄它前面有多少個 reader;
  • 如果有新來的 reader,發現 readerCount 是負數,就會直接去後面排隊;
  • writer 前面的 reader 在釋放鎖時,會將 readerCount 和 readerWait都減一,當 readerWait==0 時,表示 writer 前面的所有 reader 都執行完瞭,可以讓 writer 執行寫操作瞭;
  • writer 執行寫操作完畢後,會將 readerCount 再變回正數,readerCount = readerCount + rwmutexMaxReaders

舉例:假設當前有兩個 reader,readerCount = 2;允許最大的reader 數量為 10

  • 當 writer 進入隊列A 時,readerCount = readerCount – rwmutexMaxReaders = -8,readerWait = readerCount = 2
  • 如果再來 3 個reader,readerCount = readerCount + 3 = -5
  • 獲得讀鎖的兩個reader 執行完後,readerCount = readerCount – 2 = -7,readerWait = readerWait-2 =0,writer 獲得鎖
  • writer 執行完後,readerCount = readerCount + rwmutexMaxReaders = 3,當前有 3個 reader

RLock()

reader 執行讀操作之前,需要調用 RLock() 獲取鎖

func (rw *RWMutex) RLock() {
 
  // reader 加鎖,將 readerCount 加一,表示多瞭個 reader
  if atomic.AddInt32(&rw.readerCount, 1) < 0 {
  
    // 如果 readerCount<0,說明有 writer 在自己前面等待,排隊等待讀信號量
  runtime_SemacquireMutex(&rw.readerSem, false, 0)
 }
}

RUnlock()

reader 執行完讀操作後,調用 RUnlock() 釋放鎖

func (rw *RWMutex) RUnlock() {

  // reader 釋放鎖,將 readerCount 減一,表示少瞭個 reader
 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
  
    // 如果readerCount<0,說明有 writer 在自己後面等待,看是否要讓 writer 運行
  rw.rUnlockSlow(r)
 }
}
func (rw *RWMutex) rUnlockSlow(r int32) {

  // 將 readerWait 減一,表示前面的 reader 少瞭一個
 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    
  // 如果 readerWait 變為瞭0,那麼自己就是最後一個完成的 reader
    // 釋放寫信號量,讓 writer 運行
  runtime_Semrelease(&rw.writerSem, false, 1)
 }
}

Lock()

writer 執行寫操作之前,調用 Lock() 獲取鎖

func (rw *RWMutex) Lock() {

   // 利用互斥鎖,如果前面有 writer,那麼就需要等待互斥鎖,即在隊列B 中排隊等待;如果沒有,可以直接進入 隊列A 排隊
   rw.w.Lock()

  // atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 將 readerCount 變成瞭負數
  // 再加 rwmutexMaxReaders,相當於 r = readerCount,r 就是 writer 前面的 reader 數量
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   
   // 如果 r!= 0 ,表示自己前面有 reader,那麼令 readerWait = r,要等前面的 reader 運行完
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }

}

Lock() 和 RUnlock() 是會並發進行的:

  • 如果 Lock() 將 readerCount 變為負數後,假設 r=3,表示加入的那一刻前面有三個 reader,還沒有賦值 readerWait CPU 就被強占瞭,readerWait = 0;
  • 假設此時三個 reader 的 RUnlock() 會進入到 rUnlockSlow() 邏輯,每個 reader 都將 readerWait 減一, readerWait 會變成負數,此時不符合喚醒 writer 的條件;
  • 三個 reader 運行完之後,此時 readerWait = -3, Lock() 運行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不會休眠,直接獲取到鎖,因為前面的 reader 都運行完瞭。

這就是為什麼 rUnlockSlow() 要判斷 atomic.AddInt32(&rw.readerWait, -1) == 0 以及 Lock() 要判斷 atomic.AddInt32(&rw.readerWait, r) != 0 的原因。

Unlock()

writer 執行寫操作之後,調用 Lock() 釋放鎖

func (rw *RWMutex) Unlock() {

  // 將 readerCount 變為正數,表示當前沒有 writer 在隊列A 等待瞭
 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

  // 將自己後面等待的 reader 喚醒,可以進行讀操作瞭
 for i := 0; i < int(r); i++ {
  runtime_Semrelease(&rw.readerSem, false, 0)
 }
 
  // 釋放互斥鎖,如果隊列B有writer,相當於喚醒一個來隊列A 排隊
 rw.w.Unlock()

}

writer 對 readerCount 一加一減,不會改變整體狀態,隻是用正負來表示是否有 writer 在等待。當然,如果在 writer 將 readerCount變為負數後,來瞭很多 reader,將 readerCount 變為瞭正數,此時reader 在 writer 沒有釋放鎖的時候就獲取到鎖瞭,是有問題的。但是 rwmutexMaxReaders 非常大,可以不考慮這個問題。

常見問題

不可復制

和 Mutex 一樣,RWMutex 也是不可復制。不能復制的原因和互斥鎖一樣。一旦讀寫鎖被使用,它的字段就會記錄它當前的一些狀態。這個時候你去復制這把鎖,就會把它的狀態也給復制過來。但是,原來的鎖在釋放的時候,並不會修改你復制出來的這個讀寫鎖,這就會導致復制出來的讀寫鎖的狀態不對,可能永遠無法釋放鎖。

不可重入

不可重入的原因是,獲得鎖之後,還沒釋放鎖,又申請鎖,這樣有可能造成死鎖。比如 reader A 獲取到瞭讀鎖,writer B 等待 reader A 釋放鎖,reader 還沒釋放鎖又申請瞭一把鎖,但是這把鎖申請不成功,他需要等待 writer B。這就形成瞭一個循環等待的死鎖。

加鎖和釋放鎖一定要成對出現,不能忘記釋放鎖,也不能解鎖一個未加鎖的鎖。

實戰一下

Go 中的 map 是不支持 並發寫的,我們可以利用 讀寫鎖 RWMutex 來實現並發安全的 map。在讀多寫少的情況下,使用 RWMutex 要比 Mutex 性能高。

package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

type ConcurrentMap struct {
 m     sync.RWMutex
 items map[string]interface{}
}

func (c *ConcurrentMap) Add(key string, value interface{}) {
 c.m.Lock()
 defer c.m.Unlock()
 c.items[key] = value
}

func (c *ConcurrentMap) Remove(key string) {
 c.m.Lock()
 defer c.m.Unlock()
 delete(c.items, key)
}
func (c *ConcurrentMap) Get(key string) interface{} {
 c.m.RLock()
 defer c.m.RUnlock()
 return c.items[key]
}

func NewConcurrentMap() ConcurrentMap {
 return ConcurrentMap{
  items: make(map[string]interface{}),
 }
}

func main() {
 m := NewConcurrentMap()

 var wait sync.WaitGroup

 wait.Add(10000)
 for i := 0; i < 10000; i++ {

  key := fmt.Sprintf("%d", rand.Intn(10))
  value := fmt.Sprintf("%d", rand.Intn(100))
  if i%100 == 0 {
   go func() {
    defer wait.Done()
    m.Add(key, value)
   }()
  } else {
   go func() {
    defer wait.Done()
    fmt.Println(m.Get(key))
    time.Sleep(time.Millisecond * 10)
   }()
  }

 }

 wait.Wait()
}

以上就是Go語言讀寫鎖RWMutex的源碼分析的詳細內容,更多關於Go語言讀寫鎖RWMutex的資料請關註WalkonNet其它相關文章!

推薦閱讀: