Java並發編程之淺談ReentrantLock

一、首先看圖

在這裡插入圖片描述

二、lock()跟蹤源碼

在這裡插入圖片描述

這裡對公平鎖和非公平鎖做瞭不同實現,由構造方法參數決定是否公平。

public ReentrantLock(boolean fair) {
     sync = fair ? new FairSync() : new NonfairSync();
}

2.1 非公平鎖實現

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

代碼量很少。首先compareAndSetState(0, 1)通過CAS(期望值0,新值1,內存值stateOffset)

  • 如果修改成功,即搶占到鎖,setExclusiveOwnerThread(Thread.currentThread());將AQS中的變量exclusiveOwnerThread設置為當前搶占到鎖的線程,也就是圖中的ThreadA。
  • 若沒有搶占成功,證明此時鎖被占用,執行方法acquire(1);
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這裡主要看兩個方法tryAcquire(arg)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。當滿足if條件後,會給當前線程標記一個interrupt狀態。

2.1.1 tryAcquire(arg)

這個方法又有多個實現。這裡看NonfairSync非公平鎖。

在這裡插入圖片描述

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0) {
         if (compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     else if (current == getExclusiveOwnerThread()) {
         int nextc = c + acquires;
         if (nextc < 0) // overflow
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

在這個方法中,還不死心,首先會判斷下AQS中的state是否為0,為0也就是說距離上次嘗試獲取鎖到現在準備進入隊列(雙向鏈表)中這段時間內,鎖已經被釋放,可以重新CAS嘗試獲取鎖。

如果當前鎖還是被持有狀態,就是state!=0,就會判斷,當前線程是不是當前持有鎖的線程exclusiveOwnerThread,如果是,則state+1,從這裡可以看出state表示的是重入次數。

全部不滿足,返回false。

2.1.2 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

addWaiter

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

tryAcquire(arg)返回false,證明當前線程還是沒有獲取到鎖。那麼就要進入隊列等待瞭,首先addWaiter方法,將當前線程封裝成一個Node,如果pred不為空,則將當前節點做鏈表的尾部插入,同時為瞭防止在此期間前序節點已經不在隊列中瞭,也會運用CAS操作來執行(期望值pred,新值node,內存值tailOffset)。

如果前序節點為空,或者在CAS時發現前序節點已經不存在瞭,則重新構建鏈表,將當前節點封裝的Node,加入到鏈表當中。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

加入完成後,返回當前node節點,進入acquireQueued方法。

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        	//獲取到當前node節點的上一個節點
            final Node p = node.predecessor();
            //如果當前的上個節點就是頭節點,會再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
            	//獲取成功,將當前節點置空,並成為新的頭節點
                setHead(node);
				//這個p已經沒用瞭,防止內存泄漏,直接指向null,下次GC時回收
                p.next = null; // help GC
                //不需要取消
                failed = false;
                //return false,不需要中斷當前線程
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這裡是一個自旋操作,首先拿到當前線程封裝節點的上一個節點,如果滿足第一個if條件if (p == head && tryAcquire(arg)),證明上個節點為頭節點,則此時當前線程也會再次嘗試獲取鎖,獲取鎖成功,證明此時沒有別的線程在隊列中瞭,則將當前node清空並設置為頭節點,返回不需要中斷當前線程。

在第二個if條件中if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())。走到這裡證明當前線程不是第一個線程節點,或者沒有搶占到鎖,shouldParkAfterFailedAcquire這個方法見名知意,在搶占失敗後是否需要park阻塞,裡面主要是用於清理雙向鏈表中被取消的節點線程和未被阻塞的節點線程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//獲取前置節點的等待狀態
    if (ws == Node.SIGNAL)
		//前置節點的等待狀態為-1,表示前置節點在隊列中阻塞,那麼當前節點也需要被阻塞在隊列中
        return true;
    if (ws > 0) {
		//前置節點等待狀態大於0,此前置節點已經被取消,循環遍歷清除所有已被取消的節點。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
		//前置節點等待狀態小於等於0,且不等於-1,也就是沒有被阻塞也沒有被取消
		//則將前置節點設置為阻塞狀態。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • 前置節點的等待狀態為-1,表示前置節點在隊列中阻塞,那麼當前節點也需要被阻塞在隊列中
  • 前置節點等待狀態大於0,此前置節點已經被取消,循環遍歷清除所有已被取消的節點。
  • 前置節點等待狀態小於等於0,且不等於-1,也就是沒有被阻塞也沒有被取消。則將前置節點設置為阻塞狀態。

到這裡,基於非公平鎖的實現結束。

2.2 公平鎖實現

公平鎖和樂觀鎖的區別就在於,非公平鎖acquire(1)前會先嘗試獲取鎖,公平鎖直接acquire(1)

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
}

2.2.1 tryAcquire(arg)

在tryAcquire中也和非公平鎖有一定的區別。在當前鎖沒有被占有時。非公平鎖不用考慮目前AQS隊列中的排隊情況,直接通過CAS嘗試獲取鎖。公平鎖會看目前隊列的狀態,再來決定是嘗試占有鎖還是在隊列中等待。

protected final boolean tryAcquire(int acquires) {
   final Thread current = Thread.currentThread();
   int c = getState();
   if (c == 0) {
       if (!hasQueuedPredecessors() &&
           compareAndSetState(0, acquires)) {
           setExclusiveOwnerThread(current);
           return true;
       }
   }
   else if (current == getExclusiveOwnerThread()) {
       int nextc = c + acquires;
       if (nextc < 0)
           throw new Error("Maximum lock count exceeded");
       setState(nextc);
       return true;
   }
   return false;
}

到此這篇關於Java並發編程之淺談ReentrantLock的文章就介紹到這瞭,更多相關Java ReentrantLock內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: