Go語言讀寫鎖RWMutex的源碼分析
前言
在前面兩篇文章中 初見 Go Mutex 、Go Mutex 源碼詳解,我們學習瞭 Go語言 中的 Mutex
,它是一把互斥鎖,每次隻允許一個 goroutine
進入臨界區,可以保證臨界區資源的狀態正確性。但是有的情況下,並不是所有 goroutine
都會修改臨界區狀態,可能隻是讀取臨界區的數據,如果此時還是需要每個 goroutine
拿到鎖依次進入的話,效率就有些低下瞭。例如房間裡面有一幅畫,有人想修改,有人隻是想看一下,完全可以放要看的一部分人進去,等他們看完再讓修改的人進去修改,這樣既提高瞭效率,也保證瞭臨界區資源的安全。看和修改,對應的就是讀和寫,本篇文章我們就一起來學習下 Go語言 中的讀寫鎖sync.RWMutex
。
說明:本文中的示例,均是基於Go1.17 64位機器
RWMutex 總覽
RWMutex
是一個讀/寫互斥鎖,在某一時刻隻能由任意數量的 reader
持有 或者 一個 writer 持有。也就是說,要麼放行任意數量的 reader,多個 reader 可以並行讀;要麼放行一個 writer,多個 writer 需要串行寫。
RWMutex 對外暴露的方法有五個:
- RLock():讀操作獲取鎖,如果鎖已經被
writer
占用,會一直阻塞直到writer
釋放鎖;否則直接獲得鎖; - RUnlock():讀操作完畢之後釋放鎖;
- Lock():寫操作獲取鎖,如果鎖已經被
reader
或者writer
占用,會一直阻塞直到獲取到鎖;否則直接獲得鎖; - Unlock():寫操作完畢之後釋放鎖;
- RLocker():返回讀操作的
Locker
對象,該對象的Lock()
方法對應RWMutex
的RLock()
,Unlock()
方法對應RWMutex
的RUnlock()
方法。
一旦涉及到多個 reader 和 writer ,就需要考慮優先級問題,是 reader 優先還是 writer 優先:
- reader優先:隻要有 reader 要進行讀操作,writer 就一直等待,直到沒有 reader 到來。這種方式做到瞭讀操作的並發,但是如果 reader 持續到來,會導致 writer 饑餓,一直不能進行寫操作;
- writer優先:隻要有 writer 要進行寫操作,reader 就一直等待,直到沒有 writer 到來。這種方式提高瞭寫操作的優先級,但是如果 writer 持續到來,會導致 reader 饑餓,一直不能進行讀操作;
- 沒有優先級:按照先來先到的順序,沒有誰比誰更優先,這種相對來說會更公平。
我們先來看下 RWMutex 的運行機制,就可以知道它的優先級是什麼瞭。
可以想象 RWMutex 有兩個隊伍,一個是包含 所有reader 和你獲得準入權writer 的 隊列A,一個是還沒有獲得準入權 writer 的 隊列B。
- 隊列 A 最多隻允許有 一個writer,如果有其他 writer,需要在 隊列B 等待;
- 當一個 writer 到瞭 隊列A 後,隻允許它 之前的reader 執行讀操作,新來的 reader 需要在 隊列A 後面排隊;
- 當前面的 reader 執行完讀操作之後,writer 執行寫操作;
- writer 執行完寫操作後,讓 後面的reader 執行讀操作,再喚醒隊列B 的一個 writer 到 隊列A 後面排隊。
初始時刻 隊列A 中 writer W1
前面有三個 reader,後面有兩個 reader,隊列B中有兩個 writer
RWMutex運行示例:初始時刻
並發讀 多個 reader 可以同時獲取到讀鎖,進入臨界區進行讀操作;writer W1
在 隊列A 中等待,同時又來瞭兩個 reader,直接在 隊列A 後面排隊
RWMutex運行示例:並發讀
寫操作 W1
前面所有的 reader 完成後,W1
獲得鎖,進入臨界區操作
RWMutex運行示例:寫操作
獲得準入權 W1
完成寫操作退出,先讓後面排隊的 reader 進行讀操作,然後從 隊列B 中喚醒 W2
到 隊列A 排隊。W2
從 隊列B 到 隊列A 的過程中,R8
先到瞭 隊列A,因此 R8
可以執行讀操作。R9
、R10
、R11
在 W2
之後到的,所以在後面排隊;新來的 W4
直接在隊列B 排隊。
RWMutex運行示例:獲得準入權
從上面的示例可以看出,RWMutex
可以看作是沒有優先級,按照先來先到的順序去執行,隻不過是 多個reader 可以 並行 去執行罷瞭。
深入源碼
數據結構
type RWMutex struct { w Mutex // 控制 writer 在 隊列B 排隊 writerSem uint32 // 寫信號量,用於等待前面的 reader 完成讀操作 readerSem uint32 // 讀信號量,用於等待前面的 writer 完成寫操作 readerCount int32 // reader 的總數量,同時也指示是否有 writer 在隊列A 中等待 readerWait int32 // 隊列A 中 writer 前面 reader 的數量 } // 允許最大的 reader 數量 const rwmutexMaxReaders = 1 << 30
上述中的幾個變量,比較特殊的是 readerCount
,不僅表示當前 所有reader
的數量,同時表示是否有 writer
在隊列A中等待。當 readerCount
變為 負數
時,就代表有 writer 在隊列A 中等待瞭。
- 當有 writer 進入 隊列A 後,會將
readerCount
變為負數,即readerCount = readerCount - rwmutexMaxReaders
,同時利用readerWait
變量記錄它前面有多少個 reader; - 如果有新來的 reader,發現
readerCount
是負數,就會直接去後面排隊; - writer 前面的 reader 在釋放鎖時,會將
readerCount
和readerWait
都減一,當 readerWait==0 時,表示 writer 前面的所有 reader 都執行完瞭,可以讓 writer 執行寫操作瞭; - writer 執行寫操作完畢後,會將 readerCount 再變回正數,
readerCount = readerCount + rwmutexMaxReaders
。
舉例:假設當前有兩個 reader,readerCount = 2;允許最大的reader 數量為 10
- 當 writer 進入隊列A 時,readerCount = readerCount – rwmutexMaxReaders = -8,readerWait = readerCount = 2
- 如果再來 3 個reader,readerCount = readerCount + 3 = -5
- 獲得讀鎖的兩個reader 執行完後,readerCount = readerCount – 2 = -7,readerWait = readerWait-2 =0,writer 獲得鎖
- writer 執行完後,readerCount = readerCount + rwmutexMaxReaders = 3,當前有 3個 reader
RLock()
reader 執行讀操作之前,需要調用 RLock() 獲取鎖
func (rw *RWMutex) RLock() { // reader 加鎖,將 readerCount 加一,表示多瞭個 reader if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 如果 readerCount<0,說明有 writer 在自己前面等待,排隊等待讀信號量 runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
RUnlock()
reader 執行完讀操作後,調用 RUnlock() 釋放鎖
func (rw *RWMutex) RUnlock() { // reader 釋放鎖,將 readerCount 減一,表示少瞭個 reader if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // 如果readerCount<0,說明有 writer 在自己後面等待,看是否要讓 writer 運行 rw.rUnlockSlow(r) } }
func (rw *RWMutex) rUnlockSlow(r int32) { // 將 readerWait 減一,表示前面的 reader 少瞭一個 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 如果 readerWait 變為瞭0,那麼自己就是最後一個完成的 reader // 釋放寫信號量,讓 writer 運行 runtime_Semrelease(&rw.writerSem, false, 1) } }
Lock()
writer 執行寫操作之前,調用 Lock() 獲取鎖
func (rw *RWMutex) Lock() { // 利用互斥鎖,如果前面有 writer,那麼就需要等待互斥鎖,即在隊列B 中排隊等待;如果沒有,可以直接進入 隊列A 排隊 rw.w.Lock() // atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 將 readerCount 變成瞭負數 // 再加 rwmutexMaxReaders,相當於 r = readerCount,r 就是 writer 前面的 reader 數量 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 如果 r!= 0 ,表示自己前面有 reader,那麼令 readerWait = r,要等前面的 reader 運行完 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
Lock()
和 RUnlock()
是會並發進行的:
- 如果 Lock() 將 readerCount 變為負數後,假設 r=3,表示加入的那一刻前面有三個 reader,還沒有賦值 readerWait CPU 就被強占瞭,readerWait = 0;
- 假設此時三個 reader 的 RUnlock() 會進入到 rUnlockSlow() 邏輯,每個 reader 都將 readerWait 減一, readerWait 會變成負數,此時不符合喚醒 writer 的條件;
- 三個 reader 運行完之後,此時 readerWait = -3, Lock() 運行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不會休眠,直接獲取到鎖,因為前面的 reader 都運行完瞭。
這就是為什麼 rUnlockSlow()
要判斷 atomic.AddInt32(&rw.readerWait, -1) == 0
以及 Lock()
要判斷 atomic.AddInt32(&rw.readerWait, r) != 0
的原因。
Unlock()
writer 執行寫操作之後,調用 Lock() 釋放鎖
func (rw *RWMutex) Unlock() { // 將 readerCount 變為正數,表示當前沒有 writer 在隊列A 等待瞭 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // 將自己後面等待的 reader 喚醒,可以進行讀操作瞭 for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // 釋放互斥鎖,如果隊列B有writer,相當於喚醒一個來隊列A 排隊 rw.w.Unlock() }
writer 對 readerCount 一加一減,不會改變整體狀態,隻是用正負來表示是否有 writer 在等待。當然,如果在 writer 將 readerCount變為負數後,來瞭很多 reader,將 readerCount 變為瞭正數,此時reader 在 writer 沒有釋放鎖的時候就獲取到鎖瞭,是有問題的。但是 rwmutexMaxReaders 非常大,可以不考慮這個問題。
常見問題
不可復制
和 Mutex 一樣,RWMutex 也是不可復制。不能復制的原因和互斥鎖一樣。一旦讀寫鎖被使用,它的字段就會記錄它當前的一些狀態。這個時候你去復制這把鎖,就會把它的狀態也給復制過來。但是,原來的鎖在釋放的時候,並不會修改你復制出來的這個讀寫鎖,這就會導致復制出來的讀寫鎖的狀態不對,可能永遠無法釋放鎖。
不可重入
不可重入的原因是,獲得鎖之後,還沒釋放鎖,又申請鎖,這樣有可能造成死鎖。比如 reader A 獲取到瞭讀鎖,writer B 等待 reader A 釋放鎖,reader 還沒釋放鎖又申請瞭一把鎖,但是這把鎖申請不成功,他需要等待 writer B。這就形成瞭一個循環等待的死鎖。
加鎖和釋放鎖一定要成對出現,不能忘記釋放鎖,也不能解鎖一個未加鎖的鎖。
實戰一下
Go 中的 map 是不支持 並發寫的,我們可以利用 讀寫鎖 RWMutex 來實現並發安全的 map。在讀多寫少的情況下,使用 RWMutex 要比 Mutex 性能高。
package main import ( "fmt" "math/rand" "sync" "time" ) type ConcurrentMap struct { m sync.RWMutex items map[string]interface{} } func (c *ConcurrentMap) Add(key string, value interface{}) { c.m.Lock() defer c.m.Unlock() c.items[key] = value } func (c *ConcurrentMap) Remove(key string) { c.m.Lock() defer c.m.Unlock() delete(c.items, key) } func (c *ConcurrentMap) Get(key string) interface{} { c.m.RLock() defer c.m.RUnlock() return c.items[key] } func NewConcurrentMap() ConcurrentMap { return ConcurrentMap{ items: make(map[string]interface{}), } } func main() { m := NewConcurrentMap() var wait sync.WaitGroup wait.Add(10000) for i := 0; i < 10000; i++ { key := fmt.Sprintf("%d", rand.Intn(10)) value := fmt.Sprintf("%d", rand.Intn(100)) if i%100 == 0 { go func() { defer wait.Done() m.Add(key, value) }() } else { go func() { defer wait.Done() fmt.Println(m.Get(key)) time.Sleep(time.Millisecond * 10) }() } } wait.Wait() }
以上就是Go語言讀寫鎖RWMutex的源碼分析的詳細內容,更多關於Go語言讀寫鎖RWMutex的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- 一文掌握go的sync.RWMutex鎖
- Golang的鎖機制與使用技巧小結
- go sync.Map基本原理深入解析
- Go語言並發編程之互斥鎖Mutex和讀寫鎖RWMutex
- Golang並發操作中常見的讀寫鎖詳析