Java 多線程並發 ReentrantReadWriteLock詳情

前言

ReentrantReadWriteLock ,可重入讀寫鎖。實際使用場景中,我們需要處理的操作本質上是讀與寫。而對這兩種操作進行同步操作的難度也是不一樣的。

一般情況下,讀操作不會造成同步安全問題,因為隻是讀取數據而不去修改的情況下相當於數據是不可變的,不可變本質上是絕對的線程安全,無需進行任何確保線程安全的操作。

而如果在一系列操作中包含瞭寫操作,那麼就需要考慮線程安全瞭。在 JMM 中,寫操作本質上是將主內存中的數據復制到線程的工作內存,然後進行更新,最後同步到主內存。如果此時有其他線程執行讀操作,可能會讀取到更新前到舊數據,就會造成數據不一致問題。

JMM 中定義的對寫操作的執行流程中,要先去主內存讀取數據,也就是說,一個寫操作前一定包含瞭一個讀操作,再算上其他的讀操作場景,可以得出結論,在實際的使用場景中,讀操作一定是多於寫操作的。

按照上面的說法,好像讀操作我們不需要進行線程安全處理,因為它本身就是線程安全的,那麼為什麼會有讀寫鎖,尤其是讀鎖這種東西存在呢?

試想一個場景,多個線程讀取一個共享資源,其中某個或某些線程在不確定的時間點會進行寫操作,那麼所有線程的讀取到的數據是安全的嗎?答案是不安全,因為寫操作寫入主內存不及時的話,後續其他線程的讀操作讀取到的數據就是主內存更新前的舊數據,就會導致臟數據問題。也就是說,寫操作需要保證線程安全,並且是獨占鎖資源的,不能再寫操作執行時,存在其他線程去執行讀操作。那麼就需要讀鎖與寫鎖配合處理同步邏輯。

常規的保證線程安全的方法就是普通的互斥鎖,互斥鎖會被一個線程持有,對其他線程造成阻塞。如果對一段有讀操作也有寫操作的代碼使用互斥鎖的話,對於爭用這個共享數據的所有線程來說,隻有一個擁有鎖的線程可以正常運行,其他線程的邏輯即使是都是讀操作。其他線程會阻塞等待鎖資源。

讀寫鎖的優勢就是,在上面這種情況下,確保寫操作的互斥性,並在沒有寫操作的場景下,讀操作可以讓多個線程同時獲取鎖資源。

ReadWriteLock

ReentrantReadWriteLock 是基於 AbstractQueuedSynchronizer 並實現瞭 ReadWriteLock 接口實現的一個鎖機制。ReadWriteLock 定義瞭讀寫鎖的特性:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     */
    Lock readLock();
    /**
     * Returns the lock used for writing.
     */
    Lock writeLock();
}

ReadWriteLock 中定義瞭獲取兩種鎖的方式,一個用於獲取讀鎖、一個用於獲取寫鎖。隻要沒有持有寫鎖的線程在執行,讀鎖可以同時被多個嘗試讀操作的線程持有,而寫鎖是排他鎖。

與互斥鎖相比,讀寫鎖在訪問共享數據時允許更高級的並發特性,即每次隻有一個線程可以執行寫操作,並且在沒有寫操作時其他線程可以並發讀取共享數據。從讀操作的效率來看,如果是互斥鎖每次隻能一個線程執行讀寫操作,而讀寫鎖可以多個線程讀,寫操作時才互斥,所以讀寫鎖的執行效率更高。

ReentrantReadWriteLock 源碼分析

前面的內容介紹瞭讀寫鎖的含義和優勢,接下來分析 Java 並發包中對它的實現 ReentrantReadWriteLock 。

類關系

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {
      	static final class HoldCounter
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> 
    }
    static final class NonfairSync extends Sync
    static final class FairSync extends Sync
    public static class ReadLock implements Lock, java.io.Serializable
    public static class WriteLock implements Lock, java.io.Serializable
}

ReentrantReadWriteLock 實現瞭讀寫鎖接口 ReadWriteLock 和序列化接口 Serializable 。

它有一個抽象靜態內部類 Sync ,Sync 是 AQS 的抽象子類,Sync 有兩個靜態實現 NonfairSync 和 FairSync ,這部分是鎖邏輯的核心內容;Sync 還有兩個內部數據結構類 HoldCounter 和 ThreadLocalHoldCounter 。

ReadLock 和 WriteLock 分別對應瞭讀鎖和寫鎖,它們都實現瞭 Lock 接口和序列號接口 Serializable 。它們是 ReentrantReadWriteLock 中對不同操作的鎖類型的實現,使用瞭裝飾模式,本質上還是通過 Sync 的能力實現的。

Sync

核心邏輯是來自於 Sync 及其兩個實現,Sync 繼承自 AbstractQueuedSynchronizer ,自身有兩個內部類 HoldCounter 和 ThreadLocalHoldCounter 。

HoldCounter

static final class HoldCounter {
    int count;          // initially 0
    // Use id, not reference, to avoid garbage retention
    final long tid = LockSupport.getThreadId(Thread.currentThread());
}

HoldCounter 是一個計數器,count 用來記錄當前線程擁有讀鎖的數量,即讀鎖的重入次數;tid 用來記錄當前線程唯一 ID 。

Sync 有一個 cachedHoldCounter 屬性,用來做緩存效果,避免每次都通過 ThreadLocal 去讀取數據。

ThreadLocalHoldCounter

static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

ThreadLocalHoldCounter 重寫瞭 ThreadLocal 的 initialValue() ,在 ThreadLocal 沒有進行過 set 數據的情況下,默認讀取到的值都來自於這個方法,也就是配合 ThreadLocal 使用,默認值返回一個新的 HoldCounter 實例。

在 Sync 中,有一個屬性 readHolds ,它的類型是 ThreadLocalHoldCounter ,用來做當前線程讀鎖重入計數器的 ThreadLocal 包裝,便於線程讀取自己的讀鎖重入計數器。

屬性

Sync 中定義的屬性包括:

abstract static class Sync extends AbstractQueuedSynchronizer {
		// 高16位為讀鎖,低16位為寫鎖
    static final int SHARED_SHIFT   = 16;
    // 讀鎖單位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  // 1 * 2^16 = 65536
    // 讀鎖最大數量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  // 2^16 - 1
    // 寫鎖最大數量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;   // 2^16 - 1 獨占標記
    // 當前線程讀鎖重入次數。當持有讀鎖的線程數量下降到0時刪除。
    private transient ThreadLocalHoldCounter readHolds;
    // 緩存對象,避免每次都去從 ThreadLocal 查找。
    private transient HoldCounter cachedHoldCounter;
		// 第一個獲取讀鎖線程
    private transient Thread firstReader;
    // 第一個讀鎖線程重入讀鎖的計數
    private transient int firstReaderHoldCount;
    // ...
}

構造方法

Sync() {
		readHolds = new ThreadLocalHoldCounter(); 
    setState(getState()); // ensures visibility of readHolds
}

Sync 初始化方法創建瞭 ThreadLocalHoldCounter 並重新設置瞭 State ,為什麼要重新設置呢?因為這裡要讀取當前線程最新的同步狀態並重新設置,獲取實時的同步狀態。

核心方法

Sync 的關鍵方法包括:

abstract static class Sync extends AbstractQueuedSynchronizer {
  	// 並發計數
    static int sharedCount(int c)
    static int exclusiveCount(int c)
		// 阻塞檢查
    abstract boolean readerShouldBlock();
    abstract boolean writerShouldBlock();
		// 獲取和釋放寫鎖
    @ReservedStackAccess
    protected final boolean tryRelease(int releases)
    @ReservedStackAccess
    protected final boolean tryAcquire(int acquires)
		// 獲取和釋放讀鎖
    @ReservedStackAccess
    protected final boolean tryReleaseShared(int unused)
    @ReservedStackAccess
    protected final int tryAcquireShared(int unused)
    final int fullTryAcquireShared(Thread current)
		// 嘗試加讀寫鎖
    @ReservedStackAccess
    final boolean tryWriteLock()
    @ReservedStackAccess
    final boolean tryReadLock()

    // ... 
}

鎖的計數方法

首先是兩個靜態方法 sharedCount(int c) 和 exclusiveCount(int c) :

/** 表示共享持有的數量。 */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // 無符號右移,高位補 0 
/** 表示獨占持有的數量。 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

參數 c 是 AQS 中的 state,根據 state 進行位運算。這兩個方法可以根據鎖自身的狀態解析出持有讀寫鎖的數量。

  • sharedCount ,表示占有讀鎖的線程數量。直接將 AQS 中的 state 右移 16 位,高位補 0,就可以得到讀鎖的線程數量,因為 state 的高十六位表示讀鎖,對應的低十六位表示寫鎖數量。
  • exclusiveCount,表示占有寫鎖的線程數量。直接將 AQS 的 state 和 (2^16 – 1) 做與運算,其等效於將 state 模上 2^16 。寫鎖數量由 state 的低十六位表示。

讀寫鎖阻塞檢查方法

第二組方法是 readerShouldBlock 和 writerShouldBlock ,用來檢查當前的讀鎖/寫鎖是否會造成當前線程阻塞。

// 獲取和釋放對公平鎖和非公平鎖使用相同的代碼,不同點在於但在隊列非空時是否/如何允許碰撞。

// 如果當前線程在嘗試獲取讀鎖時,並且在其他符合條件的線程也在嘗試獲取讀鎖,由於策略其他等待線程占用瞭讀鎖,當前線程應該阻塞,則返回true。
abstract boolean readerShouldBlock();

// 如果當前線程在嘗試獲取寫鎖時,並且在其他符合條件的線程也在嘗試獲取寫鎖,由於策略其他等待線程占用瞭寫鎖,當前線程應該阻塞,則返回true。
abstract boolean writerShouldBlock();

這兩個方法的實現在 Sync 的子類中 — 公平策略實現 FairSync 和非公平策略實現 NonfairSync。

公平策略實現 FairSync 和非公平策略實現 NonfairSync

    // 非公平策略
		static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 正在持有寫鎖的線程永不阻塞
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive(); 
        }
    }
		// 公平策略
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
      
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

公平鎖策略和非公平鎖策略的實現,本質上的不同是這兩個方法的實現。

NonfairSync 非公平策略

NonfairSync 中,執行寫操作的線程是否應該進入阻塞狀態的判斷,直接是 false ,這是因為非公平策略下,如果當前自身已經擁有瞭寫鎖,直接重入,以獨占的方式繼續運行(所以是不公平的)。

執行讀操作的線程是否會阻塞,是通過 apparentlyFirstQueuedIsExclusive() 判斷的,這個方法是 AQS 中的方法:

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h = head, s = head.next;
        return h != null && s != null && !(s instanceof SharedNode) && s.waiter != null;
    }

這個方法的作用是,CLH 隊列中的頭節點和它的的 next 都存在的情況下,如果 next 節點不是 SharedNode ,且它的關聯線程不為空的情況(即下一個鎖不是共享鎖,共享鎖在讀寫鎖裡就是讀鎖)的情況,會導致當前執行讀操作的線程進入阻塞狀態,確保寫操作的互斥特性。

FairSync 公平策略

FairSync 中,讀寫執行線程是否應該進入阻塞狀態都是根據 hasQueuedPredecessors() 方法判斷的:

    public final boolean hasQueuedPredecessors() {
        Thread first = null; Node h = head, s = h.next;
        if (h != null && (s == null || (first = s.waiter) == null || s.prev == null))
            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
        return first != null && first != Thread.currentThread();
    }

    public final Thread getFirstQueuedThread() {
        Thread first = null, w; Node h, s;
        if ((h = head) != null && ((s = h.next) == null || (first = s.waiter) == null || s.prev == null)) {
            // traverse from tail on stale reads
            for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
                if ((w = p.waiter) != null)
                    first = w;
        }
        return first;
    }

hasQueuedPredecessors() 對 head 節點和它的 next 節點進行空檢查,並檢查下一個節點的執行線程和 prev 指針是否有值,滿足條件的情況下通過 getFirstQueuedThread() 方法獲取到隊列中第一個節點關聯的線程。最終返回的結過是檢查這個線程不等於當前線程。

如果存在等待隊列第一個等待執行的線程,那麼就優先執行這個線程。也就是說,不管當前線程是擁有讀鎖還是寫鎖,都優先執行等待隊列第一個未執行節點,這裡就能體現出公平,即優先執行等待隊列中頭一個等待的節點所關聯的線程。

Release 和 Acquire 方法組

這一組方法是整個 Sync 的核心邏輯,也是加解鎖核心邏輯。

tryRelease

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
		if (!isHeldExclusively()) // 不是獨占持有鎖的情況,直接拋出異常。
				throw new IllegalMonitorStateException();
		int nextc = getState() - releases; // AQS 當前鎖狀態 - releases = 新的鎖狀態
		boolean free = exclusiveCount(nextc) == 0; // 根據新的鎖狀態獲取到獨占寫鎖的數量 == 0
		if (free) 
				setExclusiveOwnerThread(null); // 持有寫鎖的線程數為0,更新當前獨占線程引用
		setState(nextc); 	// 無論是不是解鎖瞭,都要更新鎖狀態
		return free;			// 最後返回鎖是否已經可用瞭
}

tryRelease(int releases) 用來嘗試釋放寫鎖。

它的邏輯如下圖:

tryAcquire

@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
            /*
             * 工作流程:
             * 1. 如果寫鎖計數非零或所有者是不同的線程,則失敗。
             * 2. 如果寫鎖計數超過最大數量,失敗(這隻發生在計數非 0 的情況)。
             * 3. 否則,如果這個線程是可重入的獲取方式或者隊列策略允許的話,它就有資格獲得鎖。
             *    如果是,更新狀態並設置 owner。
             */
		Thread current = Thread.currentThread(); // 當前線程
		int c = getState();											 // 當前鎖狀態
		int w = exclusiveCount(c);							 // 計算擁有寫鎖的線程數量
		if (c != 0) { // 0 是鎖可用狀態,當前狀態表面鎖狀態為被持有。
				if (w == 0 || current != getExclusiveOwnerThread()) // 對應 【1】 的情況,寫線程數量為0或者當前線程沒有占有獨占資源
						return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT) // 對應【2】的情況, 判斷是否超過最高寫線程數量
            throw new Error("Maximum lock count exceeded");
        // 重入獲取寫鎖
        setState(c + acquires);
        return true; 
		}
		if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 是否應該阻塞或更新狀態是否成功,失敗直接 return false;
				return false;
		setExclusiveOwnerThread(current); // 設置當前為持有鎖的線程。
		return true;
}

此函數用於獲取寫鎖,首先會獲取 state ,判斷 state 是否為0。

若為0,表示此時沒有讀鎖線程,再判斷寫線程是否應該被阻塞,而在非公平策略下總是不會被阻塞,在公平策略下會進行判斷(判斷同步隊列中是否有等待時間更長的線程,若存在,則需要被阻塞,否則,無需阻塞),之後在設置狀態state,然後返回true。若state不為0,則表示此時存在讀鎖或寫鎖線程,若寫鎖線程數量為0或者當前線程為獨占鎖線程,則返回false,表示不成功,否則,判斷寫鎖線程的重入次數是否大於瞭最大值,若是,則拋出異常,否則,設置狀態state,返回true,表示成功。

其函數流程圖如下:

tryReleaseShared:

        @ReservedStackAccess
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread(); // 當前線程
            if (firstReader == current) { // 當前線程是否是第一個讀線程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1) 
                    firstReader = null;  // 釋放線程引用
                else
                    firstReaderHoldCount--;  // 當前線程重入次數自減
            } else {
                HoldCounter rh = cachedHoldCounter; // 獲取當前線程的重入讀鎖的次數
                if (rh == null || rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
          	// 死循環直到更新狀態成功
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

tryAcquireShared :

        @ReservedStackAccess
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 當獨占線程不是當前線程
                return -1;
            int r = sharedCount(c); // 共享讀鎖的線程數量
          	// 檢查讀線程不應該阻塞 and 持有讀鎖的線程數量小於 MAX_COUNT and 更新鎖狀態成功
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {  // 第一個嘗試獲取讀鎖的線程
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {  // 第一個線程重入
                    firstReaderHoldCount++;
                } else {	
                    HoldCounter rh = cachedHoldCounter;
                  	// 無緩存 or 當前線程不是計數器所在線程
                    if (rh == null || rh.tid != LockSupport.getThreadId(current)) 
                        cachedHoldCounter = rh = readHolds.get(); // 從 ThreadLocal 中讀取
                    else if (rh.count == 0) 
                        readHolds.set(rh);
                    rh.count++; // 當前線程獲取讀鎖次數 + 1
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

最後執行到瞭 fullTryAcquireShared :

        final int fullTryAcquireShared(Thread current) {
            /*
             * 這段代碼與 tryAcquireShared 中的部分代碼是冗餘的,但總體上更簡單,因為它不會使
             * tryAcquireShared 在重試和懶加載讀鎖計數之間的交互復雜化。
             */
            HoldCounter rh = null;
            for (;;) { // 死循環,不斷嘗試
                int c = getState();
                if (exclusiveCount(c) != 0) { // 獨占檢查是否是當前線程
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                // 否則我們持有獨占鎖;這裡的阻塞將導致死鎖。
                } else if (readerShouldBlock()) {
                    // 確保我們不是重入式地獲取讀鎖
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                      	// 不是重入的情況下,更新 HoldCounter
                        if (rh == null) { 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != LockSupport.getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
              	// 共享讀鎖 == 最大數量,拋出異常
                if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded");
              	// 是否能夠設置成功
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) { // 第一個線程
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) { // 重入
                        firstReaderHoldCount++;
                    } else { // 其他情況
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != LockSupport.getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

這個方法的整體邏輯與 tryAcquireShared 基本相同。

ReadLock

public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquireShared(1);
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean tryLock() {
        return sync.tryReadLock();
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.releaseShared(1);
    }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
    // ... 
}

ReadLock 實現瞭 Lock 接口,代理調用到邏輯都是 Sync 中 Shared 組的核心方法。ReadLock 可以通過 readLock(): ReadLock 方法獲取到。

還有一點值得註意,newCondition() 方法直接拋出瞭異常,這是因為讀鎖是一種共享鎖,不會導致互斥,所以也就不支持使用 Condition 控制阻塞與喚醒。

WriteLock

public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquire(1);
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock() {
        return sync.tryWriteLock();
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(1);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]");
    }
   public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

寫鎖本質上也是代理 Sync 中的核心方法。

讀寫鎖降級

鎖降級指的是寫鎖降級為讀鎖,如果當前線程擁有寫鎖,將其釋放然後再獲取讀鎖,這種操作過程不是鎖降級。鎖降級是指把線程當前持有寫鎖,再去獲取讀鎖,隨後釋放寫鎖,這個流程稱為鎖降級。

public void processData() {
    readLock.lock();
    if (!update) {
        // 必須先釋放讀鎖
        readLock.unlock();
        // 鎖降級從寫鎖獲取到開始
        writeLock.lock();
        try {
            if (!update) {
                // 準備數據的流程(略)
                update = true;
            }
            readLock.lock();
        } finally {
            writeLock.unlock();
        }
        // 鎖降級完成,寫鎖降級為讀鎖
    }
    try {
        // 使用數據的流程(略)
    } finally {
        readLock.unlock();
    }
}

鎖降級可以保證數據的可見性,如果再持有寫鎖的情況下,不先去獲取讀鎖,直接釋放寫鎖,再嘗試獲取讀鎖,這一系列操作中會有短暫的無鎖狀態,此時如果有其他線程獲取瞭寫鎖並修改數據,那麼當前線程就無法感知到數據更新,如果當前線程先獲取瞭讀鎖,那麼其他線程就會阻塞,直到當前線程釋放讀鎖後才能獲取寫鎖進行更新。

讀寫鎖 ReentrantReadWriteLock 不支持鎖升級,目的是保證數據的可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取瞭寫鎖,並更新瞭數據,那麼這個更新對其他線程是不可見的,容易造成數據不一致問題。

總結

  • ReentrantReadWriteLock 底層加解鎖原理是 AQS
  • ReentrantReadWriteLock 分為 ReadLock 和 WriteLock 兩種鎖,ReadLock 是共享鎖,WriteLock 是互斥鎖。
  • ReentrantReadWriteLock 的寫鎖可重入是根據 AQS 中的 state 計數的;讀鎖的可重入是 Sync 中的 HoldCounter 來記錄的。
  • 公平策略和非公平策略都需要對讀鎖和寫鎖分別實現一個判斷邏輯。
  • 核心實現在 Sync 方法中。

到此這篇關於Java 多線程並發 ReentrantReadWriteLock詳情的文章就介紹到這瞭,更多相關Java ReentrantReadWriteLock內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: