1. 程式人生 > >JAVA併發容器:ConcurrentSkipListMap

JAVA併發容器:ConcurrentSkipListMap

生活

目標定下來以後就不要去變,只要確定是對的,總可以到達。

二分查詢

二分查詢要求有序性,為了保障可以隨機訪問,因此會把資料儲存在連續的記憶體中,在查詢的時候效率高,但是在增加和刪除時需要大量移動元素以保證有序,所以效率不高。
如果需要快速的二分查詢,又要兼顧刪除增加元素的效率,可以考慮使用二叉查詢樹,但是二叉樹在極端情況下會變成一個連結串列,使原本O(log n)的時間複雜度,變成O(n)。
於是就出現了平衡二叉樹,例如AVL樹,紅黑樹,但是平衡二叉樹比較難理解,尤其是紅黑樹的左旋右旋刪除操作。
於是乎出現了跳躍表結構。
今天就來看看這個跳躍表是個啥?

什麼是跳躍表?

先簡單的通過圖示來看下,什麼是跳躍表?
傳遞的連結串列都是單鏈表結構,要向一個單鏈表中增加刪除查詢修改一個節點的時間複雜度都是O(n),
跳躍表其實也是連結串列,只是在連結串列的基礎上加上了一系列index,使之高效。
在這裡插入圖片描述
如上圖所示就是一個跳躍表,每個節點都可以存在多個指向其他節點的索引。他可以先通過最上面的索引來查詢資料,過濾掉一半的節點,他的查詢效率是O(n/2)。
舉個例子 查詢25,
先比較6,在比較9、17、21、26 ,然後可知資料再21和26之間,隨之就找到了25.

跳躍表如何查詢?

ok,具體的查詢,下面也來看下圖示。
在這裡插入圖片描述

這是查詢19的圖示。
每一個節點都不止包含指向下一個節點的指標,也可以包含多個指向其他節點的指標,這樣就可以跳過一些沒有必要的結果,從而提高查詢的效率。
至於每個節點包含多少點後繼節點個數,其實是通過隨機生成的,從而形成了跳躍表。

因為是隨機的,所以跳躍表是一種概率均衡而不是強制均衡。
在Redis/leveldb有用到。

ConcurrentSkipListMap 資料結構

下面來看下jdk1.8裡的跳躍表:ConcurrentSkipListMap
這個玩意 1.7有所不同,但是基本實現是一致的,程式碼裡一些細節稍稍不一樣。
先來看下他的資料結構,

//節點物件
static final class Node<K,V> {

        final K key;
        volatile Object value;
        //下一個節點
        volatile Node<K,V> next;
        }
//索引物件
static class Index<K,V> {
//節點
        final Node<K,V> node;
        //指向該節點下一個層級的索引
        final Index<K,V> down;
        //指向右邊的索引,即一個節點的索引
        volatile Index<K,V> right;
        }

//多個level,標記是哪一個層級的索引
static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }

ConcurrentSkipListMap 成員

//主要就是這兩個
//頭索引
 private transient volatile HeadIndex<K,V> head;

    /**
     * The comparator used to maintain order in this map, or null if
     * using natural ordering.  (Non-private to simplify access in
     * nested classes.)
     * @serial
     */
     //比較器
    final Comparator<? super K> comparator;

ConcurrentSkipListMap 構造器

 public ConcurrentSkipListMap() {
        this.comparator = null;
        initialize();
    }
   public ConcurrentSkipListMap(Comparator<? super K> comparator) {
        this.comparator = comparator;
        initialize();
    }

    public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
        this.comparator = null;
        initialize();
        putAll(m);
    }


//初始化一個頭結點,注意level是1,就是第一層,key value都是空
private void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }

ConcurrentSkipListMap put

來看下跳躍表增加資料是怎麼做的?
核心方法是doPut()

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
            //比較器
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        //拿到要插入的位置的前驅
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    //如果不一致,說明中間修改過,重新找前驅
                    if (n != b.next)               // inconsistent read
                        break;
                        //去過前驅的後一個節點沒有值說明要刪掉,需要把這個物件出連結串列
                    if ((v = n.value) == null) {   // n is deleted
                    //執行出連結串列操作,,有 了這部操作,後面才能刪掉索引
                        n.helpDelete(b, f);
                        break;
                    }
                    //如果我的前驅也刪掉了,重新找
                    if (b.value == null || v == n) // b is deleted
                        break;
                        //走到這個說明找錯了?重新繼續找?
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    // 0,說明本身有這個key,根據策略看是否覆蓋
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }
		
		//新節點,後驅指向 現前驅的後驅,把自己插進來
                z = new Node<K,V>(key, value, n);
                //設定自己為我的前驅的後驅
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

//  隨機數,跟level有關啊
        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            //如果level比現在的level小
            if (level <= (max = h.level)) {
            //那就直接建立Index,並一級一級把自己down index設定好
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
            //如果這個level大於max,那就設定他是max+1
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                    //建立自己的index
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    //如果level小於等於原最大level,那
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    //為每一層生成一個頭結點
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                        //並替換頭index
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }


//儲存新跳錶的跳級,並且把right索引設定好。
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

來看下尋找前驅節點的方法:

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
        //從頭開始
            for (Index<K,V> q = head, r = q.right, d;;) {
                if (r != null) {
                //把裡面的節點拿出來
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                    //中間遇到值為空的,就刪除索引。注意前面的helpDelete是使node出連結串列。這裡是刪索引,不一樣的,,剛開始這裡看了半天
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    //比較key
                    if (cpr(cmp, key, k) > 0) {
                    //如果大於就往右邊找
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                //否則往下找,如果下面已經沒有了,,那就是這個節點了
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }

來看下unlink的方法做了什麼

//就是跳過這個節點,然後把right index往後指
final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }

ConcurrentSkipListMap get

這裡來看下get方法

 private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        //找前驅或者自己這個節點,其實這個get就是想找自己這個節點,如果沒有找到的是前驅
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                //大於0的情況就一直往後找,直接null
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                    //找到就return
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
              //  c<0說明沒有這個節點直接break
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }

ConcurrentSkipListMap remove

最後來看下如何刪除資料?

final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        //找到前驅 或者就是他自己
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                    //標記他可以刪除,並且設定自己的前驅節點 指向自己的後驅節點
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                //刪除自己的索引
                    findPredecessor(key, cmp);      // clean index
//如果頭索引右邊啥也沒有,就降級了
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

來看下appendMarker實際做了什麼?

boolean appendMarker(Node<K,V> f) {
            return casNext(f, new Node<K,V>(f));
        }

//設定自己的next的Node 裡的value是 現在的下一個節點物件。。
有點繞,不知道為什麼這麼設計