Java併發容器之PriorityBlockingQueue原始碼分析
阿新 • • 發佈:2022-05-11
簡介
PriorityBlockingQueue
是java
併發包下的優先順序阻塞佇列,它是執行緒安全的,如果讓你來實現你會怎麼實現它呢?
還記得我們前面介紹過的PriorityQueue
嗎?點選連結直達Java集合之PriorityQueue原始碼分析
原始碼分析
屬性
// 預設容量為11 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 最大陣列大小 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 儲存元素的地方 private transient Object[] queue; // 元素個數 private transient int size; // 比較器 private transient Comparator<? super E> comparator; // 重入鎖 private final ReentrantLock lock; // 非空條件 private final Condition notEmpty; // 擴容的時候使用的控制變數,CAS更新這個值,誰更新成功了誰擴容,其它執行緒讓出CPU private transient volatile int allocationSpinLock; // 不阻塞的優先順序佇列,非儲存元素的地方,僅用於序列化/反序列化時 private PriorityQueue<E> q;
(1)依然是使用一個數組來使用元素;
(2)使用一個鎖加一個notEmpty條件來保證併發安全;
(3)使用一個變數的CAS操作來控制擴容;
為啥沒有notFull條件呢?
主要構造方法
// 預設容量為11 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } // 傳入初始容量 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } // 傳入初始容量和比較器 // 初始化各變數 public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
入隊
每個阻塞佇列都有四個方法,我們這裡只分析一個offer(E e)
方法:
public boolean offer(E e) { // 元素不能為空 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); int n, cap; Object[] array; // 判斷是否需要擴容,即元素個數達到了陣列容量 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 根據是否有比較器選擇不同的方法 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 插入元素完畢,元素個數加1 size = n + 1; // 喚醒notEmpty條件 notEmpty.signal(); } finally { // 解鎖 lock.unlock(); } return true; } private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 取父節點 int parent = (k - 1) >>> 1; // 父節點的元素值 Object e = array[parent]; // 如果key大於父節點,堆化結束 if (key.compareTo((T) e) >= 0) break; // 否則,交換二者的位置,繼續下一輪比較 array[k] = e; k = parent; } // 找到了應該放的位置,放入元素 array[k] = key; }
入隊的整個操作跟PriorityQueue
幾乎一致:
- 加鎖;
- 判斷是否需要擴容;
- 新增元素並做自下而上的堆化;
- 元素個數加1並喚醒
notEmpty
條件,喚醒取元素的執行緒; - 解鎖;
擴容
private void tryGrow(Object[] array, int oldCap) {
// 先釋放鎖,因為是從offer()方法的鎖內部過來的
// 這裡先釋放鎖,使用allocationSpinLock變數控制擴容的過程
// 防止阻塞的執行緒過多
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// CAS更新allocationSpinLock變數為1的執行緒獲得擴容資格
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 舊容量小於64則翻倍,舊容量大於64則增加一半
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 判斷新容量是否溢位
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 建立新陣列
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 相當於解鎖
allocationSpinLock = 0;
}
}
// 只有進入了上面條件的才會滿足這個條件
// 意思是讓其它執行緒讓出CPU
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 再次加鎖
lock.lock();
// 判斷新陣列建立成功並且舊陣列沒有被替換過
if (newArray != null && queue == array) {
// 佇列賦值為新陣列
queue = newArray;
// 並拷貝舊陣列元素到新陣列中
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
- 解鎖,解除
offer()
方法中加的鎖; - 使用
allocationSpinLock
變數的CAS
操作來控制擴容的過程; - 舊容量小於
64
則翻倍,舊容量大於64
則增加一半; - 建立新陣列;
- 修改
allocationSpinLock
為0
,相當於解鎖; - 其它執行緒在擴容的過程中要讓出
CPU
; - 再次加鎖;
- 新陣列建立成功,把舊陣列元素拷貝過來,並返回到
offer()
方法中繼續新增元素操作;
出隊
阻塞佇列的出隊方法也有四個,我們這裡只分析一個take()
方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
E result;
try {
// 佇列沒有元素,就阻塞在notEmpty條件上
// 出隊成功,就跳出這個迴圈
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 解鎖
lock.unlock();
}
// 返回出隊的元素
return result;
}
private E dequeue() {
// 元素個數減1
int n = size - 1;
if (n < 0)
// 陣列元素不足,返回null
return null;
else {
Object[] array = queue;
// 彈出堆頂元素
E result = (E) array[0];
// 把堆尾元素拿到堆頂
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 並做自上而下的堆化
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 修改size
size = n;
// 返回出隊的元素
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
// 只需要遍歷到葉子節點就夠了
while (k < half) {
// 左子節點
int child = (k << 1) + 1; // assume left child is least
// 左子節點的值
Object c = array[child];
// 右子節點
int right = child + 1;
// 取左右子節點中最小的值
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key如果比左右子節點都小,則堆化結束
if (key.compareTo((T) c) <= 0)
break;
// 否則,交換key與左右子節點中最小的節點的位置
array[k] = c;
k = child;
}
// 找到了放元素的位置,放置元素
array[k] = key;
}
}
出隊的過程與PriorityQueue
基本類似:
- 加鎖;
- 判斷是否出隊成功,未成功就阻塞在
notEmpty
條件上; - 出隊時彈出堆頂元素,並把堆尾元素拿到堆頂;
- 再做自上而下的堆化;
- 解鎖;
總結
-
PriorityBlockingQueue
整個入隊出隊的過程與PriorityQueue
基本是保持一致的; -
PriorityBlockingQueue
使用一個鎖+一個notEmpty
條件控制併發安全; -
PriorityBlockingQueue
擴容時使用一個單獨變數的CAS
操作來控制只有一個執行緒進行擴容; - 入隊使用自下而上的堆化;
- 出隊使用自上而下的堆化;
拓展
為什麼PriorityBlockingQueue
不需要notFull
條件?
因為PriorityBlockingQueue
在入隊的時候如果沒有空間了是會自動擴容的,也就不存在佇列滿了的狀態,也就是不需要等待通知佇列不滿了可以放元素了,所以也就不需要notFull
條件了。