Java併發容器之ConcurrentLinkedDeque原始碼分析
一、簡介
由於LinkedBlockingDeque
作為雙端佇列的實現,採用了單鎖的保守策略使其不利於多執行緒併發情況下的使用,故ConcurrentLinkedDeque
應運而生,它是一種基於連結串列的無界的同時支援FIFO
、LIFO
的非阻塞併發雙端佇列,當許多執行緒共享對公共集合的訪問時,ConcurrentLinkedDeque
是一個合適的選擇,類比ConcurrentLinkedQueue
是針對LinkedBlockingQueue
對高併發情況的一種解決方案,ConcurrentLinkedDeque
也是同樣的地位,都是採用 CAS
來替代加鎖,甚至ConcurrentLinkedDeque
ConcurrentLinkedQueue
有很多相似的地方,其中最值得提及的就是,它採用了與ConcurrentLinkedQueue
一樣的鬆弛閥值設計(鬆弛閥值都是1
),即head
、tail
並不總是指向佇列的第一個、最後一個節點,而是保持head/tail
距離第一個/最後一個節點
的距離不超過1
個節點的距離,從而減少了更新head/tail
指標的CAS
次數。Java Doc
指出理解ConcurrentLinkedQueue
的實現是理解該類實現的先決條件,所以最好先理解了ConcurrentLinkedQueue
再來理解該類。
ConcurrentLinkedDeque
另外還使用了兩種方法來減少volatile
CAS
操作來一次性使多次連續的CAS
生效;二是將對同一塊記憶體地址的volatile
寫與普通寫混合。它的節點類與LinkedBlockingDeque
的屬性一致都是資料item
、prev
、next
,只是多了一些CAS
操作方法。與ConcurrentLinkedQueue
一樣,只有那些資料item
不為空的節點才被認為是活動的節點,當將item
置為null
時,意味著從佇列中邏輯刪除掉了。
與LinkedBlockingDeque
一樣,任何時候,佇列的第一個節點"first"的前驅prev
為null
,佇列的最後一個節點"tail"的next
後繼為null
。“first”和“last”節點可能是活動的,也可能不是活動的。“first”和“last”節點總是相互可達的。通過將第一個或最後一個節點的空前驅或後繼CAS
item
不為null
)或者它是first/last
節點,我們都稱為是有效節點。ConcurrentLinkedDeque
同樣採用了“自連結(p.prev = p
或p.next = p
)”的方式使節點斷開與佇列的連結,有效活動節點不會有自連結的情況。
前面說了ConcurrentLinkedDeque
有兩個不總是指向第一個/最後一個節點的head
、tail
指標,所以它並沒有像LinkedBlockingDeque
那樣設計first
、tail
屬性,但是first
、tail
總是可以通過head
、tail
在O(1)
時間內找到。
ConcurrentLinkedDeque
刪除節點分三個階段:
-
logical deletion
(邏輯刪除):通過CAS
將資料item
置為null
,使該節點滿足解除連結(unlinking
)的條件。 -
unlinking
(解除連結):該階段使佇列中的活動節點無法到達該節點,但是保留該節點到佇列中活動節點的連結,從而最終可由GC
回收。此階段典型的就是被迭代器使用的時候,使迭代器可以繼續往下迭代。 -
gc-unlinking
:該階段進一步解除被刪除節點到佇列中活動節點的連結,使其更容易被GC回收,通過讓節點自連結或連結到終止節點(PREV_TERMINATOR
或NEXT_TERMINATOR
)來實現。這一步是為了使資料結構保持GC
健壯性(gc-robust
),消除使用保守式GC
(conservative GC
,目前已經很少使用)對記憶體無限期滯留的風險,並提高了分代GC
的效能。
由於刪除節點的第二、三階段都不是保證資料正確性必須的,僅僅是對迭代器與記憶體的優化,故適當的減少這些操作的次數對效能是一種提高。所以ConcurrentLinkedDeque
不僅設計了同ConcurrentLinkedQueue
一樣針對head
、tail
節點的鬆弛閾值,而且還提供了針對解除刪除節點連結的閾值HOPS
,也就是隻有當邏輯刪除的節點個數達到一定數量才會觸發unlinking
和gc-unlinking
,這樣也是對效能的一種優化。
同ConcurrentLinkedQueue
一樣,ConcurrentLinkedDeque
也對head
、tail
設定了如下的一些不變與可變性約束:
head/tail
的不變性:
- 第一個節點總是可從
head
通過prev
連結在O(1)
時間複雜度內訪問到。 - 最後一個節點總是可以從
tail
通過next
連結在O(1)
時間複雜度內訪問到。 - 所有活動節點
(item
不為null)
都可以從第一個節點通過succ()
訪問。 - 所有活動節點
(item
不為null)
都可以從最後一個節點通過pred()
訪問。 -
head
和tail
都不會為null
。 -
head
節點的next
不會指向自身形成自連線。 -
head/tail
不會是GC-unlinked
節點(但它可能是unlink
節點)。
head/tail
的可變性:
-
head
、tail
的資料item
可以為null
,也可以不為null
。 -
head
可能無法從第一個或最後一個節點或從tail
到達。 -
tail
可能無法從第一個或最後一個節點或從head
到達。
下面開始分析ConcurrentLinkedDeque
的原始碼,ConcurrentLinkedDeque
和ConcurrentLinkedQueue
並沒有繼承相應的BlockingQueue/BlockingQueue
,容量又是無界的,所以不存在阻塞方法。
二、原始碼解析
2.1 屬性
/**
* A node from which the first node on list (that is, the unique node p with p.prev == null && p.next != p) can be reached in O(1) time.
可以在O(1)時間內從列表中的第一個節點到達的節點(即,具有p.prev == null && p.next!= p的唯一節點p)。
* Invariants: 不變性
* - the first node is always O(1) reachable from head via prev links 第一個節點總是可從head通過prev連結在O(1)時間內訪問到
* - all live nodes are reachable from the first node via succ() 所有活動節點都可以從第一個節點通過succ()訪問
* - head != null head不為空
* - (tmp = head).next != tmp || tmp != head
* - head is never gc-unlinked (but may be unlinked) 。head永遠不會gc-unlinked(但可能是unlinked)
* Non-invariants: 可變性
* - head.item may or may not be null head 的資料項可以為空
* - head may not be reachable from the first or last node, or from tail 。 head 可能無法從第一個或最後一個節點或從tail到達。
*/
private transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique node p
* with p.next == null && p.prev != p) can be reached in O(1) time.
可以在O(1)時間內從列表中的最後一個節點到達的節點(即具有p.next == null && p.prev!= p的唯一節點p)。
* Invariants: 不變性
* - the last node is always O(1) reachable from tail via next links。最後一個節點始終可以通過下一個連結從tail訪問在O(1)時間內訪問到
* - all live nodes are reachable from the last node via pred() 。所有活動節點都可以從最後一個節點通過pred()訪問
* - tail != null tail不為空
* - tail is never gc-unlinked (but may be unlinked) tail永遠不會gc-unlinked(但可能是unlinked)
* Non-invariants: 可變性
* - tail.item may or may not be null tail的資料項可以為空
* - tail may not be reachable from the first or last node, or from head tail可能無法從第一個或最後一個節點或從head訪問到。
*/
private transient volatile Node<E> tail;
/**指示出隊節點的終結節點*/
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
@SuppressWarnings("unchecked")
Node<E> prevTerminator() { //從對頭出隊節點的前向終結節點
return (Node<E>) PREV_TERMINATOR;
}
@SuppressWarnings("unchecked")
Node<E> nextTerminator() { //從對尾出隊節點的後繼終結節點
return (Node<E>) NEXT_TERMINATOR;
}
static final class Node<E> {
volatile Node<E> prev;
volatile E item;
volatile Node<E> next;
Node() { // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext or casPrev.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
void lazySetPrev(Node<E> val) {
UNSAFE.putOrderedObject(this, prevOffset, val);
}
boolean casPrev(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long prevOffset;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
prevOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("prev"));
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
//針對被刪除節點進行unlinking/GC-unlinking的閾值
private static final int HOPS = 2;
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
static {
PREV_TERMINATOR = new Node<Object>();
PREV_TERMINATOR.next = PREV_TERMINATOR;
NEXT_TERMINATOR = new Node<Object>();
NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedDeque.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
2.2 構造方法
/**
* Constructs an empty deque. 預設構造方法,head、tail都指向同一個item為null的節點
*/
public ConcurrentLinkedDeque() {
head = tail = new Node<E>(null);
}
/**
* Constructs a deque initially containing the elements of
* the given collection, added in traversal order of the
* collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ConcurrentLinkedDeque(Collection<? extends E> c) {
// Copy c into a private chain of Nodes
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
newNode.lazySetPrev(t);
t = newNode;
}
}
initHeadTail(h, t);
}
/**
* Initializes head and tail, ensuring invariants hold.
* 初始化head和tail,確保它們的不變性
*/
private void initHeadTail(Node<E> h, Node<E> t) {
if (h == t) { //佇列為空,或者只有一個元素
if (h == null)
h = t = new Node<E>(null);//佇列為空,head、tail都指向同一個item為null的節點
else {
// 只有一個元素,重新構造一個節點指向tail,避免head、tail都指向同一個非null節點
// Avoid edge case of a single Node with non-null item.
Node<E> newNode = new Node<E>(null);
t.lazySetNext(newNode);
newNode.lazySetPrev(t);
t = newNode;
}
}
head = h;
tail = t;
}
節點內部類和LinkedBlockingDeque
一樣都是prev
、tail
、item
,空佇列情況下,head
、tail
都指向一個item
為null
的節點。PREV_TERMINATOR
、NEXT_TERMINATOR
分別是從對頭/隊尾出隊節點的前向/後繼終止節點。ConcurrentLinkedDeque
是無界的。
2.3 入隊實現
2.3.1 頭部入隊
/**
* Links e as first element. 在頭節點入隊
*/
private void linkFirst(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
restartFromHead:
for (;;)
//從head節點往前(左)尋找first節點
for (Node<E> h = head, p = h, q;;) {
if ((q = p.prev) != null && //前驅不為null
(q = (p = q).prev) != null) //前驅的前驅也不為null(有執行緒剛剛從對頭入隊了一個節點,還沒來得及修改head)
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q; //head被更新了就重新取head,否則取前驅的前驅
else if (p.next == p) // PREV_TERMINATOR p是第一個節點,但是是自連結,表示出隊了,重新開始
continue restartFromHead;
else {
// p是第一個節點
newNode.lazySetNext(p); // p成為新節點的後繼節點
if (p.casPrev(null, newNode)) { //新節點成為p的前驅節點
//成功將e入隊
if (p != h) // 鬆弛閥值超過1,更新head
casHead(h, newNode); // Failure is OK.
return;
}
// 失敗,可能被其它執行緒搶先入隊,重新找前驅
}
}
}
同LinkedBlockingDeque
一樣,linkFirst
是從對頭入隊新節點的具體邏輯實現(被其它入隊方法呼叫),看起來很簡單:從head
節點往對頭尋找第一個節點p
(不論item
是不是null
),找到之後將新節點連結到它的前驅,同時當head
的鬆弛閾值超過1
時更新head
。linkFirst
分別被offerFirst
、addFirst
、push
方法直接或間接呼叫。
2.3.2 尾部入隊
隊尾入隊的邏輯基本上和linkFirst
一樣,不同的是它是從tail
節點往後尋找最後一個節點,把新節點連結到它的後繼,同時維護tail
的鬆弛閾值。linkLast
分別被offerLast
、addLast
、add
、offer
方法直接或間接呼叫。
入隊的邏輯流程圖如下(ABC
分別從隊尾入隊,DE
從對頭入隊);
2.4 出隊
這裡以pollFirst
出隊方法為例,其他方法邏輯都一樣,先通過first()
拿到佇列頭部的第一個節點,如果是活動節點(item
不為null
),則直接將item
置為null
,即完成了刪除節點的第一步邏輯刪除,然後執行unlink
方法執行刪除節點的第二unlinking
、第三步GC-unlinking
,unlink
方法針對節點在不同的位置按不同的邏輯處理,①如果出隊的節點是佇列的第一個節點,則執行unlinkFirst
;②如果是佇列的最後一個節點,則執行unlinkLast
,③否則表示是內部節點,執行unlink
本身的通用節點邏輯。
unlinkFirst
的邏輯其實就分兩個部分:①實現從被移除節點p
開始往後(隊尾)找到第一個有效節點,直到找到或者到達佇列的最後一個節點為止,並把p
的直接後繼指向該有效節點(如果本身不是其後繼節點的話),其中的skipDeletedPredecessors
方法實現將剛剛找到的後繼節點的前驅也指向節點p
,即完成它們的互聯,這一步就是所謂的unlinking
,使佇列的活動節點無法訪問被刪除的節點;②第二部分就是實現GC-unlinking
了,通過updateHead
、updateTail
使被刪除的節點無法從head/tail
可達,最後讓被刪除節點後繼自連線,前驅指向前向終結節點。
如果是內部節點出隊,執行unlink
本身:先找到被刪除節點x
的有效前驅和後繼節點,並記錄它們中間的已經被邏輯刪除的節點個數,如果已經積累了超過閾值的節點個數,或者是內部節點刪除,我們需要進一步處理unlink/gc-unlink
,①首先使被刪除節點的有效前驅節點和後繼節點互聯,就相當於導致活動節點不會訪問到中間已經被邏輯刪除的節點(unlinking
);②若第①步導致重新連結到了對頭或隊尾,則通過updateHead
、updateTail
使被刪除的節點無法從head/tail
可達,最後讓被刪除節點自連線或者執行終結節點(GC-unlinking
)。
如果是隊尾節點出隊則由unlinkLast
,unlinkLast
的原始碼其實與unlinkFirst
基本一致,只不過是從被刪除節點p
往前尋找一個有效節點,並把p
的直接前驅節點指向該有效節點(如果本身不是其前驅節點的話),其中skipDeletedSuccessors
則讓剛剛找到的前驅節點的後繼也指向節點p
,即完成它們的互聯,這一步就是所謂的unlinking
,使佇列的活動節點無法訪問被刪除的節點;②第二部分就是實現GC-unlinking
了,通過updateHead
、updateTail
使被刪除的節點無法從head/tail
可達,最後讓被刪除節點前驅自連線,後繼指向後繼終結節點。unlinkLast
的原始碼就不貼了。
可以看見,ConcurrentLinkedDeque
在實現的時候,其實對頭隊尾相關的方法都是對稱的,所以理解了一端的方法,另一端的方法就是對稱的。
出隊的方法主要就是unlink + unlinkFirst + unlinkLast
實現,它被ConcurrentLinkedDeque
的其他方法呼叫,例如:pollFirst
、removeFirst
、remove(
包括迭代器)
、clear
、poll
、pollLast
、removeLast
、removeFirstOccurrence(Object o)
、removeLastOccurrence(Object o)
等大量方法直接或間接呼叫。
2.5 其它方法
peekFirst/peekLast
方法從對頭/隊尾開始找第一個活動節點(item
不為空),找到一個立即返回item
資料,否則直到到達佇列的另一端都沒找到返回null
。這兩個方法分別還會被peek/getFirst/isEmpty/getLast
方法呼叫。例如isEmpty
方法呼叫peekFirst
只要返回不為null
就表示佇列非空,
size()
,返回當前時刻佇列中item
不為空的節點個數,但如果超過Integer.MAX_VALUE
,則就返回Integer.MAX_VALUE
。
addAll(Collection c)
, 將指定的集合組成一個臨時雙端佇列,然後把該臨時佇列拼接到當前ConcurrentLinkedDeque
佇列的隊尾。指定的引數集合不能是ConcurrentLinkedDeque
本身,不然將丟擲IllegalArgumentException
異常。
toArray/toArray(T[] a)
,從隊頭開始依次將item
不為空的節點資料新增到一個ArrayList
集合中,最後再通過toArray
方法將其轉換成陣列,注意該方法並不會將資料從佇列中移除,僅僅是拷貝item
的引用,所以返回的陣列可以任意操作而不會對佇列本身造成任何影響。
2.6 迭代器
ConcurrentLinkedDeque
的迭代器實現思想與LinkedBlockingDeque
一致,也支援正向和逆向的兩種迭代器,分別是方法iterator
、descendingIterator
:
//按正確的順序返回deque中元素的迭代器。元素將按從第一個(head)到最後一個(tail)的順序返回。
//返回的迭代器是弱一致的。
public Iterator<E> iterator() {
return new Itr();
}
//以相反的順序返回deque中元素的迭代器。元素將按從最後(tail)到第一個(head)的順序返回。
//返回的迭代器是弱一致的。
public Iterator<E> descendingIterator() {
return new DescendingItr();
}
它們的邏輯主要是由一個內部抽象類AbstractItr
來實現,而iterator
和descendingIterator
僅僅實現了AbstractItr
的抽象方法,用來指示迭代器的開始位置和迭代方向,為了保證迭代器的弱一致性,迭代器在建立例項的時候就已經拿到了第一個節點next
和其節點資料,為了實現迭代器的remove
方法,迭代器還保留了迭代的上一個節點lastRet
,用於獲取迭代器的下一個節點的主要邏輯由advance
方法實現:
可見迭代器會排除那些被移除的無效節點,迭代器在使用Itr.remove()
刪除節點的時候實際上呼叫了ConcurrentLinkedDeque
的unlink
方法,該方法上面已經解析過了,其它方法都很簡單就不一一列舉了。
2.6.1 可拆分迭代器Spliterator
ConcurrentLinkedDeque
的可拆分迭代器由內部類CLDSpliterator
實現,它不像普通迭代器那樣可以支援正向和反向迭代,可拆分迭代器僅支援正向的拆分迭代:
public Spliterator<E> spliterator() {
return new CLDSpliterator<E>(this);
}
ConcurrentLinkedDeque
的可拆分迭代器實現基本上和LinkedBlockingDeque
一樣,不過它不是使用鎖而是CAS
實現,可拆分迭代器會對節點的資料item
進行null
值判斷,只對item
不為空的資料做處理,tryAdvance
從對頭開始查詢獲取佇列中第一個item
不為空的資料節點的資料做指定的操作,forEachRemaining
從隊頭開始迴圈遍歷當前佇列中item
不為空的資料節點的資料做指定的操作原始碼都很簡單,就不貼程式碼了,至於它的拆分方法trySplit
,其實和ConcurrentLinkedQueue/LinkedBlockingDeque
拆分方式是一樣的,程式碼都幾乎一致,它不是像ArrayBlockingQueue
那樣每次分一半,而是第一次只拆一個元素,第二次拆2
個,第三次拆三個,依次內推,拆分的次數越多,拆分出的新迭代器分的得元素越多,直到一個很大的數MAX_BATCH
(33554432
) ,後面的迭代器每次都分到這麼多的元素,拆分的實現邏輯很簡單,每一次拆分結束都記錄下拆分到哪個元素,下一次拆分從上次結束的位置繼續往下拆分,直到沒有元素可拆分了返回null
。
三、總結
ConcurrentLinkedDeque
是雙端佇列家族中對LinkedBlockingDeque
的一種高併發優化,因為LinkedBlockingDeque
採用的是保守的單鎖實現,在多執行緒高併發下效率極其低下,所以ConcurrentLinkedDeque
採用了CAS
的方法來處理所以的競爭問題,保留了雙端佇列的所有特性,可以從對頭、對尾兩端插入和移除元素,它的內部實現非常精妙,既採用了ConcurrentLinkedQueue
實現中用到過鬆弛閾值處理(即並不每一次都更新head/tail
指標),又獨特的針對佇列中被邏輯刪除節點的進行了淤積閥值合併處理和分三個階段的節點刪除步驟,同時還針對多次volatile
寫、普通寫,多次連續的CAS
操作單次生效等一系列的措施減少volatile
寫和CAS
的次數,提高ConcurrentLinkedDeque
的執行效率。當許多執行緒共享對公共集合(雙端佇列)的訪問時,ConcurrentLinkedDeque
是一個合適的選擇,如果不需要用到雙端佇列的特性,完全可以使用ConcurrentLinkedQueue
來完成高併發對公共集合的高效使用。注意ConcurrentLinkedDeque
,ConcurrentLinkedQueue
都沒有繼承BlockingDeque
、BlockingQueue
,所以它們沒有阻塞等待的相關方法。