1. JUC原始碼分析系列--ReentrantLock 可重入鎖
JUC(java.util.concurrent) 包在 JAVA 的併發程式設計中佔領著絕對的地位,在各大開源框架中均能看到它的身影。最近看完了 JUC 的原始碼,做一個持續的輸出,從併發工具到併發容器再到執行緒池進行分析。在瀏覽過程中,建議你開啟對應的原始碼,跟著本文一起分析。(文末補上了完整的流程圖)
開篇問題:
- 為什麼一個 .lock() 方法就可以鎖住一段同步程式碼塊?(本文核心)
- 可重鎖入是怎麼實現的?
- 公平鎖和非公平鎖的區別在哪裡?
- 獲取鎖的超時和中斷是怎麼實現的?
鎖的實現過程
首先看一下 ReentrantLock 的幾個方法:
private final Sync sync;
public ReentrantLock () {sync = new NonfairSync();}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
public void lock() {sync.lock();}
public void unlock() {sync.release(1);}
複製程式碼
我們可以發現 ReentrantLock 的加鎖、解鎖方法都是通過呼叫其內部的 Sync 實現的,並且 Sync 是其內部抽象類,其實現分別有 FairSync 和 NonfairSync,分別代表了公平鎖和非公平鎖的實現。預設會使用非公平鎖。我們看一下 Sync 的類圖。上面的父類就是大名鼎鼎的 AQS ,我會在接下來的文章裡分析。
FairSync # lock() :
final void lock() {
acquire(1);
}
// 發現其呼叫的是 AQS 中的 acquire 方法,深入看一下
AbstractQueuedSynchronizer # acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
複製程式碼
我們可以發現 acquire() 這裡判斷分成了兩步,大致的意思就是先嚐試能不能獲取到鎖,如果不能的話就進入到等待的執行緒中排隊去,分別對應的就是 tryAcquire(arg) 和 acquireQueued(addWaiter(~)),acquireQueued 這個方法是 AQS 排隊的實現,我會放到 # 中來講,所以我們這裡就可以理解為先用 tryAcquire() 方法嘗試一下能不能拿到鎖,拿到的時候會返回 true,這個方法就直接返回了。如果拿不到鎖了,就到 AQS 中排隊等著去吧。
我們這裡重點看一下 tryAcquire() 的實現,我們會發現這是 AQS 提供的一個模板方法,其實現我們可以在 FairSync 中找到:
FairSync # tryAcquire() :
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取 AQS 中的 state 欄位,在 ReentrantLock 中,state == 0 代表鎖處於空閒狀態,state > 0 代表鎖已經被某個執行緒獲取了
int c = getState();
if (c == 0) {
// 首先判斷 AQS 的是否有其他執行緒在排隊,對於公平鎖來說是要講究先來後到的
if (!hasQueuedPredecessors() &&
// 使用 CAS 來設定 state
compareAndSetState(0,acquires)) {
// 設定當前執行緒為鎖的持有者
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷當前執行緒是否為鎖的持有執行緒,如果是的話代表重入了
else if (current == getExclusiveOwnerThread()) {
// 從這裡可以看出 state 代表了鎖重入的次數
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 由於 state 是大於 1 的,其他執行緒是沒法到達這一步的,所以不需要 CAS 就可以直接設定 state
setState(nextc);
return true;
}
return false;
}
複製程式碼
我們這裡稍微分析一下 CAS。compareAndSetState(0,acquires)
這行程式碼使用了 Unsafe
類對 AQS 的變數 state
進行修改,這個修改是直接發生在本地記憶體上的。首先 Unsafe
會查詢到 state
在本地記憶體的位置,把它跟 0 做對比,相等的話在更新。這裡看起來是分兩步執行(先對比後更新),實際上底層是由一個CPU命令完成的,所以還是原子操作的。並且在執行這個命令前會對匯流排加鎖,讓其他處理器暫時不能訪問記憶體。這樣子就保證了對 state
更新的安全性。
從多執行緒的角度分析這段程式碼如何防止併發,我們可以發現執行緒想要獲得鎖,即這個函式返回 true,都是需要經過 compareAndSetState(0,acquires)
這個 CAS 操作的。當一個執行緒拿到鎖後,其他執行緒再進行 CAS 的時候就會發現本地記憶體上的 state
已經不是 0 了,所以都會返回失敗。而且 CAS 又保證了原子性,我們可以看成各個執行緒之間的 compareAndSetState(0,acquires)
都是順序執行的,不存在併發的情況。
看完了加鎖的過程,我們再看一下解鎖是怎麼實現的。
ReentrantLock # unlock
public void unlock() {
// 直接呼叫了 AQS 的 release
sync.release(1);
}
AbstractQueuedSynchronizer # release
public final boolean release(int arg) {
// 解鎖的模板方法,在ReentrantLock的內部類實現
if (tryRelease(arg)) {
// 留到下節分析,實現了喚醒佇列中排著隊的其他執行緒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock # Sync # tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 不為 0 的時候說明鎖重入了,本執行緒還持有著這個鎖
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製程式碼
鎖的釋放的實現比較簡單,直接更新 state
和 exclusiveOwnerThread
,也不需要考慮併發,因為在這兩個更新前,其他執行緒對 state
的更新都會失敗。至此對 ReentrantLock
加鎖解鎖的分析就結束了,其實看完原始碼會發現 ReentrantLock
就是通過對一個變數進行 CAS 操作,成功就是拿到鎖了,不成功就去排隊。至於如何排隊,就是 AQS 為所有併發工具類提供的功能了。
解答開篇
可重入鎖是怎麼實現的
其實通過剛才對加鎖過程的分析我們已經可以得到可重入鎖就是當鎖的持有物件就是本執行緒的時候,就對 state
的累加。不過由於解鎖操作只是對 state
減一,所以當使用重入鎖時,還是要記得呼叫對應次數的解鎖操作。
公平鎖和非公平鎖的區別在哪裡
剛才我們是拿 FairSync
也就是公平鎖進行分析的,我門再來看一下 NonfairSync
的加鎖是怎麼進行的。
NonfairSync#lock()
final void lock() {
if (compareAndSetState(0,1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製程式碼
我們可以發現非公平鎖就是在競爭鎖前先嚐試了一次CAS操作(就是插隊的感覺),另外就是在tryAcquire
中,把對列是否存在排隊執行緒的判斷去掉了。這樣即使在本執行緒前存在多個排隊佇列,本執行緒也是有機會率先拿到鎖的。
獲取鎖的超時和中斷是怎麼實現的?
ReentrantLock
為我們提供了一個通過指定時間來獲取鎖的方法,當時間到了並且還是沒有取到鎖的話就會返回一個 false
。
ReentrantLock#tryLock
public boolean tryLock(long timeout,TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(timeout));
}
Sync#tryAcquireNanos
public final boolean tryAcquireNanos(int arg,long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取鎖,如果不成功的話就呼叫 AQS 支援的時間排隊法了
return tryAcquire(arg) ||
doAcquireNanos(arg,nanosTimeout);
}
複製程式碼
我們可以發現超時機制的支援主要還是靠 AQS 的doAcquireNanos(arg,nanosTimeout)
實現的。這個方法我會在下一篇裡面分析,這裡我們只需要知道它也是同樣的把執行緒放到佇列中去排隊,當達到超時時間時,就會返回一個 false。
總結
今天我們分析了 ReentrantLock
的原始碼,知道的加鎖解鎖過程其實就是在競爭對 state
變數的修改,對於沒有競爭到鎖的執行緒,則通過 AQS 提供的排隊能力進入到佇列中等待。此外ReentrantLock
還提供了對其相關變數的 get 方法,比如獲取鎖的持有執行緒,獲取state
的值,獲取整個排隊佇列等,建議你通過原始碼檢視這些方法。