Golang Mutex互斥鎖源碼分析

前言

在上一篇文章中,我們一起學習瞭如何使用 Go 中的互斥鎖 Mutex,那麼本篇文章,我們就一起來探究下 Mutex 底層是如何實現的,知其然,更要知其所以然!

說明:本文中的示例,均是基於Go1.17 64位機器

Mutex 特性

Mutex 就是一把互斥鎖,可以想象成一個令牌,有且隻有這一個令牌,隻有持有令牌的 goroutine 才能進入房間(臨界區),在房間內執行完任務後,走出房間並把令牌交出來,如果還有其餘的 goroutine 等著獲取這個令牌,讓他們再去搶這個令牌,搶到的重復上述過程,沒搶到的繼續等。

上述是從宏觀角度來看待互斥鎖的,但是在 Mutex 內部,有著非常復雜的搶鎖邏輯,Mutex 的發展也經歷瞭幾個版本,我們可以用拿令牌進餐廳吃飯來形象比喻下幾個主要版本的變化。

前提:餐廳一次隻能進入一個人,餐廳有一個令牌,隻有持有這個令牌的人才能進去;從餐廳出來後,需要把這個令牌歸還

版本一

餐廳在門外設置瞭一個隊伍,如果令牌空閑,拿著令牌去餐廳用餐;如果令牌不是空閑的,新來的人就要去隊伍後面排隊等待叫號。(不是空閑包含兩種情況:持有令牌的人在餐廳裡面,隊伍是空的;隊伍有人排隊。)

此版本的問題就是:隻要令牌不是空閑的,新來的人必須直接去排隊,沒有商量的餘地。這樣看起來很公平,遵循先來後到的原則,但是對於餐廳來說,營業效率就會有所降低,即單位時間內接待顧客的數量(IO)會減少。為什麼這樣說呢,舉個例子,有個顧客從餐廳出來歸還令牌後,需要去等待隊列去叫號,被叫到號的這個人需要花費時間走到餐廳(獲取到CPU),這中間就浪費瞭不少時間。

版本二

為瞭提高營業效率,允許剛到門口的顧客和被叫到號的顧客一起去搶令牌,而不是直接去排隊,這樣就給瞭新人機會。舉個例子:當持有令牌的人從餐廳出來歸還令牌後,去等待隊列叫個號,如果此時有顧客剛到門口,被叫到號的和新到的顧客一起搶令牌,搶到的就可以直接進入餐廳,搶不到的接著去排隊,由於剛到的顧客離門口近(正在占據CPU),被叫到號的顧客離得遠(需要等CPU),而且剛到的顧客可能不隻一個,所以被叫到號的顧客很大概率搶不到令牌,可能還沒走到門口(還沒獲取到CPU)就被新來的顧客搶走瞭。不管怎麼樣,這樣提高瞭餐廳的效率,可以在單位時間內接待更多的客戶。

版本三

餐廳發現有些人用餐很快,如果讓搶不到令牌的先別直接去排隊,而是在門口轉悠會(當然不能一直轉悠,有條件限制,到瞭限制還是要去排隊),這種方式類似樂觀鎖,那麼有顧客從餐廳出來後,就不用去叫號瞭,直接讓門口的這些顧客繼續搶就行瞭,這樣就進一步提高瞭餐廳的運行效率,畢竟叫號真的太浪費時間瞭。

版本四

經過瞭多個版本的優化,餐廳的運營效率是越來越高瞭,但是有些人可要準備要罵娘瞭,這些人是誰呢,當然是已經在隊伍裡等待的那些人。由於給瞭新人機會,如果持續有新顧客來,那麼已經在隊伍裡的那些人永遠也拿不到令牌,可真的要餓死瞭。

Mutex 在這個版本隻為三件事:公平、公平、還是tm的公平!堅持讓每一個人都不餓肚子的原則,餐廳搞出瞭一個新的模式:饑餓模式。如果有顧客等的時間超過瞭閾值(1ms),餐廳變為饑餓模式,在該模式下,所有新來的顧客直接去排隊,然後按照先來先到的順序,依次將令牌給等待隊列隊首的顧客。

那麼什麼時候由饑餓模式變為正常模式呢?當拿到令牌的顧客發現自己從等待到拿到令牌的時間小於閾值(1ms)瞭,或者等待隊伍沒人等瞭,此時餐廳就變為正常模式,畢竟上述兩個條件都說明當前餐廳競爭不是很激烈瞭。

同時這個版本修復瞭以前的一個問題:之前從等待隊列喚醒的顧客如果沒有搶到令牌,再回到隊列後是插到隊尾,這樣對已經排到第一位的顧客太不友好瞭。在這個版本中修復瞭該問題,喚醒的顧客如果沒有搶到令牌,直接插入到隊首,下次叫號還是他。

特性總結

經過瞭多次迭代,目前的版本有瞭如下特性:

給新人機會:讓剛來的顧客和從隊列喚醒的顧客一起去搶令牌,喚醒也是按照先來先到的原則喚醒;

保持樂觀態度:沒搶到不是直接去排隊,而是可以在門口轉悠會,說不定裡面的人馬上就出來瞭;

正常模式和饑餓模式的切換:為瞭公平起見,正常模式下給瞭新人機會,一起去搶令牌;饑餓模式下照顧老人,所有人老老實實排隊,按照先來先到的順序拿令牌。整個餐廳既保持瞭公平,又提高瞭運行效率,一切井然有序起來瞭。

回歸正題

讓我們從餐廳回到 Go 中來,Mutex 有兩種模式:正常模式和饑餓模式:

正常模式下,如果當前鎖正在被持有,搶不到鎖的就會進入一個先進先出的等待隊列。當持有鎖的 goroutine 釋放鎖之後,按照從前到後的順序喚醒等待隊列的第一個等待者,但是不會直接給被喚醒者鎖,還是需要他去搶,即在喚醒等待隊列等待者這個時間,同時也會有正在運行且還未進入等待隊列的 goroutine 正在搶鎖 (數量可能還很多),這些都會和剛喚醒的等待者一起去搶,剛喚醒的可能還沒有分到 CPU,而正在運行的正在占據瞭CPU,所以正在運行的更有可能獲取到鎖,被喚醒的等待者可能搶鎖失敗。如果等待者搶鎖失敗,他會被放到等待隊列的隊首,如果超過 1ms 都沒搶到鎖,就會從 正常模式 切換到 饑餓模式。

饑餓模式下,要釋放鎖的 goroutine 會將鎖直接交給等待隊列的第一個等待者,不需要去搶瞭,而且新來的 goroutine 也不會嘗試去搶鎖,直接加入到等待隊列的尾部。那麼什麼時候會從饑餓模式切換到正常模式呢:

(1)如果當前被喚醒的等待者獲得到鎖後,發現自己是隊列中的最後一個,隊列中沒有其他等待者瞭,此時會切換到正常模式

(2)如果當前被喚醒的等待者獲得到鎖後,發現自己總共的等待時間不超過 1ms,就獲得到鎖瞭,此時也會切換到正常模式

正常模式會帶來更高的吞吐量:一個 goroutine 要釋放鎖,更大可能會被正在運行的 goroutine 搶到,這就避免瞭協程的上下文切換,運行更多的 goroutine,但是有可能造成一個問題,就是鎖始終被新來的 goroutine 搶走,在等待隊列中的等待者始終搶不到鎖,這就會導致饑餓問題。饑餓模式就是為瞭解決這個問題出現的,保證瞭每個 goroutine 都有運行的機會,防止等待時間過長。

數據結構

// 互斥鎖
type Mutex struct {
 state int32  // 狀態
 sema  uint32  // 信號量
}


const (
 mutexLocked = 1 << iota // 1
 mutexWoken // 2
 mutexStarving // 4
 mutexWaiterShift = iota // 3

 starvationThresholdNs = 1e6 // 判斷是否要進入饑餓狀態的閾值
)

信號量 sema 就相當於我們說的令牌,state 是 int32 類型,一共 32位,通過每個位記錄瞭當前的狀態:

state字段

mutexLocked:當前是否已經上鎖,state & mutexLocked = 1 表示已經上鎖;

mutexWoken:標記當前是否有喚醒的 goroutine,state & mutexWoken = 1 表示有喚醒的goroutine;

mutexStarving:當前是否為饑餓狀態,state & mutexWoken = 1 表示處於饑餓狀態;

mutexWaiterShift:29位,state >> mutexWaiterShift 得到等待者的數量;

Lock()

Lock()加鎖方法分為兩部分,第一部分是 fast path,可以理解為快捷通道,如果當前鎖沒被占用,直接獲得鎖返回;否則需要進入 slow path,判斷各種條件去競爭鎖,主要邏輯都在此處。

瞭解過原子操作的同學,對 CompareAndSwap(CAS) 應該不陌生,CompareAndSwapInt32(addr *int32, old, new int32) 有三個參數,如果地址 addr 指向的值與 old 相等,則將 addr 的值改為 new,否則不變,也就是說在我們修改前,如果有人修改瞭 addr 指向的值,本次修改就會失敗。

// 上鎖
func (m *Mutex) Lock() {
 // fastpath:期望當前鎖沒有被占用,可以快速獲取到鎖, CAS 修改 state 最後一位的值為1(標記鎖是否被占用)
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }
 // Slow path : 單獨抽出來放到一個函數裡,方便 fast path 被內聯
 m.lockSlow()
}
func (m *Mutex) lockSlow() {
 var waitStartTime int64 // // 記錄等待時間
 starving := false // 當前的 goroutine 是否已經饑餓瞭(如果已經饑餓,就會將 state 的饑餓狀態置為 1)
 awoke := false  // 當前的 goroutine 是否被喚醒的
 iter := 0 // 自旋次數
 old := m.state // 保存當前的 state 狀態
 for {
    
    /*
    自旋:如果滿足如下條件,就會進入 if 語句,然後 continue,不斷自旋:
    1. 鎖被占用,且不處於饑餓模式(饑餓狀態直接去排隊,不允許嘗試獲取鎖)
    2. 基於當前自旋的次數,再次自旋有意義 runtime_canSpin(iter)
    
    那麼退出自旋的條件也就是:
    1. 鎖被釋放瞭,當前處於沒被占用狀態(說明等到瞭,該goroutine就會立即去獲取鎖)
  2. mutex進入瞭饑餓模式,不自旋瞭,沒意義(饑餓狀態會直接把鎖交給等待隊列隊首的goroutine)
  3. 不符合自旋狀態(自旋次數太多瞭,自旋失去瞭意義)
    
    如下代碼是位操作:
    mutexLocked|mutexStarving = 00000...101
    mutexLocked = 00000...001
    如果要滿足 old & 00000...101 = 00000...001,需要 old = ...0*1,即狀態為:鎖被占用,且不處於饑餓狀態 
    
    runtime_canSpin(iter) 會根據自旋次數,判斷是否可以繼續自旋
    */
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   
      /*
      如果 
       1. 當前goroutine不是被喚醒的 (awoke=false) 
       2. 鎖狀態喚醒標志位為0(old&mutexWoken == 0) 
       3. 等待者數量不為0 (old>>mutexWaiterShift != 0  右移三位得到的就是等待者數量)
      
       那麼利用CAS,將 state 的喚醒標記置為1,標記自己是被喚醒的 (將state的喚醒標記置為1,說明外面有喚醒著的goroutine,那麼在釋放鎖的時候,就不去等待隊列叫號瞭,畢竟已經有喚醒的瞭)
       
       如果有其他 goroutine 已經設置瞭 state 的喚醒標記位,那麼本次就會失敗
      */

   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   runtime_doSpin()
      
      // 迭代次數加一
   iter++
      
      // 獲取最新的狀態
   old = m.state
      
      // 想再次自旋,看看鎖釋放瞭沒
   continue
  }
    
    // 到這裡,說明退出瞭自旋,當前鎖沒被占用 或者  系統處於饑餓模式 或者 自旋次數太多導致不符合自旋條件
  
    // new 代表當前goroutine 基於當前狀態要設置的新狀態
  new := old
    
    // 隻要不是饑餓狀態,就需要獲取鎖(饑餓狀態直接去排隊,不能搶鎖)
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
    
    // 鎖被占用 或者 處於饑餓模式下,新增一個等待者
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }
    
    // 當前 goroutine 已經進入饑餓瞭,且鎖還沒有釋放,需要把 Mutex 的狀態改為饑餓狀態
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
    
    // 如果是被喚醒的,把喚醒標志位置0,表示外面沒有被喚醒的goroutine瞭(搶到就獲得鎖、搶不到就睡眠,把喚醒標志置0)
  if awoke {
   
      // 由於是被喚醒的,new 裡面的 喚醒標記位一定是 1
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
      
      // a &^ b 的意思就是 清零a中,ab都為1的位,即清除喚醒標記
   new &^= mutexWoken
  }
    
    /*
      利用CAS,將狀態設置為新的
      1. 如果是饑餓狀態,隻增加一個等待者數量
      2. 正常狀態,加鎖標記置為 1,如果鎖已被占用增加一個等待者數量
      3. 如果當前 goroutine 已經饑餓瞭,將 饑餓標記 置為 1
      4. 如果是被喚醒的,清除喚醒標記
    */
    
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
      
      // 如果改狀態之前,鎖未被占用 且 處於正常模式,那麼就相當於獲取到鎖瞭
   if old&(mutexLocked|mutexStarving) == 0 {
    break 
   }
      
      // 到這裡說明:1. 之前鎖被占用  或者 2.之前是處於饑餓狀態 
      
      // 判斷之前是否等待過(是否從隊列裡喚醒的),之前等待過,再次排隊放在隊首
   queueLifo := waitStartTime != 0
      
      // 如果之前沒等過(新來的),設置等待起始時間當前時間
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
      
     // 之前排過隊的老人,放到等待隊列隊首;新人放到隊尾,然後等待獲取信號量
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      
      // 鎖被釋放,goroutine 被喚醒
      
      // 設置當前 goroutine 饑餓狀態,如果之前已經饑餓,或者距離等待開始時間超過瞭 1ms,也變饑餓
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      
      // 獲取最新的狀態
   old = m.state
      
      // 如果 state 饑餓標記為1,說明當前在饑餓模式,饑餓模式下被喚醒,已經獲取到鎖瞭;
      // 饑餓狀態下,釋放鎖沒有更新等待者數量和饑餓標記,需要獲得鎖的goroutine去更新狀態
   if old&mutexStarving != 0 {

        // 正確性校驗:
        // 1. 鎖還是鎖住的狀態(鎖已經釋放給當前goroutine瞭,不應該被鎖住)
    // 2. 或者有被喚醒的goroutine(饑餓模式下不應該有醒著的goroutine,都應該去乖乖等著)
    // 3. 或者當前goroutine 的等待者數量為0(當前goroutine就是等待者)
    // 這三種情況不應該出現,與預期狀態不符
        
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }
        
        // 加鎖,減去一個等待者
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
        
        // 如果當前的 goroutine 非饑餓,或者等待者隻有一個(也就是隻有當前goroutine,等待隊列空瞭),可以取消饑餓狀態,進入正常狀態
    if !starving || old>>mutexWaiterShift == 1 {
     delta -= mutexStarving
    }
        
        // 修改狀態:
        // 加鎖,減去一個等待者:m.state + mutexLocked - 1<<mutexWaiterShift : 
        // 滿足非饑餓條件,加鎖,減去一個等待者,取消饑餓狀態:
        // m.state + mutexLocked - 1<<mutexWaiterShift - mutexStarving: 
    atomic.AddInt32(&m.state, delta)
        
        // 饑餓模式下被喚醒,相當於獲得鎖瞭,可以結束
    break
   }
      
      // 之前是處於鎖被占用且非饑餓狀態,被喚醒,去繼續搶鎖
   awoke = true
      
      // 新喚醒的,自旋數量置0
   iter = 0
  } else {
      
      // 修改新狀態失敗,狀態有更新,需要重試
   old = m.state
  }
 }
}

加鎖的這部分代碼,新來的 goroutine 或者從隊列裡面喚醒的 goroutine 都會進入如下邏輯,相當於給新人機會

1.樂觀態度的自旋:判斷是否可以自旋,如果可以自旋,就自旋等待;如果有可能,把喚醒標記位置為1,標記外面有喚醒的 goroutine,釋放鎖的時候就不會去隊列裡面喚醒瞭,畢竟已經有人在等待瞭;

2.修改系統狀態:跳出自旋後,每個 goroutine 根據當前系統狀態修改系統狀態:

  • 非饑餓狀態,想要加鎖(如果本來就是加鎖狀態,將加鎖位 設置為 1 相當於不變)
  • 鎖被占用 或者 處於饑餓模式下,新增一個等待者
  • 當前 goroutine 已經進入饑餓瞭,且鎖還沒有釋放,需要把 Mutex 的狀態改為饑餓狀態
  • 如果當前 goroutine 是被喚醒的,清除系統喚醒標記

3.利用 CAS 修改系統狀態,同一時刻隻有一個 goroutine 能夠設置成功,但是設置成功並不代表獲取到鎖瞭:

  • 之前是非上鎖的正常狀態,設置成功說明本次搶鎖成功,可以返回去操作臨界區瞭;
  • 之前是上鎖狀態或者饑餓狀態,本次隻是新增瞭一個等待者,然後根據是否是新來的,去隊列隊尾或者隊首排隊,等待叫號;

4.從隊列中被叫號喚醒,不一定是獲取到鎖瞭:

  • 當前是饑餓狀態,那麼一定是獲取到鎖瞭,因為饑餓狀態隻把鎖給隊列的第一個 goroutine
  • 非饑餓狀態,將自己狀態置為喚醒,再去搶鎖,重復上述過程

問:系統會不會同時存在 喚醒標志和饑餓標志都為1 的情況呢?

答:不會。隻有等待時間大於 1ms 的才會去設置饑餓標記,也就是隻有從隊列喚醒的才會去設置,那麼從隊列中喚醒的 goroutine ,自身的 awoke=true,每當去設置饑餓標記的時候會把喚醒標記清除。

Unlock()

Unlock()解鎖方法也分為兩部分,第一部分是 fast path,可以理解為快捷通道,直接把鎖狀態位清除,如果此時系統狀態恢復到初始狀態,說明沒有 goroutine 在搶鎖等鎖,直接返回,否則進入 slow path

slow path 會根據是否為饑餓狀態,做出不一樣的反應:

正常狀態:喚醒一個 goroutine 去搶鎖,等待者數量減一,並將喚醒狀態置為 1

饑餓狀態:直接喚醒等待隊列隊首的 goroutine,鎖的所有權直接移交(修改等待者數量、是否取消饑餓標記,由喚醒的 goroutine 去處理)。

func (m *Mutex) Unlock() {

 // Fast path: 把鎖標記清除
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // 清除完鎖標記,發現還有其他狀態,比如等待隊列不為空,需要喚醒其他 goroutine
  m.unlockSlow(new)
 }
}
func (m *Mutex) unlockSlow(new int32) {
  
  /* 狀態正確性校驗:
    1. 如果解鎖一個上鎖狀態的鎖,最後一位則為1,fast path 中 new 已經減去瞭1, 此時 new 最後一位應當為0
   2. 如果解鎖一個未上鎖狀態的鎖,最後一位則為0,fast path 中 new 已經減去瞭1, 此時 new 最後一位應當為1
    如果 (new+mutexLocked)&mutexLocked == 0,說明 new 當前最後一位是1,那麼就是解鎖瞭一個沒有上鎖的鎖,狀態有誤
 */
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
  
  // 正常模式,非饑餓,可能需要喚醒隊列中的 goroutine,饑餓狀態直接移交鎖
 if new&mutexStarving == 0 {
  old := new
  for {

      /* 系統運轉正常,鎖可以正確交接,可以直接返回瞭:
        1. 沒有等待者瞭 (沒有等鎖的瞭,去喚醒誰?)
        2. 有喚醒狀態的 goroutine  (自旋狀態的 goroutine,將喚醒狀態置為1)
       3. 有 goroutine 已經獲取瞭鎖 (Unlock方法已經將鎖標記置為瞭0,可能自旋的此時已經搶到瞭鎖)
      */
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
      
      // 沒有喚醒狀態的 goroutine,喚醒一個去搶鎖
   // 減去一個等待者,並且將 喚醒標記 置為 1
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 第二個參數為false, 喚醒隊首的 goroutine 去搶鎖(不一定能搶到)
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
      
      // 上面 CAS 失敗,可能由於新增瞭一個等待者,for 循環重試
   old = m.state
  }
 } else {
  
    /*
     1. 第二個參數為 true,直接將鎖的所有權,交給等待隊列的第一個等待者
   2. 註意,此時沒有設置 mutexLocked =1 ,被喚醒的 goroutine 會設置
   3. 雖然沒有設置 mutexLocked ,但是饑餓模式下, Mutex 始終被認為是鎖住的,都會直接排隊等待移交鎖
    */
  runtime_Semrelease(&m.sema, true, 1)
 }
}

到此這篇關於Golang Mutex互斥鎖源碼分析的文章就介紹到這瞭,更多相關Golang Mutex互斥鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: