1. 程式人生 > 程式設計 >JUC:讀寫鎖ReentrantReadWriteLock

JUC:讀寫鎖ReentrantReadWriteLock

前言

之前提到的ReentrantLock是排他鎖,在同一時刻只允許一個執行緒進行訪問,而讀寫鎖在同一時刻可以允許多個讀執行緒訪問,但是在寫執行緒訪問時,所有的讀執行緒和其他寫執行緒均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得併發性相比一般的排他鎖有了很大提升。

特性

公平性選擇

執行非公平(預設)和公平鎖的獲取方式,吞吐量非公平優先於公平。

重進入

支援重進入:讀執行緒在獲取了讀鎖之後,能夠再次獲取讀鎖。而寫執行緒在獲取了寫鎖之後能夠再次獲取寫鎖,同時也可以獲取讀鎖。

鎖降級

遵循獲取寫鎖、獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖。

ReentrantReadWriteLock類圖

ReentrantReadWriteLock

ReadWriteLock介面

可以看到,ReentrantReadWriteLock實現了ReadWriteLock介面。

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}
複製程式碼

該介面只定義了兩個方法,返回讀鎖和寫鎖。

Sync和lock的關係

可以看到,Sync同步類實現了AQS抽象類。也就是說ReentrantReadWriteLock也是基於AQS來實現的。

和ReentrantLock類似,Sync也有兩個子類分為FairSync(公平鎖)和NonfairSync(非公平鎖)。

ReadLock以及WriteLock實現了Lock介面,同時保持了一個Sync的引用。

Sync的內部類

HoldCounter主要配合讀鎖使用。

        static final class HoldCounter {
            int count = 0;
            final long tid = getThreadId(Thread.currentThread());
        }
複製程式碼

HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀執行緒重入的次數,tid表示該執行緒的tid欄位的值,該欄位可以用來唯一標識一個執行緒。

ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類可以將執行緒與物件相關聯。在沒有進行set的情況下,get到的均是initialValue方法裡面生成的那個HolderCounter物件。

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
複製程式碼

Sync

我們先從最基礎也是最重要的Sync開始。

屬性

    // 版本序列號
    private static final long serialVersionUID = 6317671515068378041L;
    // 高16位為讀鎖,低16位為寫鎖
    static final int SHARED_SHIFT   = 16;
    // 讀鎖單位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 讀鎖最大數量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 寫鎖最大數量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地執行緒計數器
    private transient ThreadLocalHoldCounter readHolds;
    // 快取的計數器
    private transient HoldCounter cachedHoldCounter;
    // 第一個讀執行緒
    private transient Thread firstReader = null;
    // 第一個讀執行緒的計數
    private transient int firstReaderHoldCount;
複製程式碼

主要定義了讀寫狀態的設計。

建構函式

Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}
複製程式碼

讀寫狀態的設計

同步狀態在重入鎖的實現中是表示被同一個執行緒重複獲取的次數,即一個整形變數來維護,但是之前的那個表示僅僅表示是否鎖定,而不用區分是讀鎖還是寫鎖。而讀寫鎖需要在同步狀態(一個整形變數)上維護多個讀執行緒和一個寫執行緒的狀態。

讀寫鎖對於同步狀態的實現是在一個整形變數上通過“按位切割使用”:將變數切割成兩部分,高16位表示讀,低16位表示寫。

假設當前同步狀態值為S,get和set的操作如下:

  • 獲取寫狀態:S&0x0000FFFF:將高16位全部抹去

  • 獲取讀狀態:S>>>16:無符號補0,右移16位

  • 寫狀態加1: S+1

  • 讀狀態加1: S+(1<<16)即S + 0x00010000

在程式碼層的判斷中,如果S不等於0,當寫狀態(S&0x0000FFFF),而讀狀態(S>>>16)大於0,則表示該讀寫鎖的讀鎖已被獲取。

tryAcquire 寫鎖的獲取

        protected final boolean tryAcquire(int acquires) {
            // 獲取當前執行緒
            Thread current = Thread.currentThread();
            // 獲取資源數
            int c = getState();
            // 獲取獨佔執行緒的重入數
            int w = exclusiveCount(c);
            // 如果資源已經被獲取過(此時不管是讀鎖獲取過還是寫鎖獲取過都會進入該判斷)
            if (c != 0) {
                // 如果寫鎖重入數為0或者當前執行緒不為獨佔執行緒直接返回嘗試獲取資源失敗
                // 寫鎖獲取資源數不為0則代表了讀鎖沒有獲取該資源
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 如果重入數加上需要獲取的資源大於最大重入數則直接丟擲異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 如果資源獲取數大於0且是當前執行緒獲取的資源,則設定資源數
                setState(c + acquires);
                return true;
            }
            // 到這裡則表示c == 0,寫鎖和讀鎖都沒有被獲取過。
            // writerShouldBlock判斷是否需要阻塞(公平鎖和非公平鎖實現方式不同)
            // 如果不需要阻塞會CAS嘗試獲取資源
            if (writerShouldBlock() ||
                !compareAndSetState(c,c + acquires))
                // 獲取資源失敗則返回false表示嘗試獲取資源失敗,進入AQS佇列等待獲取鎖
                return false;
            // 走到這裡證明嘗試獲取資源已經成功了,設定當前執行緒為獨佔執行緒
            setExclusiveOwnerThread(current);
            return true;
        }
複製程式碼
  • 首先獲取已經被佔用的資源數c(這裡的資源分為讀鎖和寫鎖)
  • 然後獲取寫鎖佔用的資源數w(重入數)
  • 如果佔用資源數c不為0,則判斷是否當前執行緒獲取的資源
    • 如果寫鎖佔用資源數w為0(這個時候是有其他讀鎖正在佔用該資源),直接返回false。
    • 如果寫鎖佔用資源數w不為0,但是獨佔執行緒不是當前執行緒,也直接返回false。
    • 如果寫鎖佔用資源數w不為0且當前執行緒是獨佔執行緒,但是需要獲取的寫鎖資源數加上已經獲取過的資源數大於最大獲取資源數,則返回false。
    • 如果寫鎖佔用資源數w不為0且當前執行緒是獨佔執行緒,且獲取後的資源數小於最大資源數,直接重新設定獲取的資源數,返回ture。
  • 如果佔用資源數c為0,表示沒有任何鎖(讀鎖、寫鎖)獲取了該資源。
    • 判斷是否需要阻塞獲取(公平或非公平),公平鎖該方法直接返回false,非公平鎖則需要判斷是否有其他執行緒先於當前執行緒獲取鎖資源。
    • 採用CAS方式改變獲取的資源數,獲取失敗直接返回false。
  • 如果c為0且CAS方式改變資源數成功,則設定當前執行緒為獨佔執行緒,返回true。

流程圖如下:

exclusiveCount寫鎖獨佔式重入數

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
複製程式碼

其實就是低16位代表寫鎖。

tryRelease 寫鎖的釋放

        protected final boolean tryRelease(int releases) {
            // 如果當前執行緒不是獨佔執行緒,直接丟擲異常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 獲取釋放資源後的總資源
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            // 如果釋放後的總資源等於0
            if (free)
                // 則將獨佔執行緒設定為null
                setExclusiveOwnerThread(null);
            // 設定釋放後的總資源
            setState(nextc);
            // 返回是否釋放鎖
            return free;
        }
複製程式碼

tryRelease比較簡單,就不再贅述。

tryAcquireShared 讀鎖的獲取

   
        protected final int tryAcquireShared(int unused) {
            // 獲取當前執行緒
            Thread current = Thread.currentThread();
            // 獲取被佔用資源數
            int c = getState();
            // 如果獨佔(寫鎖)獲取資源不為0
            if (exclusiveCount(c) != 0 &&
                // 且獨佔(寫鎖)獲取資源執行緒不為當前執行緒
                getExclusiveOwnerThread() != current)
                // 直接返回-1
                return -1;
            // 共享資源的獲取數r(寫鎖被獲取過多少次)
            int r = sharedCount(c);
            // 判斷是否讀是否需要阻塞(公平鎖和非公平鎖)
            // 如果不需要阻塞
            if (!readerShouldBlock() &&
                // 寫鎖被獲取次數小於最大次數
                r < MAX_COUNT &&
                // 且CAS方式設定資源數成功
                compareAndSetState(c,c + SHARED_UNIT)) {
                // 如果r==0則表示當前執行緒是第一個獲取讀鎖的執行緒
                if (r == 0) {
                    // 則第一個獲取讀鎖的執行緒設定為當前執行緒
                    firstReader = current;
                    // 第一個讀執行緒佔用的資源數為1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 如果當前執行緒是第一個獲取讀鎖的執行緒,則第一個讀執行緒佔用資源數++
                    firstReaderHoldCount++;
                } else {
                    // 讀鎖數量不為0且第一個獲取讀鎖的不是當前執行緒
                    // 獲取計數器
                    HoldCounter rh = cachedHoldCounter;
                    // 如果計數器為null或者計數器的tid不為當前正在執行的執行緒的tid
                    if (rh == null || rh.tid != getThreadId(current))
                        // 獲取當前執行緒對應的計數器
                        cachedHoldCounter = rh = readHolds.get();
                    // 如果計數為0
                    else if (rh.count == 0)
                        // 將計數器設定到ThreadLocal中
                        readHolds.set(rh);
                    // 計數+1
                    rh.count++;
                }
                // 返回1
                return 1;
            }
            return fullTryAcquireShared(current);
        }
複製程式碼
  • 獲取當前執行緒
  • 獲取被佔用資源數
  • 判斷寫鎖是否被其他執行緒獨佔,如果被其他執行緒獨佔則直接返回-1
    • 首先判斷寫鎖佔用資源數是否為0
    • 如果寫鎖佔用資源數不為0,則判斷是否當前執行緒獨佔了寫鎖
  • 獲取讀鎖佔用資源數
  • 判斷是否能夠獲取資源以及獲取資源
    • 判斷是否需要阻塞(公平鎖和非公平鎖)
    • 判斷寫鎖獲取資源數是否小於最大寫鎖資源數(這裡沒有加上要獲取的資源再比較是因為可能會超過integer的最大值)
    • CAS嘗試獲取資源
  • 獲取資源成功則需要記錄執行緒的重入數
    • 如果r==0,表示當前執行緒是第一個獲取讀鎖的執行緒
      • 將第一個獲取讀鎖的執行緒(firstReader)設定為當前執行緒
      • 設定第一個讀執行緒佔用資源數(firstReaderHoldCount)為1
    • 如果r不為0,則判斷當前執行緒是否第一個獲取讀鎖的執行緒
      • 如果是第一個獲取讀鎖的執行緒則第一個讀執行緒佔用資源數(firstReaderHoldCount)加1
    • 如果r不為0,且當前執行緒不是第一個獲取鎖的執行緒
      • 獲取快取計數器
      • 判斷快取計數器是否是當前執行緒的計數器,如果不是則獲取當前執行緒的計數器
      • 如果快取計數器是當前執行緒的計數器,則判斷計數器count是否為0,為0則設定到readHolds中
      • 最後將計數+1
  • 如果需要阻塞或者獲取資源數失敗,則呼叫fullTryAcquireShared迴圈獲取資源。

更新成功後會在firstReaderHoldCount中或readHolds(ThreadLocal型別的)的本執行緒副本中記錄當前執行緒重入數,這是為了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前執行緒重入共享鎖的次數(state中記錄的是多個執行緒的總重入次數),加入了這個方法讓程式碼複雜了不少,但是其原理還是很簡單的:如果當前只有一個執行緒的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變數裡存重入數,當有第二個執行緒來的時候,就要動用ThreadLocal變數readHolds了,每個執行緒擁有自己的副本,用來儲存自己的重入數。

fullTryAcquireShared 迴圈獲取讀鎖

在tryAcquireShared函式中,如果下列三個條件不滿足(讀執行緒是否應該被阻塞、小於最大值、比較設定成功)則會進行fullTryAcquireShared函式中,它用來保證相關操作可以成功。

final int fullTryAcquireShared(Thread current) {
    // 計數器
    HoldCounter rh = null;
    for (;;) {
        // 獲取被佔用資源數
        int c = getState();
        // 如果寫鎖佔用資源數不為0
        if (exclusiveCount(c) != 0) {
            // 如果不是當前執行緒獲取的寫鎖
            if (getExclusiveOwnerThread() != current)
                return -1;
            // 如果需要阻塞
            } else if (readerShouldBlock()) {
                // 如果第一個獲取讀鎖執行緒是當前執行緒
                if (firstReader == current) {
                    
                } else {
                    // 如果計數器為null
                    if (rh == null) {
                        // 獲取快取計數器
                        rh = cachedHoldCounter;
                        // 如果計數器為null或者計數器不是當前執行緒計數器
                        if (rh == null || rh.tid != getThreadId(current)) {
                            // 獲取當前執行緒計數器
                            rh = readHolds.get();
                            // 如果當前執行緒讀鎖計數為0
                            if (rh.count == 0)
                                // 刪除當前執行緒計數器
                                readHolds.remove();
                        } 
                    }
                // 如果當前執行緒計數為0 返回-1
                if (rh.count == 0)
                    return -1;
            }
        }
        // 如果讀鎖佔有資源數等於最大資源數
        if (sharedCount(c) == MAX_COUNT)
            // 拋異常
            throw new Error("Maximum lock count exceeded");
        // Cas方式獲取讀鎖資源
        if (compareAndSetState(c,c + SHARED_UNIT)) {
            // 如果是第一個獲取讀鎖的執行緒
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; 
            }
            return 1;
        }
    }
}
複製程式碼

在tryAcquireShared方法上加入了threadlocal的清理流程,實質上還是迴圈獲取讀鎖。

  • 判斷寫鎖佔用資源數r是否為0
    • 如果r不為0則需要判斷是否當前執行緒佔用的寫鎖,如果不是當前執行緒則直接返回-1
    • 如果r為0表示當前沒有執行緒佔用寫鎖,則判斷是否需要阻塞(公平和非公平)
      • 如果需要阻塞,則判斷當前執行緒是不是第一個獲取讀鎖的執行緒
      • 如果當前執行緒不是第一個獲取讀鎖的執行緒,再判斷當前執行緒是否獲取過讀鎖(是否重進入)
      • 如果當前執行緒沒有佔用讀鎖(沒有重進入),需要清除threadLocal並返回-1
    • 如果r不為0且是當前執行緒佔用的寫鎖,或r為0不需要阻塞
      • 則判斷讀鎖的佔用數,如果佔用數等於最大,則直接拋異常
      • Cas方式獲取讀鎖資源
      • 如果獲取成功,則使用threadLocal等記錄執行緒佔用資源數

tryReleaseShared 讀鎖的釋放

        protected final boolean tryReleaseShared(int unused) {
            // 獲取當前執行緒
            Thread current = Thread.currentThread();
            // 如果第一個獲取讀鎖的執行緒是當前執行緒
            if (firstReader == current) {
                // 如果第一個獲取讀鎖的執行緒獲取讀鎖的重入數為1
                if (firstReaderHoldCount == 1)
                    // 將第一個獲取讀鎖的執行緒設定為null
                    firstReader = null;
                else
                    // 將第一個獲取讀鎖的執行緒的資源佔用數--
                    firstReaderHoldCount--;
            } else {
                // 拿到快取計數器
                HoldCounter rh = cachedHoldCounter;
                // 如果快取計數器指向的不是當前執行緒
                if (rh == null || rh.tid != getThreadId(current))
                    // 從threadLocal裡拿快取計數器
                    rh = readHolds.get();
                // 拿到當前執行緒的獲取讀鎖重入鎖
                int count = rh.count;
                // 如果重入數小於等於1
                if (count <= 1) {
                    // 清理threadlocal
                    readHolds.remove();
                    // 如果小於等於0,丟擲異常
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                // 重入數-1
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                // cas方式釋放資源
                if (compareAndSetState(c,nextc))
                    return nextc == 0;
            }
        }
複製程式碼

讀鎖的釋放只有兩個步驟:

  • 1、當前執行緒計數器記錄數-1,如果記錄數為0則清理threadLocal
  • 2、CAS方式釋放讀鎖資源,這裡和ReentrantLock釋放資源不同,因為可能有多個執行緒共享獲取讀鎖,所以釋放資源需要使用for迴圈加CAS方式釋放。

getReadHoldCount 獲取當前執行緒讀鎖的重入數

        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }
複製程式碼

比較簡單,就不一一註釋程式碼了。

  • 首先需要判斷CAS中status資源讀鎖佔用是否為0,如果為0直接返回0.
  • 不為0則判斷當前執行緒是否是第一個獲取讀鎖的執行緒
  • 如果不是還要看快取的計數器執行緒id是不是當前執行緒
  • 還不是?那就只有從threadLocal裡找了

為什麼不全放在threadlocal中?

  • threadlocal實際上是一個map,如果只有一個執行緒獲取讀鎖,就完全沒必要放到threadlocal降低效率。

為什麼HoldCounter中不是直接指向當前執行緒,而是記錄執行緒id?

  • 避免HoldCounter和ThreadLocal互相繫結而GC難以釋放它們(儘管GC能夠智慧的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收物件。

ReentrantReadWriteLock

構造器

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
複製程式碼

可以看到,ReentrantReadWriteLock構造分不同情況構造了公平鎖和非公平鎖。

FairSync

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
複製程式碼

公平鎖的writerShouldBlock、readerShouldBlock方法呼叫了AQS的hasQueuedPredecessors判斷是否有執行緒先於當前執行緒獲取鎖。

NonfairSync

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
複製程式碼

可以看到,非公平鎖的寫鎖writerShouldBlock方法是直接返回false的,也就是說在tryAcquire方法中是直接Cas嘗試獲取一次資源的,readerShouldBlock則呼叫了AQS的apparentlyFirstQueuedIsExclusive方法。

apparentlyFirstQueuedIsExclusive

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h,s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
複製程式碼

返回為true需要以下條件:

  • 如果頭節點不為null
  • 頭節點的下一個節點不為null
  • 頭節點的下一個節點不是共享的
  • 頭節點的下一個節點的執行緒不為null

這個方法判斷佇列的head.next是否正在等待獨佔鎖(寫鎖)。

官方的解釋是讀鎖不應該讓寫鎖始終等待,造成寫鎖執行緒飢餓的情況。

ReadLock構造器

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
複製程式碼

WriteLock構造器

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
複製程式碼

比較簡單,沒啥好說的。

總結

線上程持有讀鎖的情況下,該執行緒不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被佔用,就馬上獲取失敗,不管讀鎖是不是被當前執行緒持有)。

線上程持有寫鎖的情況下,該執行緒可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被佔用,只有寫鎖沒有被當前執行緒佔用的情況才會獲取失敗)。

參考文獻