1. 程式人生 > >Java 集合框架分析:PriorityBlockingQueue java1.8

Java 集合框架分析:PriorityBlockingQueue java1.8

哈哈,終於有了第二篇部落格了,終於知道編輯一個部落格需要注意什麼了,希望堅持下去,每天看點小原始碼!

目錄
1.簡述PriorityBlockingQueue
2.主要方法及實現
3.使用過程中需要注意的地方
4.和其他的相關容器的比較
5.總結

簡述PriorityBlockingQueue
特點:
1.屬於併發安全的集合。(什麼是併發安全的集合:即多執行緒的情況下,不會出現不確定的狀態)。
2.無界的佇列。
3.它的blocking表現在取元素時,如果佇列為空,則取元素的執行緒會阻塞。
4.不允許null元素

主要方法及實現
1.主要的成員
同Priorityqueue,使用底層陣列儲存資料,擁有兩把鎖,一個可重入鎖,一個自旋鎖。

    private final ReentrantLock lock;

/**
 * Condition for blocking when empty
 */
private final Condition notEmpty;

/**
 * Spinlock for allocation, acquired via CAS.
 */
private transient volatile int allocationSpinLock;
    private transient Object[] queue;

/**
 * The number of elements in the priority queue.
 */
private transient int size;

2.插入
步驟:
a.null檢測
b.因為priorityblockingQueue是執行緒安全的,所以,插入刪除都是需要加鎖的。這裡先進行加鎖。
c.如果需要擴容,則先擴容。關於擴容操作,一會再說。
d.插入元素,調整順序(和priorityqueue是一樣的)
e.傳送資訊,啟用阻塞的取資料的執行緒
f.釋放鎖(必須的,因為不是用的Synchronized,而是用的lock進行控制的)

public boolean offer(E e) 
{
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try 
    {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } 
    finally { lock.unlock(); }
    return true;
}

3.擴容操作
擴容發生在你要插入元素時,發現底層陣列大小不夠,則需要擴容。這裡在判斷時,已經獲取的reentrantlock鎖,因為要擴容,說明queue不為空。另一方面,擴容時,需要發生底層陣列的重新複製到新陣列中,而取資料的執行緒當前還不會讀到新加入的資料(先取你之間加入的,這是happen-before原則,你新資料還沒有插入成功,別人是看不到的),所以為了提高併發量,這裡需要先釋放reentrantlock,讓其他的讀執行緒能夠進行(寫執行緒還是會阻塞,因為要寫,還是要進行擴容,會在獲取擴容鎖時阻塞)。突然想到一個問題,如果A寫入時,發現滿了,因此要擴容,所示擴容期間,釋放了鎖。B poll操作,然後C要寫入,但是此時容量是允許的,這不就讓C在A之前了麼?這個問題是什麼情況?
這裡進行擴容使用的是CAS鎖,當獲取鎖不成功時,說明有其他執行緒在擴容,則等待。在成功擴容之後,需要重新獲取主鎖(即reentrantlock),然後修改queue底層陣列引用。

 private void tryGrow(Object[] array, int oldCap) 
{
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) 
    {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) 
            {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) 
    {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

4.刪除元素
刪除元素有多個版本:
不阻塞的:poll()(如果沒有元素可用,則直接返回null)
阻塞的:take()
等待一段時間的:poll(long timeout, TimeUnit unit)
實現都大概相同:

    public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

使用過程中需要注意的地方
1.priorityblockingqueue在操作佇列時,都是共用的一把鎖(在擴容時,用到了自旋鎖,會釋放一段主鎖,然後重新獲取)
2.peek,offer,poll,size等都是要獲取同一把鎖的,效率不是很高
3.在序列化時,為了提供效率,會先將資料放入到priorityqueue中,然後一次性加入到阻塞佇列中,增加操作效率(不用每次都獲取鎖)
和其他的相關容器的比較
和priorityqueue基本相同,處理有加鎖的操作外。
總結
除非是在多執行緒中,否則不要使用,幾乎所有操作都要競爭同一把鎖。