JAVA併發容器:ConcurrentSkipListMap
生活
目標定下來以後就不要去變,只要確定是對的,總可以到達。
二分查詢
二分查詢要求有序性,為了保障可以隨機訪問,因此會把資料儲存在連續的記憶體中,在查詢的時候效率高,但是在增加和刪除時需要大量移動元素以保證有序,所以效率不高。
如果需要快速的二分查詢,又要兼顧刪除增加元素的效率,可以考慮使用二叉查詢樹,但是二叉樹在極端情況下會變成一個連結串列,使原本O(log n)的時間複雜度,變成O(n)。
於是就出現了平衡二叉樹,例如AVL樹,紅黑樹,但是平衡二叉樹比較難理解,尤其是紅黑樹的左旋右旋刪除操作。
於是乎出現了跳躍表結構。
今天就來看看這個跳躍表是個啥?
什麼是跳躍表?
先簡單的通過圖示來看下,什麼是跳躍表?
傳遞的連結串列都是單鏈表結構,要向一個單鏈表中增加刪除查詢修改一個節點的時間複雜度都是O(n),
跳躍表其實也是連結串列,只是在連結串列的基礎上加上了一系列index,使之高效。
如上圖所示就是一個跳躍表,每個節點都可以存在多個指向其他節點的索引。他可以先通過最上面的索引來查詢資料,過濾掉一半的節點,他的查詢效率是O(n/2)。
舉個例子 查詢25,
先比較6,在比較9、17、21、26 ,然後可知資料再21和26之間,隨之就找到了25.
跳躍表如何查詢?
ok,具體的查詢,下面也來看下圖示。
這是查詢19的圖示。
每一個節點都不止包含指向下一個節點的指標,也可以包含多個指向其他節點的指標,這樣就可以跳過一些沒有必要的結果,從而提高查詢的效率。
至於每個節點包含多少點後繼節點個數,其實是通過隨機生成的,從而形成了跳躍表。
因為是隨機的,所以跳躍表是一種概率均衡而不是強制均衡。
在Redis/leveldb有用到。
ConcurrentSkipListMap 資料結構
下面來看下jdk1.8裡的跳躍表:ConcurrentSkipListMap
這個玩意 1.7有所不同,但是基本實現是一致的,程式碼裡一些細節稍稍不一樣。
先來看下他的資料結構,
//節點物件 static final class Node<K,V> { final K key; volatile Object value; //下一個節點 volatile Node<K,V> next; } //索引物件 static class Index<K,V> { //節點 final Node<K,V> node; //指向該節點下一個層級的索引 final Index<K,V> down; //指向右邊的索引,即一個節點的索引 volatile Index<K,V> right; } //多個level,標記是哪一個層級的索引 static final class HeadIndex<K,V> extends Index<K,V> { final int level; HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } }
ConcurrentSkipListMap 成員
//主要就是這兩個
//頭索引
private transient volatile HeadIndex<K,V> head;
/**
* The comparator used to maintain order in this map, or null if
* using natural ordering. (Non-private to simplify access in
* nested classes.)
* @serial
*/
//比較器
final Comparator<? super K> comparator;
ConcurrentSkipListMap 構造器
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
this.comparator = null;
initialize();
putAll(m);
}
//初始化一個頭結點,注意level是1,就是第一層,key value都是空
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
ConcurrentSkipListMap put
來看下跳躍表增加資料是怎麼做的?
核心方法是doPut()
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
//比較器
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//拿到要插入的位置的前驅
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
//如果不一致,說明中間修改過,重新找前驅
if (n != b.next) // inconsistent read
break;
//去過前驅的後一個節點沒有值說明要刪掉,需要把這個物件出連結串列
if ((v = n.value) == null) { // n is deleted
//執行出連結串列操作,,有 了這部操作,後面才能刪掉索引
n.helpDelete(b, f);
break;
}
//如果我的前驅也刪掉了,重新找
if (b.value == null || v == n) // b is deleted
break;
//走到這個說明找錯了?重新繼續找?
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// 0,說明本身有這個key,根據策略看是否覆蓋
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
//新節點,後驅指向 現前驅的後驅,把自己插進來
z = new Node<K,V>(key, value, n);
//設定自己為我的前驅的後驅
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
// 隨機數,跟level有關啊
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
//如果level比現在的level小
if (level <= (max = h.level)) {
//那就直接建立Index,並一級一級把自己down index設定好
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
//如果這個level大於max,那就設定他是max+1
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
//建立自己的index
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
//如果level小於等於原最大level,那
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//為每一層生成一個頭結點
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//並替換頭index
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
//儲存新跳錶的跳級,並且把right索引設定好。
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
來看下尋找前驅節點的方法:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
//從頭開始
for (Index<K,V> q = head, r = q.right, d;;) {
if (r != null) {
//把裡面的節點拿出來
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) {
//中間遇到值為空的,就刪除索引。注意前面的helpDelete是使node出連結串列。這裡是刪索引,不一樣的,,剛開始這裡看了半天
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//比較key
if (cpr(cmp, key, k) > 0) {
//如果大於就往右邊找
q = r;
r = r.right;
continue;
}
}
//否則往下找,如果下面已經沒有了,,那就是這個節點了
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
來看下unlink的方法做了什麼
//就是跳過這個節點,然後把right index往後指
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
ConcurrentSkipListMap get
這裡來看下get方法
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找前驅或者自己這個節點,其實這個get就是想找自己這個節點,如果沒有找到的是前驅
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//大於0的情況就一直往後找,直接null
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
//找到就return
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// c<0說明沒有這個節點直接break
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
ConcurrentSkipListMap remove
最後來看下如何刪除資料?
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找到前驅 或者就是他自己
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
break outer;
if (!n.casValue(v, null))
break;
//標記他可以刪除,並且設定自己的前驅節點 指向自己的後驅節點
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
//刪除自己的索引
findPredecessor(key, cmp); // clean index
//如果頭索引右邊啥也沒有,就降級了
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
來看下appendMarker實際做了什麼?
boolean appendMarker(Node<K,V> f) {
return casNext(f, new Node<K,V>(f));
}
//設定自己的next的Node 裡的value是 現在的下一個節點物件。。
有點繞,不知道為什麼這麼設計