1. 程式人生 > 其它 >Java併發容器之ConcurrentLinkedDeque原始碼分析

Java併發容器之ConcurrentLinkedDeque原始碼分析

一、簡介

由於LinkedBlockingDeque作為雙端佇列的實現,採用了單鎖的保守策略使其不利於多執行緒併發情況下的使用,故ConcurrentLinkedDeque應運而生,它是一種基於連結串列的無界的同時支援FIFOLIFO的非阻塞併發雙端佇列,當許多執行緒共享對公共集合的訪問時,ConcurrentLinkedDeque是一個合適的選擇,類比ConcurrentLinkedQueue是針對LinkedBlockingQueue對高併發情況的一種解決方案,ConcurrentLinkedDeque也是同樣的地位,都是採用 CAS來替代加鎖,甚至ConcurrentLinkedDeque

再實現上也與ConcurrentLinkedQueue有很多相似的地方,其中最值得提及的就是,它採用了與ConcurrentLinkedQueue一樣的鬆弛閥值設計(鬆弛閥值都是1),即headtail並不總是指向佇列的第一個、最後一個節點,而是保持head/tail距離第一個/最後一個節點的距離不超過1個節點的距離,從而減少了更新head/tail指標的CAS次數。Java Doc指出理解ConcurrentLinkedQueue的實現是理解該類實現的先決條件,所以最好先理解了ConcurrentLinkedQueue再來理解該類。

ConcurrentLinkedDeque另外還使用了兩種方法來減少volatile

寫的次數:一是使用單次CAS操作來一次性使多次連續的CAS生效;二是將對同一塊記憶體地址的volatile寫與普通寫混合。它的節點類與LinkedBlockingDeque的屬性一致都是資料itemprevnext,只是多了一些CAS操作方法。與ConcurrentLinkedQueue一樣,只有那些資料item不為空的節點才被認為是活動的節點,當將item置為null時,意味著從佇列中邏輯刪除掉了。

LinkedBlockingDeque一樣,任何時候,佇列的第一個節點"first"的前驅prevnull,佇列的最後一個節點"tail"的next後繼為null。“first”和“last”節點可能是活動的,也可能不是活動的。“first”和“last”節點總是相互可達的。通過將第一個或最後一個節點的空前驅或後繼CAS

引用到包含指定元素的新節點,實現原子性地新增一個新元素,從而元素的節點在那時原子性地變成“活動的”,如果一個節點是活動的(item不為null)或者它是first/last節點,我們都稱為是有效節點。ConcurrentLinkedDeque同樣採用了“自連結(p.prev = pp.next = p)”的方式使節點斷開與佇列的連結,有效活動節點不會有自連結的情況。

前面說了ConcurrentLinkedDeque有兩個不總是指向第一個/最後一個節點的headtail指標,所以它並沒有像LinkedBlockingDeque那樣設計firsttail屬性,但是firsttail總是可以通過headtailO(1)時間內找到。

ConcurrentLinkedDeque刪除節點分三個階段:

  1. logical deletion(邏輯刪除):通過CAS將資料item置為null,使該節點滿足解除連結(unlinking)的條件。
  2. unlinking(解除連結):該階段使佇列中的活動節點無法到達該節點,但是保留該節點到佇列中活動節點的連結,從而最終可由GC回收。此階段典型的就是被迭代器使用的時候,使迭代器可以繼續往下迭代。
  3. gc-unlinking:該階段進一步解除被刪除節點到佇列中活動節點的連結,使其更容易被GC回收,通過讓節點自連結或連結到終止節點(PREV_TERMINATORNEXT_TERMINATOR)來實現。這一步是為了使資料結構保持GC健壯性(gc-robust),消除使用保守式GCconservative GC,目前已經很少使用)對記憶體無限期滯留的風險,並提高了分代GC的效能。

由於刪除節點的第二、三階段都不是保證資料正確性必須的,僅僅是對迭代器與記憶體的優化,故適當的減少這些操作的次數對效能是一種提高。所以ConcurrentLinkedDeque不僅設計了同ConcurrentLinkedQueue一樣針對headtail節點的鬆弛閾值,而且還提供了針對解除刪除節點連結的閾值HOPS,也就是隻有當邏輯刪除的節點個數達到一定數量才會觸發unlinkinggc-unlinking,這樣也是對效能的一種優化。

ConcurrentLinkedQueue一樣,ConcurrentLinkedDeque也對headtail設定了如下的一些不變與可變性約束:

head/tail的不變性:

  1. 第一個節點總是可從head通過prev連結在O(1)時間複雜度內訪問到。
  2. 最後一個節點總是可以從tail通過next連結在O(1)時間複雜度內訪問到。
  3. 所有活動節點(item不為null)都可以從第一個節點通過succ()訪問。
  4. 所有活動節點(item不為null)都可以從最後一個節點通過pred()訪問。
  5. headtail都不會為null
  6. head節點的next不會指向自身形成自連線。
  7. head/tail不會是GC-unlinked節點(但它可能是unlink節點)。

head/tail的可變性:

  1. headtail的資料item可以為null,也可以不為null
  2. head可能無法從第一個或最後一個節點或從tail到達。
  3. tail可能無法從第一個或最後一個節點或從head到達。

下面開始分析ConcurrentLinkedDeque的原始碼,ConcurrentLinkedDequeConcurrentLinkedQueue並沒有繼承相應的BlockingQueue/BlockingQueue,容量又是無界的,所以不存在阻塞方法。

二、原始碼解析

2.1 屬性

/**
 * A node from which the first node on list (that is, the unique node p with p.prev == null && p.next != p) can be reached in O(1) time.
   可以在O(1)時間內從列表中的第一個節點到達的節點(即,具有p.prev == null && p.next!= p的唯一節點p)。
  
 * Invariants: 不變性
 * - the first node is always O(1) reachable from head via prev links 第一個節點總是可從head通過prev連結在O(1)時間內訪問到
 * - all live nodes are reachable from the first node via succ() 所有活動節點都可以從第一個節點通過succ()訪問
 * - head != null                                                  head不為空
 * - (tmp = head).next != tmp || tmp != head  
 * - head is never gc-unlinked (but may be unlinked) 。head永遠不會gc-unlinked(但可能是unlinked)
 
 * Non-invariants: 可變性
 * - head.item may or may not be null                                           head 的資料項可以為空
 * - head may not be reachable from the first or last node, or from tail 。 head 可能無法從第一個或最後一個節點或從tail到達。
 */
private transient volatile Node<E> head;

/**
 * A node from which the last node on list (that is, the unique node p
 * with p.next == null && p.prev != p) can be reached in O(1) time.
   可以在O(1)時間內從列表中的最後一個節點到達的節點(即具有p.next == null && p.prev!= p的唯一節點p)。

 * Invariants: 不變性
 * - the last node is always O(1) reachable from tail via next links。最後一個節點始終可以通過下一個連結從tail訪問在O(1)時間內訪問到
 * - all live nodes are reachable from the last node via pred() 。所有活動節點都可以從最後一個節點通過pred()訪問
 * - tail != null                                                            tail不為空
 * - tail is never gc-unlinked (but may be unlinked)                tail永遠不會gc-unlinked(但可能是unlinked)
 
 * Non-invariants: 可變性
 * - tail.item may or may not be null                                            tail的資料項可以為空
 * - tail may not be reachable from the first or last node, or from head      tail可能無法從第一個或最後一個節點或從head訪問到。
 */
private transient volatile Node<E> tail;

/**指示出隊節點的終結節點*/
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;

@SuppressWarnings("unchecked")
Node<E> prevTerminator() { //從對頭出隊節點的前向終結節點
    return (Node<E>) PREV_TERMINATOR;
}

@SuppressWarnings("unchecked")
Node<E> nextTerminator() { //從對尾出隊節點的後繼終結節點
    return (Node<E>) NEXT_TERMINATOR;
}

static final class Node<E> {
    volatile Node<E> prev;
    volatile E item;
    volatile Node<E> next;

    Node() {  // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR
    }

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext or casPrev.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    void lazySetPrev(Node<E> val) {
        UNSAFE.putOrderedObject(this, prevOffset, val);
    }

    boolean casPrev(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long prevOffset;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            prevOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("prev"));
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

//針對被刪除節點進行unlinking/GC-unlinking的閾值
private static final int HOPS = 2;

private boolean casHead(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}

private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
static {
    PREV_TERMINATOR = new Node<Object>();
    PREV_TERMINATOR.next = PREV_TERMINATOR;
    NEXT_TERMINATOR = new Node<Object>();
    NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentLinkedDeque.class;
        headOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("head"));
        tailOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("tail"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

2.2 構造方法

/**
 * Constructs an empty deque. 預設構造方法,head、tail都指向同一個item為null的節點
 */
public ConcurrentLinkedDeque() {
    head = tail = new Node<E>(null);
}

/**
 * Constructs a deque initially containing the elements of
 * the given collection, added in traversal order of the
 * collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ConcurrentLinkedDeque(Collection<? extends E> c) {
    // Copy c into a private chain of Nodes
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            newNode.lazySetPrev(t);
            t = newNode;
        }
    }
    initHeadTail(h, t);
}

/**
 * Initializes head and tail, ensuring invariants hold.
 * 初始化head和tail,確保它們的不變性
 */
private void initHeadTail(Node<E> h, Node<E> t) {
    if (h == t) { //佇列為空,或者只有一個元素
        if (h == null)
            h = t = new Node<E>(null);//佇列為空,head、tail都指向同一個item為null的節點
        else { 
            // 只有一個元素,重新構造一個節點指向tail,避免head、tail都指向同一個非null節點
            // Avoid edge case of a single Node with non-null item.
            Node<E> newNode = new Node<E>(null);
            t.lazySetNext(newNode);
            newNode.lazySetPrev(t);
            t = newNode;
        }
    }
    head = h;
    tail = t;
}

節點內部類和LinkedBlockingDeque一樣都是prevtailitem,空佇列情況下,headtail都指向一個itemnull的節點。PREV_TERMINATORNEXT_TERMINATOR分別是從對頭/隊尾出隊節點的前向/後繼終止節點。ConcurrentLinkedDeque是無界的。

2.3 入隊實現

2.3.1 頭部入隊

/**
 * Links e as first element. 在頭節點入隊
 */
private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromHead:
    for (;;)
        //從head節點往前(左)尋找first節點
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&     //前驅不為null
                (q = (p = q).prev) != null) //前驅的前驅也不為null(有執行緒剛剛從對頭入隊了一個節點,還沒來得及修改head)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                p = (h != (h = head)) ? h : q; //head被更新了就重新取head,否則取前驅的前驅
            else if (p.next == p) // PREV_TERMINATOR   p是第一個節點,但是是自連結,表示出隊了,重新開始
                continue restartFromHead;
            else {
                // p是第一個節點
                newNode.lazySetNext(p); // p成為新節點的後繼節點
                if (p.casPrev(null, newNode)) { //新節點成為p的前驅節點
                    //成功將e入隊
                    if (p != h) // 鬆弛閥值超過1,更新head
                        casHead(h, newNode);  // Failure is OK.
                    return;
                }
                // 失敗,可能被其它執行緒搶先入隊,重新找前驅
            }
        }
}

LinkedBlockingDeque一樣,linkFirst是從對頭入隊新節點的具體邏輯實現(被其它入隊方法呼叫),看起來很簡單:從head節點往對頭尋找第一個節點p(不論item是不是null),找到之後將新節點連結到它的前驅,同時當head的鬆弛閾值超過1時更新headlinkFirst分別被offerFirstaddFirstpush方法直接或間接呼叫。

2.3.2 尾部入隊

隊尾入隊的邏輯基本上和linkFirst一樣,不同的是它是從tail節點往後尋找最後一個節點,把新節點連結到它的後繼,同時維護tail的鬆弛閾值。linkLast分別被offerLastaddLastaddoffer方法直接或間接呼叫。

入隊的邏輯流程圖如下(ABC分別從隊尾入隊,DE從對頭入隊);

2.4 出隊

這裡以pollFirst出隊方法為例,其他方法邏輯都一樣,先通過first()拿到佇列頭部的第一個節點,如果是活動節點(item不為null),則直接將item置為null,即完成了刪除節點的第一步邏輯刪除,然後執行unlink方法執行刪除節點的第二unlinking、第三步GC-unlinkingunlink方法針對節點在不同的位置按不同的邏輯處理,①如果出隊的節點是佇列的第一個節點,則執行unlinkFirst;②如果是佇列的最後一個節點,則執行unlinkLast,③否則表示是內部節點,執行unlink本身的通用節點邏輯。

unlinkFirst的邏輯其實就分兩個部分:①實現從被移除節點p開始往後(隊尾)找到第一個有效節點,直到找到或者到達佇列的最後一個節點為止,並把p的直接後繼指向該有效節點(如果本身不是其後繼節點的話),其中的skipDeletedPredecessors方法實現將剛剛找到的後繼節點的前驅也指向節點p,即完成它們的互聯,這一步就是所謂的unlinking,使佇列的活動節點無法訪問被刪除的節點;②第二部分就是實現GC-unlinking了,通過updateHeadupdateTail使被刪除的節點無法從head/tail可達,最後讓被刪除節點後繼自連線,前驅指向前向終結節點。

如果是內部節點出隊,執行unlink本身:先找到被刪除節點x的有效前驅和後繼節點,並記錄它們中間的已經被邏輯刪除的節點個數,如果已經積累了超過閾值的節點個數,或者是內部節點刪除,我們需要進一步處理unlink/gc-unlink,①首先使被刪除節點的有效前驅節點和後繼節點互聯,就相當於導致活動節點不會訪問到中間已經被邏輯刪除的節點(unlinking);②若第①步導致重新連結到了對頭或隊尾,則通過updateHeadupdateTail使被刪除的節點無法從head/tail可達,最後讓被刪除節點自連線或者執行終結節點(GC-unlinking)。

如果是隊尾節點出隊則由unlinkLastunlinkLast的原始碼其實與unlinkFirst基本一致,只不過是從被刪除節點p往前尋找一個有效節點,並把p的直接前驅節點指向該有效節點(如果本身不是其前驅節點的話),其中skipDeletedSuccessors則讓剛剛找到的前驅節點的後繼也指向節點p,即完成它們的互聯,這一步就是所謂的unlinking,使佇列的活動節點無法訪問被刪除的節點;②第二部分就是實現GC-unlinking了,通過updateHeadupdateTail使被刪除的節點無法從head/tail可達,最後讓被刪除節點前驅自連線,後繼指向後繼終結節點。unlinkLast的原始碼就不貼了。

可以看見,ConcurrentLinkedDeque在實現的時候,其實對頭隊尾相關的方法都是對稱的,所以理解了一端的方法,另一端的方法就是對稱的。

出隊的方法主要就是unlink + unlinkFirst + unlinkLast實現,它被ConcurrentLinkedDeque的其他方法呼叫,例如:pollFirstremoveFirstremove(包括迭代器)clearpollpollLastremoveLastremoveFirstOccurrence(Object o)removeLastOccurrence(Object o)等大量方法直接或間接呼叫。

2.5 其它方法

peekFirst/peekLast方法從對頭/隊尾開始找第一個活動節點(item不為空),找到一個立即返回item資料,否則直到到達佇列的另一端都沒找到返回null。這兩個方法分別還會被peek/getFirst/isEmpty/getLast方法呼叫。例如isEmpty方法呼叫peekFirst只要返回不為null就表示佇列非空,

size(),返回當前時刻佇列中item不為空的節點個數,但如果超過Integer.MAX_VALUE,則就返回Integer.MAX_VALUE

addAll(Collection c), 將指定的集合組成一個臨時雙端佇列,然後把該臨時佇列拼接到當前ConcurrentLinkedDeque佇列的隊尾。指定的引數集合不能是ConcurrentLinkedDeque本身,不然將丟擲IllegalArgumentException異常。

toArray/toArray(T[] a),從隊頭開始依次將item不為空的節點資料新增到一個ArrayList集合中,最後再通過toArray方法將其轉換成陣列,注意該方法並不會將資料從佇列中移除,僅僅是拷貝item的引用,所以返回的陣列可以任意操作而不會對佇列本身造成任何影響。

2.6 迭代器

ConcurrentLinkedDeque的迭代器實現思想與LinkedBlockingDeque一致,也支援正向和逆向的兩種迭代器,分別是方法iteratordescendingIterator

//按正確的順序返回deque中元素的迭代器。元素將按從第一個(head)到最後一個(tail)的順序返回。
//返回的迭代器是弱一致的。
public Iterator<E> iterator() {
    return new Itr();
}
     
//以相反的順序返回deque中元素的迭代器。元素將按從最後(tail)到第一個(head)的順序返回。
//返回的迭代器是弱一致的。
public Iterator<E> descendingIterator() {
    return new DescendingItr();
}

它們的邏輯主要是由一個內部抽象類AbstractItr來實現,而iteratordescendingIterator僅僅實現了AbstractItr的抽象方法,用來指示迭代器的開始位置和迭代方向,為了保證迭代器的弱一致性,迭代器在建立例項的時候就已經拿到了第一個節點next和其節點資料,為了實現迭代器的remove方法,迭代器還保留了迭代的上一個節點lastRet,用於獲取迭代器的下一個節點的主要邏輯由advance方法實現:

可見迭代器會排除那些被移除的無效節點,迭代器在使用Itr.remove()刪除節點的時候實際上呼叫了ConcurrentLinkedDequeunlink方法,該方法上面已經解析過了,其它方法都很簡單就不一一列舉了。

2.6.1 可拆分迭代器Spliterator

ConcurrentLinkedDeque的可拆分迭代器由內部類CLDSpliterator實現,它不像普通迭代器那樣可以支援正向和反向迭代,可拆分迭代器僅支援正向的拆分迭代:

public Spliterator<E> spliterator() {
      return new CLDSpliterator<E>(this);
}

ConcurrentLinkedDeque的可拆分迭代器實現基本上和LinkedBlockingDeque一樣,不過它不是使用鎖而是CAS實現,可拆分迭代器會對節點的資料item進行null值判斷,只對item不為空的資料做處理,tryAdvance從對頭開始查詢獲取佇列中第一個item不為空的資料節點的資料做指定的操作,forEachRemaining從隊頭開始迴圈遍歷當前佇列中item不為空的資料節點的資料做指定的操作原始碼都很簡單,就不貼程式碼了,至於它的拆分方法trySplit,其實和ConcurrentLinkedQueue/LinkedBlockingDeque拆分方式是一樣的,程式碼都幾乎一致,它不是像ArrayBlockingQueue那樣每次分一半,而是第一次只拆一個元素,第二次拆2個,第三次拆三個,依次內推,拆分的次數越多,拆分出的新迭代器分的得元素越多,直到一個很大的數MAX_BATCH33554432) ,後面的迭代器每次都分到這麼多的元素,拆分的實現邏輯很簡單,每一次拆分結束都記錄下拆分到哪個元素,下一次拆分從上次結束的位置繼續往下拆分,直到沒有元素可拆分了返回null

三、總結

ConcurrentLinkedDeque是雙端佇列家族中對LinkedBlockingDeque的一種高併發優化,因為LinkedBlockingDeque採用的是保守的單鎖實現,在多執行緒高併發下效率極其低下,所以ConcurrentLinkedDeque採用了CAS的方法來處理所以的競爭問題,保留了雙端佇列的所有特性,可以從對頭、對尾兩端插入和移除元素,它的內部實現非常精妙,既採用了ConcurrentLinkedQueue實現中用到過鬆弛閾值處理(即並不每一次都更新head/tail指標),又獨特的針對佇列中被邏輯刪除節點的進行了淤積閥值合併處理和分三個階段的節點刪除步驟,同時還針對多次volatile寫、普通寫,多次連續的CAS操作單次生效等一系列的措施減少volatile寫和CAS的次數,提高ConcurrentLinkedDeque的執行效率。當許多執行緒共享對公共集合(雙端佇列)的訪問時,ConcurrentLinkedDeque是一個合適的選擇,如果不需要用到雙端佇列的特性,完全可以使用ConcurrentLinkedQueue來完成高併發對公共集合的高效使用。注意ConcurrentLinkedDequeConcurrentLinkedQueue都沒有繼承BlockingDequeBlockingQueue,所以它們沒有阻塞等待的相關方法。