ReentrantLock從源碼解析Java多線程同步學習
前言
如今多線程編程已成為瞭現代軟件開發中的重要部分,而並發編程中的線程同步問題更是一道難以逾越的坎。在Java語言中,synchronized是最基本的同步機制,但它也存在著許多問題,比如可重入性不足、死鎖等等。為瞭解決這些問題,Java提供瞭更加高級的同步機制——ReentrantLock。
管程
管程(Monitor)是一種用於實現多線程同步的抽象數據類型,它可以用來協調不同線程之間的互斥和同步訪問共享資源。通俗地說,管程就像一個門衛,控制著進入某個共享資源區域的線程數量和時間,以避免多個線程同時訪問導致的數據競爭和混亂。
管程模型
- Mesa管程模型:由美國計算機科學傢Dijkstra提出,是最流行的管程模型之一。在Mesa管程模型中,每個管程也有一個條件變量和等待隊列,但與Hoare管程不同的是,當一個線程請求進入管程時,如果條件不滿足,該線程並不會立即被阻塞,而是繼續執行後續操作,直到該線程主動放棄鎖資源或者其他線程喚醒它。
- Hoare管程模型:由英國計算機科學傢C.A.R. Hoare提出,是最早的管程模型之一。在Hoare管程模型中,每個管程都有一個條件變量和一個等待隊列,當一個線程請求進入管程時,如果條件不滿足,該線程就會被阻塞並加入等待隊列,直到條件滿足後才被喚醒。
- Brinch Hansen管程模型:由丹麥計算機科學傢Per Brinch Hansen提出,是一種改進的管程模型。在Brinch Hansen管程模型中,每個管程也有一個條件變量和等待隊列,但與其他管程模型不同的是,它允許多個線程同時在管程中等待,並且不需要像Hoare管程那樣每次隻喚醒一個等待線程。
在Java中,采用的是基於Mesa管程模型實現的管程機制。具體地,Java中的synchronized
關鍵字就是基於Mesa管程模型實現的,包括Java中的AbstractQueuedSynchronizer(AQS)可以被看作是一種基於管程模型實現的同步框架。
MESA模型
主要特點
- 互斥訪問
MESA模型采用瞭互斥訪問的機制,即同一時刻隻能有一個線程進入管程執行代碼。
- 條件變量
MESA模型還引入瞭條件變量的概念,用於實現線程間的等待和喚醒操作。條件變量提供瞭一種機制,使得線程可以在等待某個條件成立時掛起,並在條件成立時被喚醒。
- 等待隊列
MESA模型使用等待隊列來維護處於等待狀態的線程,這些線程都在等待條件變量成立。等待隊列由一個或多個條件變量組成,每個條件變量都有自己的等待隊列。
- 原子操作
MESA模型要求管程中的所有操作都是原子操作,即一旦進入管程,就不能被中斷,直到操作執行完畢。
AQS
在講ReentrantLock之前先說一下AQS,AQS(AbstractQueuedSynchronizer)是Java中的一個同步器,它是許多同步類(如ReentrantLock、Semaphore、CountDownLatch等)的基礎。AQS提供瞭一種實現同步操作的框架,其中包括獨占模式和共享模式,以及一個等待隊列來管理線程的等待和喚醒。AQS也借鑒瞭Mesa模型的思想。
共享變量
AQS內部維護瞭屬性volatile int state表示資源的可用狀態
state三種訪問方式:
- getState()
- setState()
- compareAndSetState()
資源訪問方式
Exclusive-獨占,隻有一個線程能執行,如ReentrantLock
Share-共享,多個線程可以同時執行,如Semaphore/CountDownLatch
主要方法
- isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實現它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false
隊列
- 同步等待隊列: 主要用於維護獲取鎖失敗時入隊的線程。
- 條件等待隊列: 調用await()的時候會釋放鎖,然後線程會加入到條件隊列,調用signal()喚醒的時候會把條件隊列中的線程節點移動到同步隊列中,等待再次獲得鎖。
node節點等待狀態
- 值為0,初始化狀態,表示當前節點在sync隊列中,等待著獲取鎖。
- CANCELLED,值為1,表示當前的線程被取消;
- SIGNAL,值為-1,表示當前節點的後繼節點包含的線程需要運行,也就是unpark;
- CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
- PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行;
ReentrantLock源碼分析
在ReentrantLock中有一個內部類Sync會繼承 AQS然後將同步器所有調用都映射到Sync對應的方法。
實例化ReentrantLock
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock還提供瞭一個傳佈爾值的實例化方式,這個傳true用來創建一個公平鎖的,默認是創建非公平鎖。非公平鎖的話 sync是用NonfairSync來進行實例化,公平鎖sync是用FairSync來進行實例化。
加鎖
現在假設有AB兩個線程來競爭鎖
A線程加鎖成功
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //CAS修改state狀態 if (compareAndSetState(0, 1)) //修改成功設置exclusiveOwnerThread setExclusiveOwnerThread(Thread.currentThread()); else //嘗試獲取資源 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
假定A線程先CAS修改成功,他會設置exclusiveOwnerThread為A線程
B線程嘗試加鎖
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我們先看tryAcquire()方法,這裡體現出瞭他的可重入性。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取當前資源標識 int c = getState(); if (c == 0) { //如果資源沒被占有CAS嘗試加鎖 if (compareAndSetState(0, acquires)) { //修改成功設置exclusiveOwnerThread setExclusiveOwnerThread(current); return true; } } //資源被占有要判斷占有資源的線程是不是當前線程,加鎖成功設置的exclusiveOwnerThread值在這裡就派上瞭用處 else if (current == getExclusiveOwnerThread()) { //這下面就是將重入次數設置到資源標識裡 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
根據上面源碼我們可以看出B線程嘗試加鎖是失敗的,接下來看嘗試加鎖失敗後的方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),該方法實現分為兩個部分:
addWaiter(Node.EXCLUSIVE)
:該方法會將當前線程加入到等待隊列(即Sync
中的queue
)的尾部,並返回該節點。這個節點的模式是獨占模式(Node.EXCLUSIVE
),表示當前線程想要獲取獨占鎖。acquireQueued(Node node, int arg)
:該方法是一個循環方法,用於等待和獲取鎖。
我們先解析addWaiter
private Node addWaiter(Node mode) { //構建一個當前線程的node節點 這裡prev 和 next 都為null Node node = new Node(Thread.currentThread(), mode); // 指向雙向鏈表的尾節點的引用 Node pred = tail; //B線程進來目前還未構建任何隊列這裡肯定是空的 if (pred != null) { //如果已經構建過隊列會把當前線程的node節點的上一個node節點指向tail尾節點 node.prev = pred; //CAS操作把當前線程的node節點設置為新的tail尾節點 if (compareAndSetTail(pred, node)) { //把舊的tail尾節點的下一個node節點指向當前線程的node節點 pred.next = node; return node; } } //尾節點為空執行 enq(node); return node; }
private Node enq(final Node node) { //死循環當tail 尾節點不為空才會跳出 for (;;) { Node t = tail; if (t == null) { // Must initialize //用CAS構建出一個head空的node節點 if (compareAndSetHead(new Node())) //將當前空的node節點交給尾節點(下一次循環就會走else分支) tail = head; } else { //把我們addWaiter中創建的node節點的prev指向瞭當前線程node節點 node.prev = t; //將tail尾節點更改為當前線程的node節點 if (compareAndSetTail(t, node)) { //將t的下一個節點指向當前線程創建的node節點 t.next = node; return t; } } } }
執行完這裡初步的一個等待隊列就構建好瞭
解析完addWaiter我們再來解析acquireQueued,addWaiter執行完後的結果會返回一個雙向列表的node節點
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //中斷標志位 boolean interrupted = false; for (;;) { //獲取當前線程node節點的上一個節點 final Node p = node.predecessor(); //如果上一個節點就是head節點說明當前線程其實是處在隊列第一位然後就會再次嘗試加鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //這裡是重點 這個方法來判斷當前線程是否應該進入等待狀態 if (shouldParkAfterFailedAcquire(p, node) && //調用LockSupport.park(this)阻塞瞭當前線程等待被喚醒 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取上一個節點的等待狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL)//表示當前節點的後繼節點包含的線程需要運行 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//當前線程被取消 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設置等待狀態為-1 } return false; }
運行到parkAndCheckInterrupt()B線程就會被阻塞瞭,後續的邏輯我們在解鎖操作unlock之後再繼續說
釋放鎖
public void unlock() { sync.release(1); }
public final boolean release(int arg) { //嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //head節點不為空並且等待狀態不是0就去進行unpark操作 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //head節點等待狀態 int ws = node.waitStatus; // if (ws < 0) //將頭節點的等待狀態修改為0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //獲取頭節點的下一個節點(也就是B節點) Node s = node.next; if (s == null || s.waitStatus > 0) {//節點為空或者線程處於取消狀態 s = null; //從尾節點往上找符合條件的節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //對該線程進行unpark喚醒(B節點) LockSupport.unpark(s.thread); }
喚醒之後我們的B線程就能繼續往下走瞭,我們繼續看剛剛的acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //這裡嘗試獲取鎖由於A線程釋放瞭鎖這裡是肯定獲取成功的 if (p == head && tryAcquire(arg)) { //把head設置為當前節點(也就是往前移一位,並且把上一個節點指向指為null) setHead(node); //把剛剛的上一個節點也指向為null (這裡他就沒引用瞭會被GC回收掉) p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //剛剛在這裡阻塞現在被喚醒 parkAndCheckInterrupt()) //設置標志中斷位為true 然後開始下一次循環 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
總結
- 鎖的實現方式:ReentrantLock內部通過維護一個state變量來表示鎖的狀態,其中高16位表示持有鎖的線程ID,低16位表示重入次數。使用CAS操作來獲取鎖,如果當前鎖未被持有,則將state的高16位設置為當前線程ID,並將低16位設置為1,表示重入次數為1;如果當前鎖已經被持有,則判斷持有鎖的線程是否為當前線程,如果是,則將state的低16位加1表示重入,如果不是,則進入等待隊列。
- 等待隊列:ReentrantLock中的等待隊列采用瞭CLH隊列的實現方式,每個等待線程會被封裝成一個Node節點,節點中維護瞭前繼節點、後繼節點和等待狀態等信息。當一個線程需要等待鎖時,會將自己封裝成一個Node節點插入到等待隊列的尾部,並在自旋等待時自動阻塞。
- 公平鎖與非公平鎖:ReentrantLock可以通過構造函數中的fair參數來指定鎖的公平性,當fair為true時表示該鎖是公平鎖,即等待隊列中的線程會按照先進先出的順序獲取鎖;當fair為false時表示該鎖是非公平鎖,即等待隊列中的線程會按照隨機順序獲取鎖。
- 鎖的釋放:ReentrantLock中的鎖釋放采用瞭state變量的遞減來實現,當一個線程釋放鎖時,會將state的低16位減1,如果減1後低16位變為0,則表示當前線程已經完全釋放瞭鎖,此時會將高16位清零,表示鎖變為瞭可獲取狀態,等待隊列中的線程可以繼續競爭鎖。
以上就是ReentrantLock從源碼解析Java多線程同步學習的詳細內容,更多關於Java多線程ReentrantLock的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Java並發編程之淺談ReentrantLock
- Java中的AQS同步隊列問題詳解
- Java多線程之深入理解ReentrantLock
- ReentrantLock獲取鎖釋放鎖的流程示例分析
- 淺談Java並發之同步器設計