1. 程式人生 > 實用技巧 >從BlockingQueue 看 Condition配合使用實現生產/消費阻塞

從BlockingQueue 看 Condition配合使用實現生產/消費阻塞

BlockingQueue之所以叫阻塞佇列這個名字,主要就是以下兩個特點

  1. 當佇列元素滿時,新增元素的操作(put())會阻塞直到佇列有空位
  2. 當佇列為空時,獲取元素的操作(poll())會阻塞直到佇列不為空(可以設定獲取超時時間,超時返回null)

實現以上特性主要是使用了ReentrantLock+Condition兩個juc的類,以LinkedBlockingQueue原始碼為例,我們簡單解析下它的程式碼實現
LinkedBlockingQueue除了會阻塞的put poll方法外還有offer、take等不阻塞的方法,可以根據實際情況選用

由於Condition是繫結在ReentrantLock上的,我們首先看下相關的定義

Condition的特性:
1.Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。不同的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的。
2.Condition它更強大的地方在於:能夠更加精細的控制多執行緒的休眠與喚醒。對於同一個鎖,我們可以建立多個Condition,在不同的情況下使用不同的Condition。
例如,假如多執行緒讀/寫同一個緩衝區:當向緩衝區中寫入資料之後,喚醒"讀執行緒";當從緩衝區讀出資料之後,喚醒"寫執行緒";並且當緩衝區滿的時候,"寫執行緒"需要等待;當緩衝區為空時,"讀執行緒"需要等待。
如果採用Object類中的wait(), notify(), notifyAll()實現該緩衝區,當向緩衝區寫入資料之後需要喚醒"讀執行緒"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀執行緒",而只能通過notifyAll喚醒所有執行緒(但是notifyAll無法區分喚醒的執行緒是讀執行緒,還是寫執行緒)。 但是,通過Condition,就能明確的指定喚醒讀執行緒。

*** LinkedBlockingQueue關鍵變數如下 ***

  /** 佇列容量,預設大小為Integer.MAX_VALUE!!可能導致OOM */
    private final int capacity;

    /** 當前佇列中元素個數 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 列表頭結點
     */
    transient Node<E> head;

    /**
     * 列表尾結點
     */
    private transient Node<E> last;

    /** 取元素 鎖*/
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 控制獲取操作阻塞/執行 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 放元素 鎖 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 控制放置操作阻塞/執行 */
    private final Condition notFull = putLock.newCondition();

這裡可能有人有疑問,為什麼有了兩個ReentrantLock 還要建兩個 Condition
其實主要是邏輯上的功能不同,比如takeLock 在程式碼中是這樣使用的:

     /**
     * 喚醒等待的取物件執行緒 只提供給 put/offer 呼叫
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //這時候不允許取元素,防止變為空佇列
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

這一段的原始碼註釋也說明了用途,兩個ReentrantLock主要用於控制當前佇列是否能放入/取出物件,而 Condition用於標識的是佇列滿/空 這兩個臨界狀態

接下來就是看put和poll兩個核心方法

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 建立本地變數c儲存獲取count
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //使用putLock,實現序列新增物件
        putLock.lockInterruptibly();
        try {
            //佇列已滿,用notFull阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //佇列不為空,更新notFull
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //安全地呼叫notEmpty.signal();通知獲取元素的執行緒
        if (c == 0)
            signalNotEmpty();
    }


public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //等待指定時長後返回null
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

總結:

可以看到put和poll方法總體流程都差不多,都是通過putLock/takeLock將獲取/防止物件的操作變為序列化,並且在開始/完成操作時根據AtomicInteger的count個數,更新notFull和NotEmpty兩個Condition,喚醒相對應操作的執行緒