深入理解 AQS 之 Condition 原始碼
前言
很久之前分享過ReetrantLock
的實現深入剖析ReentrantLock公平鎖與非公平鎖原始碼實現,而今再回頭去看,對AQS也有了更深刻準確的理解,隨即更新了下之前的文章。今天分享利用AQS實現的另一個重要的JUC工具類Condition
。如果上篇理解到位,這個Condition
的學習就沒啥難度的~
我們應該都瞭解過Object
的一些監視器方法:wait()
,notify()
, notifyAll()
。場景是某執行緒A需要對資源操作時,需要滿足一定的條件,不滿足就呼叫wait(),進入等待佇列等待條件滿足,而創造這個條件的是另外一個執行緒B,執行緒B操作這份資源讓其滿足條件,一旦條件滿足,執行緒B就會signal等待佇列的執行緒。這個過程中會發現涉及了多執行緒操作共享資料,所以這就是為什麼呼叫監視器方法需要首先獲取一把鎖。另外,Object自帶的一套監視器方法,只能包含一個同步佇列,一個條件等待佇列。Condition是對上述模型的另一種實現,支援的功能特性更加豐富,如:
-
一個同步佇列可以有多個等待佇列
-
可以在wait過程中不響應中斷退出等待
-
可以指定等待滿足條件獲取鎖的等待時間
Condition使用
Condition屬於多執行緒間通訊的一種機制,我們常用的BlockingQueue就是基於Condition實現的。我們在非同步RPC通訊框架中也經常會見到使用的BlockingQueue,比如我們在rpc client端使用Netty IO,因為Netty本身write操作非阻塞,而業務呼叫要求同步阻塞獲取結果,所以可以實現上使所有業務執行緒write資料後,阻塞在BlockingQueue上,等Netty client 收到響應資料後,填充到BlockingQueue,並喚醒當時阻塞的請求執行緒。
JDK的檔案中給出的Condition使用例子就是如何實現一個簡單的BlockingQueue:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// 建立Condition一定依賴Lock例項
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr,takeptr,count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// 生產者執行緒發現佇列滿了,無法繼續生產,只能在notFull條件上排隊等待
// while 迴圈為為了防止假喚醒
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// 生產成功,通知等待notEmpty條件的執行緒來消費
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
// 消費者執行緒如果發現沒有資料可消費,只能排隊等待在notEmpty條件上
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
//每次被消費者執行緒消費一個都會發個通知,告訴等待notFull條件的執行緒
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
複製程式碼
Condition實現
整體結構
整個Condition整體的佇列模型如上,包含一個同步佇列和多個條件佇列,如果執行緒執行時條件不滿足,呼叫await()方法,會將該執行緒封裝成Node節點新增到condition的條件佇列中,一旦滿足條件,條件佇列便會被其他執行緒signal()通知頭節點 並請到同步佇列去搶鎖,搶到鎖後便會從wait()方法退出,繼續執行。每個condition例項都對應一個條件佇列,條件佇列的實現類是ConditionObject
,內部維護了一個單向連結串列,每個節點也都是一個Node例項,畢竟因為將來是要被從條件佇列轉移到阻塞佇列中的。
public class ConditionObject implements Condition,java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
複製程式碼
因為每次呼叫condition的方法操作前一定是獲取了鎖的,所以對條件佇列的操作是執行緒安全的
await() 方法
await()方法有三種不同的實現:
-
awaitUninterruptibly
: await期間不響應中斷,非得等到條件滿足被喚醒 -
await() throws InterruptedException
: await期間響應中斷,如果阻塞太久可以隨時中斷喚醒 -
await(long time,TimeUnit unit) throws InterruptedException
: 可以設定等待超時時間,並可以響應中斷
對於中斷的處理使用interruptMode final常量來表示,取值為1,表示當前執行緒後續需要重新中斷,-1表示後續需要丟擲InterruptedException。總之這個常量是用來標記將來怎麼處理這個中斷的。 private static final int REINTERRUPT = 1; private static final int THROW_IE = -1; 我們以第三種實現為例來具體分析:
public final boolean await(long time,TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
// 因為本身await期間要響應中斷,在await前先判斷是否已被中斷了,已中斷就拋InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//將當前執行緒封裝成Node節點並加入等待佇列的尾部
Node node = addConditionWaiter();
// 釋放當前執行緒所佔有的鎖,如果是可重入鎖,也要把state值歸為0
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
// while只要不退出,就說明還在等待佇列中進行await
while (!isOnSyncQueue(node)) {
// 如果到了超時時間,會將節點從等待佇列轉移到同步佇列,返回true,說明等待真的超時。返回false,說明當正準備取消等待前,已經被signal了,只是還沒有完成轉移到同步佇列而已
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
//如果等待剩餘時間少於1000納秒就沒必要park了,不如自旋,畢竟很快就要退出while迴圈了
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this,nanosTimeout);
// != 0意味著 waiting期間被中斷,因為要響應中斷,所以break,沒必要再await; 等於0,意味著waiting期間沒有被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出上面while後,說明已經在同步隊列了,此執行緒開始搶鎖(試圖恢復await前的state值),如果acquireQueued 返回false,說明在同步佇列裡獲取鎖的過程中沒有被中斷過,返回true則表示曾發生過中斷
if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 統一處理上述過程產生的中斷狀態
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
複製程式碼
奇怪的一點在於為什麼執行緒用這個常量來標誌未來要如何處理? 而不是立刻處理,個人理解是為了複用方法,如acquireQueued
, 因為對外介面有的需要響應中斷,有的不需要,意味著對中斷處理上有著不同的方式,acquireQueued
只需要返回 是否中斷過,而不會在內部做實際的中斷處理,實際處理交給上層。
下面我們詳細拆解await()
中一些重要的方法。
isOnSyncQueue()
此方法用於判斷條件佇列某節點是否已經被轉移到同步佇列。
final boolean isOnSyncQueue(Node node) {
// 如果節點的waitStatus 依然為Node.CONDITION,說明還在條件佇列,否則如果已被轉移到同步佇列中時waitStatus應為0或-1
// node.prev 是在同步佇列才會用的屬性,==null 依然意味著沒有進入同步佇列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor,it must be on queue
return true;
// 如果node.prev為非空,依然不能確定其已在同步佇列中,因為同步佇列的節點入隊是兩步操作,先設定node.prev,然後CAS設定自己為tail,第二步操作可能CAS失敗。
//從同步佇列尾節點往前找
return findNodeFromTail(node);
}
複製程式碼
signal()
此方法將條件佇列頭節點轉移到同步佇列中
public final void signal() {
// 呼叫signal的執行緒必須持有獨佔鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
複製程式碼
private void doSignal(Node first) {
do {
// 因為first馬上就要被轉移到同步隊列了,所以將first.nextWaiter,作為新的firstWatier。
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//切斷和等待佇列的關聯
first.nextWaiter = null;
// 如果轉移不成功且還有後續節點,那麼繼續後續節點的轉移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
複製程式碼
final boolean transferForSignal(Node node) {
// 進行CAS,畢竟因為當前發起signal的是另一個執行緒,而node本身可能自己取消等待,所以需要CAS
//如果CAS失敗 說明此節點已取消等待,此節點接下來將不會被轉移到同步佇列,如果CAS成功,waitStatus將會被置為0
if (!compareAndSetWaitStatus(node,Node.CONDITION,0))
return false;
// 將node 加入同步佇列後返回其前置節點
Node p = enq(node);
int ws = p.waitStatus;
// ws > 0 說明 node 在阻塞佇列中的前驅節點取消了等待鎖,直接喚醒 node 對應的執行緒。ws < 0時CAS設定node前置節點的waitStatus為SIGNAL,之前文章說過,新節點入同步佇列需要設定前置節點waitStatus為SIGNAL,肩負起喚醒後繼節點的責任
// 所以如果 node進入同步佇列後的前置節點取消或者 CAS設定SIGNAL失敗,直接喚醒該node
// 但是在絕大多數情況下 應該是ws<0,並且CAS成功的,並不會直接unpark,而是等到在同步佇列中成功拿到鎖後被unpark
if (ws > 0 || !compareAndSetWaitStatus(p,ws,Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
複製程式碼
看await()
方法中的程式碼 ,一旦unpark後,繼續往後執行 。
有以下三種情況會讓 LockSupport.park(this); 這句返回繼續往下執行:
- 常規路徑。signal -> 轉移節點到阻塞佇列 -> 獲取了鎖(unpark)
- 執行緒中斷。在 park 的時候,另外一個執行緒對這個執行緒進行了中斷
- signal 的時候我們說過,轉移以後的前驅節點取消了,或者對前驅節點的CAS操作失敗了
- 假喚醒。這個也是存在的,和 Object.wait() 類似,都有這個問題
走到這裡,該node一定是從park
中返回了,返回後檢查中斷狀態,如果 不為0,說明發生過中斷。 為0,沒有被中斷過
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
複製程式碼
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// transferAfterCancelledWait 方法會判斷被unpark的節點曾被中斷的時機,如果返回true,意味著在條件佇列中等待的時候被中斷過(未被signal之前),false意味著中斷髮生在被signal之後
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
複製程式碼
只有在發生過中斷時,才會呼叫這個transferAfterCancelledWait
方法。可以理解為出現了這樣一個場景:
某已park的節點還在條件佇列中靜靜的等待滿足條件後被轉移到同步佇列中時,被其它執行緒進行了中斷
final boolean transferAfterCancelledWait(Node node) {
// 此處CAS設定成功意味著node還在條件佇列中
if (compareAndSetWaitStatus(node,0)) {
// 所以在條件佇列中即在signal之前被中斷,那麼將node加入到同步佇列,並且返回true
enq(node);
return true;
}
/*
* If we lost out to a signal(),then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient,so just
* spin.
*/
// 走到這裡大概率是已經在同步隊列了,也可能是正在加入同步佇列的過程中,自旋等待入隊完成。總之中斷是發生在已經被signal之後了。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
複製程式碼
在檢查中斷狀態的過程中我們會發現,被unpark
後,無論是否被中斷過,無論是否是響應中斷的await
方法,都會被加入同步佇列中,看起來並沒有對中斷有啥特殊的處理的地方。是不是有點和我們預期的不符?
所以退出await
中while ()
迴圈的條件有兩個:
- 節點被之前所說的幾種條件之一喚醒後發現已經在同步隊列了
- 節點被之前所說的幾種條件之一喚醒後發現是經由中斷導致的被
unpark
,直接break;跳出while迴圈
我們繼續看await()
方法,跳出while
迴圈後
// 執行到這裡,該node一定是已經進入同步隊列了
if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
// 能進入if ,一定是在被signal之後發生的中斷,標誌下接下來的處理中,需要重新進行中斷
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
複製程式碼
acquireQueued
返回值為true 說明在同步佇列中獲取鎖的過程中被中斷過,而在上面while
沒退出之前是否中斷過 由當前 interruptMode
變數標誌,因為在checkInterruptWhileWaiting
的Thread.interrupted()
方法呼叫時,已經把中斷狀態標誌位清除了。上面如果interruptMode != THROW_IE
成立,說明在while
階段沒有發生過中斷或者發生過中斷,但中斷是在對條件佇列中某node進行signal
之後發生的。
總結一下就是說只要能夠退出while
迴圈,不管是否被中斷過,那麼node一定在同步隊列了,當執行完acquireQueued
,意味著拿到鎖並返回了,返回值代表在同步佇列獲取鎖的過程中是否被中斷過,所以可以看到,對執行緒進行中斷,並不會影響其進入同步佇列併成功拿到鎖,而是把整個過程中是否被中斷過先記錄下來,然後“事後說事兒”,上層統一處理——使用reportInterruptAfterWait()
方法。可以看到如果是響應中斷的await
, 如果在條件佇列期間,被signal之前被中斷的話 interruptMode == THROW_IE
,會丟擲異常:
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
複製程式碼
總結
JUC中的實現確實存在很多設計技巧,每次看一遍都會有新的感悟,會發現一些之前沒有發現的“奇技淫巧”,很多時候看不懂,就隔段時間不斷的去看,我記得大學那會兒,看《Java併發程式設計的藝術》看了兩三遍,每看一遍就多加深一層理解。在JUC的學習上這回也算是“雄關漫道真如鐵,而今邁步從頭越”了~,不過現在回顧基礎輕鬆得很,,,