1. 程式人生 > >死磕 java同步系列之ReentrantLock原始碼解析(二)——條件鎖

死磕 java同步系列之ReentrantLock原始碼解析(二)——條件鎖

問題

(1)條件鎖是什麼?

(2)條件鎖適用於什麼場景?

(3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎?

簡介

條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。

比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素的,這時候就需要阻塞在條件notEmpty上,等待其它執行緒往裡面放入一個元素後,喚醒這個條件notEmpty,當前執行緒才可以繼續去做“彈出一個元素”的行為。

注意,這裡的條件,必須是在獲取鎖之後去等待,對應到ReentrantLock的條件鎖,就是獲取鎖之後才能呼叫condition.await()方法。

在java中,條件鎖的實現都在AQS的ConditionObject類中,ConditionObject實現了Condition介面,下面我們通過一個例子來進入到條件鎖的學習中。

使用示例

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 宣告一個重入鎖
        ReentrantLock lock = new ReentrantLock();
        // 宣告一個條件鎖
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待條件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 這裡睡1000ms是為了讓上面的執行緒先獲取到鎖
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 這裡睡2000ms代表這個執行緒執行業務需要的時間
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知條件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

上面的程式碼很簡單,一個執行緒等待條件,另一個執行緒通知條件已成立,後面的數字代表程式碼實際執行的順序,如果你能把這個順序看懂基本條件鎖掌握得差不多了。

原始碼分析

ConditionObject的主要屬性

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

可以看到條件鎖中也維護了一個佇列,為了和AQS的佇列區分,我這裡稱為條件佇列,firstWaiter是佇列的頭節點,lastWaiter是佇列的尾節點,它們是幹什麼的呢?接著看。

lock.newCondition()方法

新建一個條件鎖。

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

新建一個條件鎖最後就是呼叫的AQS中的ConditionObject類來例項化條件鎖。

condition.await()方法

condition.await()方法,表明現在要等待條件的出現。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 如果執行緒中斷了,丟擲異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 新增節點到Condition的佇列中,並返回該節點
    Node node = addConditionWaiter();
    // 完全釋放當前執行緒獲取的鎖
    // 因為鎖是可重入的,所以這裡要把獲取的鎖全部釋放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步佇列中
    while (!isOnSyncQueue(node)) {
        // 阻塞當前執行緒
        LockSupport.park(this);
        
        // 上面部分是呼叫await()時釋放自己佔有的鎖,並阻塞自己等待條件的出現
        // *************************分界線*************************  //
        // 下面部分是條件已經出現,嘗試去獲取鎖
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 嘗試獲取鎖,注意第二個引數,這是上一章分析過的方法
    // 如果沒獲取到會再次阻塞(這個方法這裡就不貼出來了,有興趣的翻翻上一章的內容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 執行緒中斷相關
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果條件佇列的尾節點已取消,從頭節點開始清除所有已取消的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重新獲取尾節點
        t = lastWaiter;
    }
    // 新建一個節點,它的等待狀態是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾節點為空,則把新節點賦值給頭節點(相當於初始化佇列)
    // 否則把新節點賦值給尾節點的nextWaiter指標
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾節點指向新節點
    lastWaiter = node;
    // 返回新節點
    return node;
}
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取狀態變數的值,重複獲取鎖,這個值會一直累加
        // 所以這個值也代表著獲取鎖的次數
        int savedState = getState();
        // 一次性釋放所有獲得的鎖
        if (release(savedState)) {
            failed = false;
            // 返回獲取鎖的次數
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 如果等待狀態是CONDITION,或者前一個指標為空,返回false
    // 說明還沒有移到AQS的佇列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果next指標有值,說明已經移到AQS的佇列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 從AQS的尾節點開始往前尋找看是否可以找到當前節點,找到了也說明已經在AQS的佇列中了
    return findNodeFromTail(node);
}

這裡有幾個難理解的點:

(1)Condition的佇列和AQS的佇列不完全一樣;

AQS的佇列頭節點是不存在任何值的,是一個虛節點;

Condition的佇列頭節點是儲存著實實在在的元素值的,是真實節點。

(2)各種等待狀態(waitStatus)的變化;

首先,在條件佇列中,新建節點的初始等待狀態是CONDITION(-2);

其次,移到AQS的佇列中時等待狀態會更改為0(AQS佇列節點的初始等待狀態為0);

然後,在AQS的佇列中如果需要阻塞,會把它上一個節點的等待狀態設定為SIGNAL(-1);

最後,不管在Condition佇列還是AQS佇列中,已取消的節點的等待狀態都會設定為CANCELLED(1);

另外,後面我們在共享鎖的時候還會講到另外一種等待狀態叫PROPAGATE(-3)。

(3)相似的名稱;

AQS中下一個節點是next,上一個節點是prev;

Condition中下一個節點是nextWaiter,沒有上一個節點。

如果弄明白了這幾個點,看懂上面的程式碼還是輕鬆加愉快的,如果沒弄明白,彤哥這裡指出來了,希望您回頭再看看上面的程式碼。

下面總結一下await()方法的大致流程:

(1)新建一個節點加入到條件佇列中去;

(2)完全釋放當前執行緒佔有的鎖;

(3)阻塞當前執行緒,並等待條件的出現;

(4)條件已出現(此時節點已經移到AQS的佇列中),嘗試獲取鎖;

也就是說await()方法內部其實是先釋放鎖->等待條件->再次獲取鎖的過程。

condition.signal()方法

condition.signal()方法通知條件已經出現。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 如果不是當前執行緒佔有著鎖,呼叫這個方法丟擲異常
    // 說明signal()也要在獲取鎖之後執行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 條件佇列的頭節點
    Node first = firstWaiter;
    // 如果有等待條件的節點,則通知它條件已成立
    if (first != null)
        doSignal(first);
}
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到條件佇列的頭節點往後一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相當於把頭節點從佇列中出隊
        first.nextWaiter = null;
        // 轉移節點到AQS佇列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把節點的狀態更改為0,也就是說即將移到AQS佇列中
    // 如果失敗了,說明節點已經被改成取消狀態了
    // 返回false,通過上面的迴圈可知會尋找下一個可用節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 呼叫AQS的入隊方法把節點移到AQS的佇列中
    // 注意,這裡enq()的返回值是node的上一個節點,也就是舊尾節點
    Node p = enq(node);
    // 上一個節點的等待狀態
    int ws = p.waitStatus;
    // 如果上一個節點已取消了,或者更新狀態為SIGNAL失敗(也是說明上一個節點已經取消了)
    // 則直接喚醒當前節點對應的執行緒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 如果更新上一個節點的等待狀態為SIGNAL成功了
    // 則返回true,這時上面的迴圈不成立了,退出迴圈,也就是隻通知了一個節點
    // 此時當前節點還是阻塞狀態
    // 也就是說呼叫signal()的時候並不會真正喚醒一個節點
    // 只是把節點從條件佇列移到AQS佇列中
    return true;
}

signal()方法的大致流程為:

(1)從條件佇列的頭節點開始尋找一個非取消狀態的節點;

(2)把它從條件佇列移到AQS佇列;

(3)且只移動一個節點;

注意,這裡呼叫signal()方法後並不會真正喚醒一個節點,那麼,喚醒一個節點是在啥時候呢?

還記得開頭例子嗎?倒回去再好好看看,signal()方法後,最終會執行lock.unlock()方法,此時才會真正喚醒一個節點,喚醒的這個節點如果曾經是條件節點的話又會繼續執行await()方法“分界線”下面的程式碼。

結束了,仔細體會下^^

如果非要用一個圖來表示的話,我想下面這個圖可以大致表示一下(這裡是用時序圖畫的,但是實際並不能算作一個真正的時序圖哈,瞭解就好):

總結

(1)重入鎖是指可重複獲取的鎖,即一個執行緒獲取鎖之後再嘗試獲取鎖時會自動獲取鎖;

(2)在ReentrantLock中重入鎖是通過不斷累加state變數的值實現的;

(3)ReentrantLock的釋放要跟獲取匹配,即獲取了幾次也要釋放幾次;

(4)ReentrantLock預設是非公平模式,因為非公平模式效率更高;

(5)條件鎖是指為了等待某個條件出現而使用的一種鎖;

(6)條件鎖比較經典的使用場景就是佇列為空時阻塞在條件notEmpty上;

(7)ReentrantLock中的條件鎖是通過AQS的ConditionObject內部類實現的;

(8)await()和signal()方法都必須在獲取鎖之後釋放鎖之前使用;

(9)await()方法會新建一個節點放到條件佇列中,接著完全釋放鎖,然後阻塞當前執行緒並等待條件的出現;

(10)signal()方法會尋找條件佇列中第一個可用節點移到AQS佇列中;

(11)在呼叫signal()方法的執行緒呼叫unlock()方法才真正喚醒阻塞在條件上的節點(此時節點已經在AQS佇列中);

(12)之後該節點會再次嘗試獲取鎖,後面的邏輯與lock()的邏輯基本一致了。

彩蛋

為什麼java有自帶的關鍵字synchronized了還需要實現一個ReentrantLock呢?

首先,它們都是可重入鎖;

其次,它們都預設是非公平模式;

然後,...,呃,我們下一章繼續深入探討 ReentrantLock VS synchronized。

推薦閱讀

  1. 死磕 java同步系列之ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

  2. 死磕 java同步系列之AQS起篇

  3. 死磕 java同步系列之自己動手寫一個鎖Lock

  4. 死磕 java魔法類之Unsafe解析

  5. 死磕 java同步系列之JMM(Java Memory Model)

  6. 死磕 java同步系列之volatile解析

  7. 死磕 java同步系列之synchronized解析


歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java同步系列ReentrantLock原始碼解析——條件

問題 (1)條件鎖是什麼? (2)條件鎖適用於什麼場景? (3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎? 簡介 條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。 比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素

java同步系列ReentrantLock原始碼解析——公平、非公平

問題 (1)重入鎖是什麼? (2)ReentrantLock如何實現重入鎖? (3)ReentrantLock為什麼預設是非公平模式? (4)ReentrantLock除了可重入還有哪些特性? 簡介 Reentrant = Re + entrant,Re是重複、又、再的意思,entrant是enter的名詞或

java同步系列ReentrantReadWriteLock原始碼解析

問題 (1)讀寫鎖是什麼? (2)讀寫鎖具有哪些特性? (3)ReentrantReadWriteLock是怎麼實現讀寫鎖的? (4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap? 簡介 讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個執行緒可以同時對共享

java同步系列Semaphore原始碼解析

問題 (1)Semaphore是什麼? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什麼場景中? (

java同步系列StampedLock原始碼解析

問題 (1)StampedLock是什麼? (2)StampedLock具有什麼特性? (3)StampedLock是否支援可重入

java同步系列CyclicBarrier原始碼解析——有圖有真相

問題 (1)CyclicBarrier是什麼? (2)CyclicBarrier具有什麼特性? (3)CyclicBarrier與

java同步系列Phaser原始碼解析

問題 (1)Phaser是什麼? (2)Phaser具有哪些特性? (3)Phaser相對於CyclicBarrier和Count

java同步系列AQS終篇面試

問題 (1)AQS的定位? (2)AQS的重要組成部分? (3)AQS運用的設計模式? (4)AQS的總體流程? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 在之前的章節中,我們一起學習了ReentrantLock、R

java同步系列ReentrantLock VS synchronized——結果可能跟你想的不一樣

問題 (1)ReentrantLock有哪些優點? (2)ReentrantLock有哪些缺點? (3)ReentrantLock

java同步系列開篇

討論 關註 使用 避免死鎖 更新數據 讀寫 上下文切換 monit 缺點 簡介 同步系列,這是彤哥想了好久的名字,本來是準備寫鎖相關的內容,但是java中的CountDownLatch、Semaphore、CyclicBarrier這些類又不屬於鎖,它們和鎖又有很多共同點,

java同步系列JMMJava Memory Model

簡介 Java記憶體模型是在硬體記憶體模型上的更高層的抽象,它遮蔽了各種硬體和作業系統訪問的差異性,保證了Java程式在各種平臺下對記憶體的訪問都能達到一致的效果。 硬體記憶體模型 在正式講解Java的記憶體模型之前,我們有必要先了解一下硬體層面的一些東西。 在現代計算機的硬體體系中,CPU的運算速度是非常快

java同步系列volatile解析

問題 (1)volatile是如何保證可見性的? (2)volatile是如何禁止重排序的? (3)volatile的實現原理? (4)volatile的缺陷? 簡介 volatile可以說是Java虛擬機器提供的最輕量級的同步機制了,但是它並不容易被正確地理解,以至於很多人不習慣使用它,遇到多執行緒問題一律

java同步系列synchronized解析

問題 (1)synchronized的特性? (2)synchronized的實現原理? (3)synchronized是否可重入? (4)synchronized是否是公平鎖? (5)synchronized的優化? (6)synchronized的五種使用方式? 簡介 synchronized關鍵字是Ja

java同步系列自己動手寫一個Lock

問題 (1)自己動手寫一個鎖需要哪些知識? (2)自己動手寫一個鎖到底有多簡單? (3)自己能不能寫出來一個完美的鎖? 簡介 本篇文章的目標一是自己動手寫一個鎖,這個鎖的功能很簡單,能進行正常的加鎖、解鎖操作。 本篇文章的目標二是通過自己動手寫一個鎖,能更好地理解後面章節將要學習的AQS及各種同步器實現的原理

java同步系列AQS起篇

問題 (1)AQS是什麼? (2)AQS的定位? (3)AQS的實現原理? (4)基於AQS實現自己的鎖? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 AQS是基於FIFO的佇列實現的,並且內部維護了一個狀態變數sta

java同步系列mysql分散式

問題 (1)什麼是分散式鎖? (2)為什麼需要分散式鎖? (3)mysql如何實現分散式鎖? (4)mysql分散式鎖的優點和缺點? 簡介 隨著併發量的不斷增加,單機的服務遲早要向多節點或者微服務進化,這時候原來單機模式下使用的synchronized或者ReentrantLock將不再適用,我們迫切地需要一

java同步系列zookeeper分散式

(2)zookeeper分散式鎖有哪些優點? (3)zookeeper分散式鎖有哪些缺點? 簡介 zooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,它可以為分散式應用提供一致性服務,它是Hadoop和Hbase的重要元件,同時也可以作為配置中心、註冊中心運用在微服務體系中。 本章我們將介

java同步系列redis分散式進化史

(2)redis分散式鎖有哪些優點? (3)redis分散式鎖有哪些缺點? (4)redis實現分散式鎖有沒有現成的輪子可以使用? 簡介 Redis(全稱:Remote Dictionary Server 遠端字典服務)是一個開源的使用ANSI C語言編寫、支援網路、可基於記憶體亦可持久化的日誌型、Key-

java同步系列終結篇

腦圖 下面是關於同步系列的一份腦圖,列舉了主要的知識點和問題點,看過本系列文章的同學可以根據腦圖自行回顧所學的內容,也可以作為面試前的準備。 如果有需要高清無碼原圖的同學,可以關注公眾號“彤哥讀原始碼”,回覆“sync”領取。 總結 所謂同步,就是保證多執行緒(包括多程序)對共享資源的讀寫能夠安全有效的執