從BlockingQueue 看 Condition配合使用實現生產/消費阻塞
BlockingQueue之所以叫阻塞佇列這個名字,主要就是以下兩個特點
- 當佇列元素滿時,新增元素的操作(put())會阻塞直到佇列有空位
- 當佇列為空時,獲取元素的操作(poll())會阻塞直到佇列不為空(可以設定獲取超時時間,超時返回null)
實現以上特性主要是使用了ReentrantLock+Condition兩個juc的類,以LinkedBlockingQueue原始碼為例,我們簡單解析下它的程式碼實現
LinkedBlockingQueue除了會阻塞的put poll方法外還有offer、take等不阻塞的方法,可以根據實際情況選用
由於Condition是繫結在ReentrantLock上的,我們首先看下相關的定義
Condition的特性:
1.Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。不同的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的。
2.Condition它更強大的地方在於:能夠更加精細的控制多執行緒的休眠與喚醒。對於同一個鎖,我們可以建立多個Condition,在不同的情況下使用不同的Condition。
例如,假如多執行緒讀/寫同一個緩衝區:當向緩衝區中寫入資料之後,喚醒"讀執行緒";當從緩衝區讀出資料之後,喚醒"寫執行緒";並且當緩衝區滿的時候,"寫執行緒"需要等待;當緩衝區為空時,"讀執行緒"需要等待。
如果採用Object類中的wait(), notify(), notifyAll()實現該緩衝區,當向緩衝區寫入資料之後需要喚醒"讀執行緒"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀執行緒",而只能通過notifyAll喚醒所有執行緒(但是notifyAll無法區分喚醒的執行緒是讀執行緒,還是寫執行緒)。 但是,通過Condition,就能明確的指定喚醒讀執行緒。
*** LinkedBlockingQueue關鍵變數如下 ***
/** 佇列容量,預設大小為Integer.MAX_VALUE!!可能導致OOM */ private final int capacity; /** 當前佇列中元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 列表頭結點 */ transient Node<E> head; /** * 列表尾結點 */ private transient Node<E> last; /** 取元素 鎖*/ private final ReentrantLock takeLock = new ReentrantLock(); /** 控制獲取操作阻塞/執行 */ private final Condition notEmpty = takeLock.newCondition(); /** 放元素 鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 控制放置操作阻塞/執行 */ private final Condition notFull = putLock.newCondition();
這裡可能有人有疑問,為什麼有了兩個ReentrantLock 還要建兩個 Condition
其實主要是邏輯上的功能不同,比如takeLock 在程式碼中是這樣使用的:
/**
* 喚醒等待的取物件執行緒 只提供給 put/offer 呼叫
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//這時候不允許取元素,防止變為空佇列
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
這一段的原始碼註釋也說明了用途,兩個ReentrantLock主要用於控制當前佇列是否能放入/取出物件,而 Condition用於標識的是佇列滿/空 這兩個臨界狀態
接下來就是看put和poll兩個核心方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 建立本地變數c儲存獲取count
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//使用putLock,實現序列新增物件
putLock.lockInterruptibly();
try {
//佇列已滿,用notFull阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//佇列不為空,更新notFull
notFull.signal();
} finally {
putLock.unlock();
}
//安全地呼叫notEmpty.signal();通知獲取元素的執行緒
if (c == 0)
signalNotEmpty();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//等待指定時長後返回null
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
總結:
可以看到put和poll方法總體流程都差不多,都是通過putLock/takeLock將獲取/防止物件的操作變為序列化,並且在開始/完成操作時根據AtomicInteger的count個數,更新notFull和NotEmpty兩個Condition,喚醒相對應操作的執行緒