1. 程式人生 > 其它 >Java同步器之ReentrantLock原始碼分析(一)

Java同步器之ReentrantLock原始碼分析(一)

一、概述

ReentrantLockJava併發包中提供的一個可重入的互斥鎖ReentrantLocksynchronized在基本用法,行為語義上都是類似的,同樣都具有可重入性。只不過相比原生的SynchronizedReentrantLock增加了一些高階的擴充套件功能,比如它可以實現公平鎖,同時也可以繫結多個Condition

二、特性

2.1 可重入性

所謂的可重入性,就是可以支援一個執行緒對鎖的重複獲取,原生的synchronized就具有可重入性,一個用synchronized修飾的遞迴方法,當執行緒在執行期間,它是可以反覆獲取到鎖的,而不會出現自己把自己鎖死的情況。ReentrantLock

也是如此,在呼叫lock()方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock()方法獲取鎖而不被阻塞。那麼有可重入鎖,就有不可重入鎖,我們在之前文章中自定義的一個Mutex鎖就是個不可重入鎖,不過使用場景極少而已。

2.2 公平鎖與非公平鎖

所謂公平鎖,顧名思義是指鎖的獲取策略相對公平,當多個執行緒在獲取同一個鎖時,必須按照鎖的申請時間來依次獲得鎖,不能插隊;非公平鎖則不同,當鎖被釋放時,等待中的執行緒均有機會獲得鎖。synchronized是非公平鎖,ReentrantLock預設也是非公平的,可以通過帶boolean引數的構造方法指定使用公平鎖,但非公平鎖的效能一般要優於公平鎖。

synchronized

Java原生的互斥同步鎖,使用方便,對於synchronized修飾的方法或同步塊,無需再顯式釋放鎖。synchronized底層是通過monitorentermonitorexit兩個位元組碼指令來實現加鎖解鎖操作的。而ReentrantLock做為API層面的互斥鎖,需要顯式地去加鎖解鎖。

class X {
    private final ReentrantLock lock = new ReentrantLock();

    // ...
 
    public void m() {
        lock.lock();  // 加鎖
        try {
            // ... 函式主題
        } finally {
            lock.unlock(); //解鎖
        }
    }
}

2.3 條件鎖

條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。

比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素的,這時候就需要阻塞在條件notEmpty上,等待其它執行緒往裡面放入一個元素後,喚醒這個條件notEmpty,當前執行緒才可以繼續去做“彈出一個元素”的行為。

注意,這裡的條件,必須是在獲取鎖之後去等待,對應到ReentrantLock的條件鎖,就是獲取鎖之後才能呼叫condition.await()方法。

java中,條件鎖的實現都在AQSConditionObject類中,ConditionObject實現了Condition介面。

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 宣告一個重入鎖
        ReentrantLock lock = new ReentrantLock();
        // 宣告一個條件鎖
        Condition condition = lock.newCondition();

        new Thread(() -> {
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待條件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 這裡睡1000ms是為了讓上面的執行緒先獲取到鎖
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 這裡睡2000ms代表這個執行緒執行業務需要的時間
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知條件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

三、原始碼分析 - 公平鎖/非公平鎖

ReentrantLock是基於AQS的,AQSJava併發包中眾多同步元件的構建基礎,它通過一個int型別的狀態變數state和一個FIFO佇列來完成共享資源的獲取,執行緒的排隊等待等。AQS是個底層框架,採用模板方法模式,它定義了通用的較為複雜的邏輯骨架,比如執行緒的排隊,阻塞,喚醒等,將這些複雜但實質通用的部分抽取出來,這些都是需要構建同步元件的使用者無需關心的,使用者僅需重寫一些簡單的指定的方法即可(其實就是對於共享變數state的一些簡單的獲取釋放的操作)。

3.1 內部類

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}
  1. 抽象類Sync實現了AQS的部分方法;
  2. NonfairSync實現了Sync,主要用於非公平鎖的獲取;
  3. FairSync實現了Sync,主要用於公平鎖的獲取。

3.2 屬性

private final Sync sync;

主要屬性就一個sync,它在構造方法中初始化,決定使用公平鎖還是非公平鎖的方式獲取鎖。

3.3 構造器

3.3.1 無參構造器(預設為非公平鎖)

public ReentrantLock() {
    sync = new NonfairSync();//預設是非公平的
}

syncReentrantLock內部實現的一個同步元件,它是ReentrantLock的一個靜態內部類,繼承於AQS,後面我們再分析。

3.3.2 帶布林值的構造器(是否公平)

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();//fair為true,公平鎖;反之,非公平鎖
}

看到了吧,此處可以指定是否採用公平鎖,FailSyncNonFailSync亦為ReentrantLock的靜態內部類,都繼承於Sync

3.4 lock()方法

3.4.1 公平鎖

這裡我們假設ReentrantLock的例項是通過以下方式獲得的:

ReentrantLock reentrantLock = new ReentrantLock(true);

加鎖的主要邏輯:

// ReentrantLock.lock()
public void lock() {
    // 呼叫的sync屬性的lock()方法
    // 這裡的sync是公平鎖,所以是FairSync的例項
    sync.lock();
}

// ReentrantLock.FairSync.lock()
final void lock() {
    // 呼叫AQS的acquire()方法獲取鎖
    // 注意,這裡傳的值為1
    acquire(1);
}

// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 嘗試獲取鎖
    // 如果失敗了,就排隊
    if (!tryAcquire(arg) &&
        // 注意addWaiter()這裡傳入的節點模式為獨佔模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 當前執行緒
    final Thread current = Thread.currentThread();
    // 檢視當前狀態變數的值
    int c = getState();
    // 如果狀態變數的值為0,說明暫時還沒有人佔有鎖
    if (c == 0) {
        // 如果沒有其它執行緒在排隊,那麼當前執行緒嘗試更新state的值為1
        // 如果成功了,則說明當前執行緒獲取了鎖
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 當前執行緒獲取了鎖,把自己設定到exclusiveOwnerThread變數中
            // exclusiveOwnerThread是AQS的父類AbstractOwnableSynchronizer中提供的變數
            setExclusiveOwnerThread(current);
            // 返回true說明成功獲取了鎖
            return true;
        }
    }
    // 如果當前執行緒本身就佔有著鎖,現在又嘗試獲取鎖
    // 那麼,直接讓它獲取鎖並返回true
    else if (current == getExclusiveOwnerThread()) {
        // 狀態變數state的值加1
        int nextc = c + acquires;
        // 如果溢位了,則報錯
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 設定到state中
        // 這裡不需要CAS更新state
        // 因為當前執行緒佔有著鎖,其它執行緒只會CAS把state從0更新成1,是不會成功的
        // 所以不存在競爭,自然不需要使用CAS來更新
        setState(nextc);
        // 當執行緒獲取鎖成功
        return true;
    }
    // 當前執行緒嘗試獲取鎖失敗
    return false;
}

// AbstractQueuedSynchronizer.addWaiter()
// 呼叫這個方法,說明上面嘗試獲取鎖失敗了
private Node addWaiter(Node mode) {
    // 新建一個節點
    Node node = new Node(Thread.currentThread(), mode);
    // 這裡先嚐試把新節點加到尾節點後面
    // 如果成功了就返回新節點
    // 如果沒成功再呼叫enq()方法不斷嘗試
    Node pred = tail;
    // 如果尾節點不為空
    if (pred != null) {
        // 設定新節點的前置節點為現在的尾節點
        node.prev = pred;
        // CAS更新尾節點為新節點
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,把舊尾節點的下一個節點指向新節點
            pred.next = node;
            // 並返回新節點
            return node;
        }
    }
    // 如果上面嘗試入隊新節點沒成功,呼叫enq()處理
    enq(node);
    return node;
}

// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    // 自旋,不斷嘗試
    for (;;) {
        Node t = tail;
        // 如果尾節點為空,說明還未初始化
        if (t == null) { // Must initialize
            // 初始化頭節點和尾節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果尾節點不為空
            // 設定新節點的前一個節點為現在的尾節點
            node.prev = t;
            // CAS更新尾節點為新節點
            if (compareAndSetTail(t, node)) {
                // 成功了,則設定舊尾節點的下一個節點為新節點
                t.next = node;
                // 並返回舊尾節點
                return t;
            }
        }
    }
}
// AbstractQueuedSynchronizer.acquireQueued()
// 呼叫上面的addWaiter()方法使得新節點已經成功入隊了
// 這個方法是嘗試讓當前節點來獲取鎖的
final boolean acquireQueued(final Node node, int arg) {
    // 失敗標記
    boolean failed = true;
    try {
        // 中斷標記
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 當前節點的前一個節點
            final Node p = node.predecessor();
            // 如果當前節點的前一個節點為head節點,則說明輪到自己獲取鎖了
            // 呼叫ReentrantLock.FairSync.tryAcquire()方法再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 嘗試獲取鎖成功
                // 這裡同時只會有一個執行緒在執行,所以不需要用CAS更新
                // 把當前節點設定為新的頭節點
                setHead(node);
                // 並把上一個節點從連結串列中刪除
                p.next = null; // help GC
                // 未失敗
                failed = false;
                return interrupted;
            }
            // 是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 真正阻塞的方法
                parkAndCheckInterrupt())
                // 如果中斷了
                interrupted = true;
        }
    } finally {
        // 如果失敗了
        if (failed)
            // 取消獲取鎖
            cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 這個方法是在上面的for()迴圈裡面呼叫的
// 第一次呼叫會把前一個節點的等待狀態設定為SIGNAL,並返回false
// 第二次呼叫才會返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 上一個節點的等待狀態
    // 注意Node的waitStatus欄位我們在上面建立Node的時候並沒有指定
    // 也就是說使用的是預設值0
    // 這裡把各種等待狀態再貼出來
    //static final int CANCELLED =  1;
    //static final int SIGNAL    = -1;
    //static final int CONDITION = -2;
    //static final int PROPAGATE = -3;
    int ws = pred.waitStatus;
    // 如果等待狀態為SIGNAL(等待喚醒),直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果前一個節點的狀態大於0,也就是已取消狀態
    if (ws > 0) {
        // 把前面所有取消狀態的節點都從連結串列中刪除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前一個節點的狀態小於等於0,則把其狀態設定為等待喚醒
        // 這裡可以簡單地理解為把初始狀態0設定為SIGNAL
        // CONDITION是條件鎖的時候使用的
        // PROPAGATE是共享鎖使用的
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // 阻塞當前執行緒
    // 底層呼叫的是Unsafe的park()方法
    LockSupport.park(this);
    // 返回是否已中斷
    return Thread.interrupted();
}

主要方法的呼叫關係:

ReentrantLock#lock()
->ReentrantLock.FairSync#lock() // 公平模式獲取鎖
  ->AbstractQueuedSynchronizer#acquire() // AQS的獲取鎖方法
    ->ReentrantLock.FairSync#tryAcquire() // 嘗試獲取鎖
    ->AbstractQueuedSynchronizer#addWaiter()  // 新增到佇列
	  ->AbstractQueuedSynchronizer#enq()  // 入隊
    ->AbstractQueuedSynchronizer#acquireQueued() // 裡面有個for()迴圈,喚醒後再次嘗試獲取鎖
      ->AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 檢查是否要阻塞
      ->AbstractQueuedSynchronizer#parkAndCheckInterrupt()  // 真正阻塞的地方

獲取鎖的主要過程大致如下:

  1. 嘗試獲取鎖,如果獲取到了就直接返回了;
  2. 嘗試獲取鎖失敗,再呼叫addWaiter()構建新節點並把新節點入隊;
  3. 然後呼叫acquireQueued()再次嘗試獲取鎖,如果成功了,直接返回;
  4. 如果再次失敗,再呼叫shouldParkAfterFailedAcquire()將節點的等待狀態置為等待喚醒(SIGNAL);
  5. 呼叫parkAndCheckInterrupt()阻塞當前執行緒;
  6. 如果被喚醒了,會繼續在acquireQueued()for()迴圈再次嘗試獲取鎖,如果成功了就返回;
  7. 如果不成功,再次阻塞,重複(3)(4)(5)直到成功獲取到鎖。

以上就是整個公平鎖獲取鎖的過程,下面我們看看非公平鎖是怎麼獲取鎖的。

3.4.2 非公平鎖

// ReentrantLock.lock()
public void lock() {
    sync.lock();
}

// ReentrantLock.NonfairSync.lock()
// 這個方法在公平鎖模式下是直接呼叫的acquire(1);
final void lock() {
    // 直接嘗試CAS更新狀態變數
    if (compareAndSetState(0, 1))
        // 如果更新成功,說明獲取到鎖,把當前執行緒設為獨佔執行緒
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// ReentrantLock.NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 呼叫父類的方法
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果狀態變數的值為0,再次嘗試CAS更新狀態變數的值
        // 相對於公平鎖模式少了!hasQueuedPredecessors()條件
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相對於公平鎖,非公平鎖加鎖的過程主要有兩點不同:

  1. 一開始就嘗試CAS更新狀態變數state的值,如果成功了就獲取到鎖了;
  2. tryAcquire()的時候沒有檢查是否前面有排隊的執行緒,直接上去獲取鎖才不管別人有沒有排隊呢;

總的來說,相對於公平鎖,非公平鎖在一開始就多了兩次直接嘗試獲取鎖的過程。

3.5 lockInterruptibly()方法

支援執行緒中斷,它與lock()方法的主要區別在於lockInterruptibly()獲取鎖的時候如果執行緒中斷了,會丟擲會丟擲InterruptedException異常,而lock()不會管執行緒是否中斷都會一直嘗試獲取鎖,獲取鎖之後把自己標記為已中斷,繼續執行自己的邏輯,後面也會正常釋放鎖。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);//代理到sync的相應方法上,同lock方法的區別是此方法響應中斷
}

題外話:

執行緒中斷,只是在執行緒上打一箇中斷標誌,並不會對執行中的執行緒有什麼影響,具體需要根據這個中斷標誌幹些什麼,使用者自己去決定。

比如,如果使用者在呼叫lock()獲取鎖後,發現執行緒中斷了,就直接返回了,而導致沒有釋放鎖,這也是允許的,但是會導致這個鎖一直得不到釋放,就出現了死鎖。

lock.lock();

if (Thread.currentThread().interrupted()) {
    return ;
}

lock.unlock();

當然,這裡只是舉個例子,實際使用肯定是要把lock.lock()後面的程式碼都放在try...finally...裡面的以保證鎖始終會釋放,這裡主要是為了說明執行緒中斷只是一個標誌,至於要做什麼完全由使用者自己決定。

3.6 tryLock()方法

嘗試獲取一次鎖,成功了就返回true,沒成功就返回false,不會繼續嘗試。

// ReentrantLock.tryLock()
public boolean tryLock() {
    // 直接呼叫Sync的nonfairTryAcquire()方法
    return sync.nonfairTryAcquire(1);//代理到sync的相應方法上
}

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock直接以非公平的模式去嘗試獲取一次鎖,獲取到了或者鎖本來就是當前執行緒佔有著就返回true,否則返回false

3.7 tryLock(long time, TimeUnit unit)方法

嘗試獲取鎖,並等待一段時間,如果在這段時間內都沒有獲取到鎖,就返回false

// ReentrantLock.tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 呼叫AQS中的方法
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// AbstractQueuedSynchronizer.tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果執行緒中斷了,丟擲異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先嚐試獲取一次鎖
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

// AbstractQueuedSynchronizer.doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果時間已經到期了,直接返回false
    if (nanosTimeout <= 0L)
        return false;
    // 到期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 如果到期了,就直接返回false
            if (nanosTimeout <= 0L)
                return false;
            // spinForTimeoutThreshold = 1000L;
            // 只有到期時間大於1000納秒,才阻塞
            // 小於等於1000納秒,直接自旋解決就得了
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 阻塞一段時間
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在阻塞的時候加上阻塞時間,並且會隨時檢查是否到期,只要到期了沒獲取到鎖就返回false

3.8 unlock()方法

// ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer.release
public final boolean release(int arg) {
    // 呼叫AQS實現類的tryRelease()方法釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 如果頭節點不為空,且等待狀態不是0,就喚醒下一個節點
        // 還記得waitStatus嗎?
        // 在每個節點阻塞之前會把其上一個節點的等待狀態設為SIGNAL(-1)
        // 所以,SIGNAL的準確理解應該是喚醒下一個等待的執行緒
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// ReentrantLock.Sync.tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果當前執行緒不是佔有著鎖的執行緒,丟擲異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果狀態變數的值為0了,說明完全釋放了鎖
    // 這也就是為什麼重入鎖呼叫了多少次lock()就要呼叫多少次unlock()的原因
    // 如果不這樣做,會導致鎖不會完全釋放,別的執行緒永遠無法獲取到鎖
    if (c == 0) {
        free = true;
        // 清空佔有執行緒
        setExclusiveOwnerThread(null);
    }
    // 設定狀態變數的值
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    // 注意,這裡的node是頭節點
    
    // 如果頭節點的等待狀態小於0,就把它設定為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 頭節點的下一個節點
    Node s = node.next;
    // 如果下一個節點為空,或者其等待狀態大於0(實際為已取消)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾節點向前遍歷取到佇列最前面的那個狀態不是已取消狀態的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果下一個節點不為空,則喚醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

釋放鎖的過程大致為:

  1. state的值減1
  2. 如果state減到了0,說明已經完全釋放鎖了,喚醒下一個等待著的節點;

3.9 內部類詳情

3.9.1 NonFairSync(非公平鎖)

static final class NonfairSync extends Sync {//繼承Sync

    private static final long serialVersionUID = 7316153563782823691L;

    // 獲取鎖
    final void lock() {
        if (compareAndSetState(0, 1))//CAS設定state狀態,若原值是0,將其置為1
            setExclusiveOwnerThread(Thread.currentThread());//將當前執行緒標記為已持有鎖
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);//呼叫Sync中的方法
    }
}

compareAndSetState若設定失敗,呼叫AQSacquire方法,acquire又會呼叫下面重寫的tryAcquire方法。

這裡說的呼叫失敗有兩種情況:

  1. 當前沒有執行緒獲取到資源,state0,但是將state0設定為1的時候,其他執行緒搶佔資源,將state修改了,導致了CAS失敗;
  2. state原本就不為0,也就是已經有執行緒獲取到資源了,有可能是別的執行緒獲取到資源,也有可能是當前執行緒獲取的,這時執行緒又重複去獲取,所以去tryAcquire中的nonfairTryAcquire我們應該就能看到可重入的實現邏輯了。

3.9.1.1 nonfairTryAcquire()

final boolean nonfairTryAcquire(int acquires) {
    //獲取當前執行緒
    final Thread current = Thread.currentThread();
    // 獲取當前state值    
    int c = getState();
    // 若state為0,意味著沒有執行緒獲取到資源,CAS將state設定為1,並將當前執行緒標記我獲取到排他鎖的執行緒,返回true    
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若state不為0,但是持有鎖的執行緒是當前執行緒
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;//state累加1
        if (nextc < 0) // int型別溢位了
            throw new Error("Maximum lock count exceeded");
        // 設定state,此時state大於1,代表著一個執行緒多次獲鎖,state的值即是執行緒重入的次數
        setState(nextc);
        return true;//返回true,獲取鎖成功
    }
    return false;//獲取鎖失敗了
}

簡單總結下流程:

1. 先獲取state值,若為0,意味著此時沒有執行緒獲取到資源,CAS將其設定為1,設定成功則代表獲取到排他鎖了;
2. 若state大於0,肯定有執行緒已經搶佔到資源了,此時再去判斷是否就是自己搶佔的,是的話,state累加,返回true,重入成功,state的值即是執行緒重入的次數;
3. 其他情況,則獲取鎖失敗。

3.9.2 FairSync(公平鎖)

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);//直接呼叫AQS的模板方法acquire
    }

    protected final boolean tryAcquire(int acquires) {
        // 獲取當前執行緒
        final Thread current = Thread.currentThread();
        // 獲取state值
        int c = getState();
        // 若state為0,意味著當前沒有執行緒獲取到資源,那就可以直接獲取資源了嗎?
        // NO!這不就跟之前的非公平鎖的邏輯一樣了嘛。看下面的邏輯
        if (c == 0) {
            // 判斷在時間順序上,是否有申請鎖排在自己之前的執行緒,
            // 若沒有,才能去獲取,CAS設定state,並標記當前執行緒為持有排他鎖的執行緒;
            // 反之,不能獲取!這即是公平的處理方式。
            if (!hasQueuedPredecessors() 
                && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//重入的處理邏輯,與上文一致,不再贅述
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

可以看到,公平鎖的大致邏輯與非公平鎖是一致的,不同的地方在於有了!hasQueuedPredecessors()這個判斷邏輯,即便state0,也不能貿然直接去獲取,要先去看有沒有還在排隊的執行緒,若沒有,才能嘗試去獲取,做後面的處理。反之,返回false,獲取失敗。

看看這個判斷是否有排隊中執行緒的邏輯

3.9.2.1 hasQueuedPredecessors()方法

public final boolean hasQueuedPredecessors() {
    Node t = tail; // 尾結點
    Node h = head;//頭結點
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());//判斷是否有排在自己之前的執行緒
}

需要注意的是,這個判斷是否有排在自己之前的執行緒的邏輯稍微有些繞,我們來梳理下,由程式碼得知,有兩種情況會返回true,我們將此邏輯分解一下(注意:返回true意味著有其他執行緒申請鎖比自己早,需要放棄搶佔)

  1. h !=t && (s = h.next) == null,這個邏輯成立的一種可能是head指向頭結點,tail此時還為null
    考慮這種情況:當其他某個執行緒去獲取鎖失敗,需構造一個結點加入同步佇列中(假設此時同步佇列為空),在新增的時候,需要先建立一個無意義傀儡頭結點(在AQSenq方法中,這是個自旋CAS操作),有可能在將head指向此傀儡結點完畢之後,還未將tail指向此結點。很明顯,此執行緒時間上優於當前執行緒,所以,返回true,表示有等待中的執行緒且比自己來的還早。
  2. h != t && (s = h.next) != null && s.thread != Thread.currentThread() 。同步佇列中已經有若干排隊執行緒且當前執行緒不是佇列的老二結點,此種情況會返回true
    假如沒有s.thread !=Thread.currentThread()這個判斷的話,會怎麼樣呢?
    若當前執行緒已經在同步佇列中是老二結點(頭結點此時是個無意義的傀儡結點),此時持有鎖的執行緒釋放了資源,喚醒老二結點執行緒,老二結點執行緒重新tryAcquire(此邏輯在AQS中的acquireQueued方法中),又會呼叫到hasQueuedPredecessors,不加s.thread !=Thread.currentThread()這個判斷的話,返回值就為true,導致tryAcquire失敗。

最後,來看看ReentrantLocktryRelease,定義在Sync

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//減去1個資源
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //若state值為0,表示當前執行緒已完全釋放乾淨,返回true,上層的AQS會意識到資源已空出。
    //若不為0,則表示執行緒還佔有資源,只不過將此次重入的資源的釋放了而已,返回false。
    if (c == 0) {
        free = true;//
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

四、原始碼分析 - 條件鎖

4.1 內部類

public class ConditionObject implements Condition, java.io.Serializable {
    
    /** First node of condition queue. */
    private transient Node firstWaiter;
    
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

可以看到條件鎖中也維護了一個佇列,為了和AQS的佇列區分,我這裡稱為條件佇列,firstWaiter是佇列的頭節點,lastWaiter是佇列的尾節點。

4.2 lock.newCondition()方法

新建一個條件鎖。

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}

// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}

// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

新建一個條件鎖最後就是呼叫的AQS中的ConditionObject類來例項化條件鎖。

4.3 condition.await()方法

condition.await()方法,表明現在要等待條件的出現。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 如果執行緒中斷了,丟擲異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 新增節點到Condition的佇列中,並返回該節點
    Node node = addConditionWaiter();
    // 完全釋放當前執行緒獲取的鎖
    // 因為鎖是可重入的,所以這裡要把獲取的鎖全部釋放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步佇列中
    while (!isOnSyncQueue(node)) {
        // 阻塞當前執行緒
        LockSupport.park(this);
        
        // 上面部分是呼叫await()時釋放自己佔有的鎖,並阻塞自己等待條件的出現

        // *************************分界線*************************  //

        // 下面部分是條件已經出現,嘗試去獲取鎖
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 嘗試獲取鎖,注意第二個引數,這是上一章分析過的方法
    // 如果沒獲取到會再次阻塞(這個方法這裡就不貼出來了,有興趣的翻翻上一章的內容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 執行緒中斷相關
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果條件佇列的尾節點已取消,從頭節點開始清除所有已取消的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重新獲取尾節點
        t = lastWaiter;
    }
    // 新建一個節點,它的等待狀態是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾節點為空,則把新節點賦值給頭節點(相當於初始化佇列)
    // 否則把新節點賦值給尾節點的nextWaiter指標
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾節點指向新節點
    lastWaiter = node;
    // 返回新節點
    return node;
}

// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取狀態變數的值,重複獲取鎖,這個值會一直累加
        // 所以這個值也代表著獲取鎖的次數
        int savedState = getState();
        // 一次性釋放所有獲得的鎖
        if (release(savedState)) {
            failed = false;
            // 返回獲取鎖的次數
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 如果等待狀態是CONDITION,或者前一個指標為空,返回false
    // 說明還沒有移到AQS的佇列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果next指標有值,說明已經移到AQS的佇列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 從AQS的尾節點開始往前尋找看是否可以找到當前節點,找到了也說明已經在AQS的佇列中了
    return findNodeFromTail(node);
}

這裡有幾個難理解的點:

  1. Condition的佇列和AQS的佇列不完全一樣;

    • AQS的佇列頭節點是不存在任何值的,是一個虛節點;
    • Condition的佇列頭節點是儲存著實實在在的元素值的,是真實節點。
  2. 各種等待狀態(waitStatus)的變化;

    • 首先,在條件佇列中,新建節點的初始等待狀態是CONDITION(-2)
    • 其次,移到AQS的佇列中時等待狀態會更改為0(AQS佇列節點的初始等待狀態為0);
    • 然後,在AQS的佇列中如果需要阻塞,會把它上一個節點的等待狀態設定為SIGNAL(-1)
    • 最後,不管在Condition佇列還是AQS佇列中,已取消的節點的等待狀態都會設定為CANCELLED(1)
    • 另外,後面我們在共享鎖的時候還會講到另外一種等待狀態叫PROPAGATE(-3)
  3. 相似的名稱;

    • AQS中下一個節點是next,上一個節點是prev
    • Condition中下一個節點是nextWaiter,沒有上一個節點。

總結一下await()方法的大致流程:

  1. 新建一個節點加入到條件佇列中去;
  2. 完全釋放當前執行緒佔有的鎖;
  3. 阻塞當前執行緒,並等待條件的出現;
  4. 條件已出現(此時節點已經移到AQS的佇列中),嘗試獲取鎖;

也就是說await()方法內部其實是先釋放鎖->等待條件->再次獲取鎖的過程。

4.4 condition.signal()方法

condition.signal()方法通知條件已經出現。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 如果不是當前執行緒佔有著鎖,呼叫這個方法丟擲異常
    // 說明signal()也要在獲取鎖之後執行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 條件佇列的頭節點
    Node first = firstWaiter;
    // 如果有等待條件的節點,則通知它條件已成立
    if (first != null)
        doSignal(first);
}

// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到條件佇列的頭節點往後一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相當於把頭節點從佇列中出隊
        first.nextWaiter = null;
        // 轉移節點到AQS佇列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把節點的狀態更改為0,也就是說即將移到AQS佇列中
    // 如果失敗了,說明節點已經被改成取消狀態了
    // 返回false,通過上面的迴圈可知會尋找下一個可用節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 呼叫AQS的入隊方法把節點移到AQS的佇列中
    // 注意,這裡enq()的返回值是node的上一個節點,也就是舊尾節點
    Node p = enq(node);
    // 上一個節點的等待狀態
    int ws = p.waitStatus;
    // 如果上一個節點已取消了,或者更新狀態為SIGNAL失敗(也是說明上一個節點已經取消了)
    // 則直接喚醒當前節點對應的執行緒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 如果更新上一個節點的等待狀態為SIGNAL成功了
    // 則返回true,這時上面的迴圈不成立了,退出迴圈,也就是隻通知了一個節點
    // 此時當前節點還是阻塞狀態
    // 也就是說呼叫signal()的時候並不會真正喚醒一個節點
    // 只是把節點從條件佇列移到AQS佇列中
    return true;
}

signal()方法的大致流程為:

  1. 從條件佇列的頭節點開始尋找一個非取消狀態的節點;
  2. 把它從條件佇列移到AQS佇列;
  3. 且只移動一個節點;

注意,這裡呼叫signal()方法後並不會真正喚醒一個節點,那麼,喚醒一個節點是在啥時候呢?

還記得開頭例子嗎?倒回去再好好看看,signal()方法後,最終會執行lock.unlock()方法,此時才會真正喚醒一個節點,喚醒的這個節點如果曾經是條件節點的話又會繼續執行await()方法“分界線”下面的程式碼。

五、總結

ReentrantLock是一種可重入的,可實現公平性的互斥鎖(預設是非公平模式,因為非公平模式效率更高),它的設計基於AQS框架,可重入和公平性的實現邏輯都不難理解,每重入一次,state就加1,當然在釋放的時候,也得一層一層釋放。至於公平性,在嘗試獲取鎖的時候多了一個判斷:是否有比自己申請早的執行緒在同步佇列中等待,若有,去等待;若沒有,才允許去搶佔。

條件鎖是指為了等待某個條件出現而使用的一種鎖;條件鎖比較經典的使用場景就是佇列為空時阻塞在條件notEmpty上;ReentrantLock中的條件鎖是通過AQSConditionObject內部類實現的;

await()signal()方法都必須在獲取鎖之後釋放鎖之前使用;await()方法會新建一個節點放到條件佇列中,接著完全釋放鎖,然後阻塞當前執行緒並等待條件的出現;signal()方法會尋找條件佇列中第一個可用節點移到AQS佇列中;在呼叫signal()方法的執行緒呼叫unlock()方法才真正喚醒阻塞在條件上的節點(此時節點已經在AQS佇列中);之後該節點會再次嘗試獲取鎖,後面的邏輯與lock()的邏輯基本一致了。