ConcurrentHashMap原始碼分析(1.8)
轉載:https://www.cnblogs.com/zerotomax/p/8687425.html#go0
ConcurrentHashMap原始碼分析(1.8)
</h1>
<div class="clear"></div>
<div class="postBody">
</h1>
<div class="clear"></div>
<div class="postBody">
1、ConcurrentHashMap跟HashMap,HashTable的對比
※為了分析原始碼的時候方便除錯,把ConcurrentHashMap的原始碼放在本地了,名字改為了ConcurrentHashMapDebug
由於原始碼中的unsafe有很多限制,不能直接在本地使用,所以,在原始碼的最後面的靜態程式碼塊處修改了U的初始化方法。
private static final sun.misc.Unsafe U; static{ U = getUnsafe(); .... } static sun.misc.Unsafe getUnsafe() throws Exception { java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); Unsafe unsafe=(Unsafe) field.get(null); return unsafe; }
1、ConcurrentHashMap跟HashMap,HashTable的對比
我們都知道HashMap不是執行緒安全的,所以在處理併發的時候會出現問題。
而HashTable雖然是執行緒安全的,但是是通過整個來加鎖的方式,當一個執行緒在寫操作的時候,另外的執行緒則不能進行讀寫。
而ConcurrentHashMap則可以支援併發的讀寫。跟1.7版本相比,1.8版本又有了很大的變化,已經拋棄了Segment的概念,雖然原始碼裡面還保留了,也只是為了相容性的考慮。
在ConcurrentHashMap中通過一個Node<K,V>[]陣列來儲存新增到map中的鍵值對,而在同一個陣列位置是通過連結串列和紅黑樹的形式來儲存的。但是這個陣列只有在第一次新增元素的時候才會初始化,否則只是初始化一個ConcurrentHashMap物件的話,只是設定了一個sizeCtl變數,這個變數用來判斷物件的一些狀態和是否需要擴容,後面會詳細解釋。
第一次新增元素的時候,預設初期長度為16,當往map中繼續新增元素的時候,通過hash值跟陣列長度取與來決定放在陣列的哪個位置,如果出現放在同一個位置的時候,優先以連結串列的形式存放,在同一個位置的個數又達到了8個以上,如果陣列的長度還小於64的時候,則會擴容陣列。如果陣列的長度大於等於64了的話,在會將該節點的連結串列轉換成樹。
通過擴容陣列的方式來把這些節點給分散開。然後將這些元素複製到擴容後的新的陣列中,同一個連結串列中的元素通過hash值的陣列長度位來區分,是還是放在原來的位置還是放到擴容的長度的相同位置去 。在擴容完成之後,如果某個節點的是樹,同時現在該節點的個數又小於等於6個了,則會將該樹轉為連結串列。
取元素的時候,相對來說比較簡單,通過計算hash來確定該元素在陣列的哪個位置,然後在通過遍歷連結串列或樹來判斷key和key的hash,取出value值。
往ConcurrentHashMap中新增元素的時候,裡面的資料以陣列的形式存放的樣子大概是這樣的:
這個時候因為陣列的長度才為16,則不會轉化為樹,而是會進行擴容。
擴容後陣列大概是這樣的:
需要注意的是,擴容之後的長度不是32,擴容後的長度在後面細說。
如果陣列擴張後長度達到64了,且繼續在某個節點的後面新增元素達到8個以上的時候,則會出現轉化為紅黑樹的情況。
轉化之後大概是這樣:
下面是幾個重要的屬性
private static final int MAXIMUM_CAPACITY = 1 << 30; private static final int DEFAULT_CAPACITY = 16; static final int TREEIFY_THRESHOLD = 8; static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; static final int MOVED = -1; // 表示正在轉移 static final int TREEBIN = -2; // 表示已經轉換成樹 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash transient volatile Node<K,V>[] table;//預設沒初始化的陣列,用來儲存元素 private transient volatile Node<K,V>[] nextTable;//轉移的時候用的陣列 /** * 用來控制表初始化和擴容的,預設值為0,當在初始化的時候指定了大小,這會將這個大小儲存在sizeCtl中,大小為陣列的0.75 * 當為負的時候,說明表正在初始化或擴張, * -1表示初始化 * -(1+n) n:表示活動的擴張執行緒 */ private transient volatile int sizeCtl;
幾個重要的類
Node<K,V>,這是構成每個元素的基本類。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; //key的hash值 final K key; //key volatile V val; //value volatile Node<K,V> next; //表示連結串列中的下一個節點 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } }
TreeNode,構造樹的節點
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red;TreeNode(</span><span style="color: #0000ff;">int</span> hash, K key, V val, Node<K,V><span style="color: #000000;"> next, TreeNode</span><K,V><span style="color: #000000;"> parent) { </span><span style="color: #0000ff;">super</span><span style="color: #000000;">(hash, key, val, next); </span><span style="color: #0000ff;">this</span>.parent =<span style="color: #000000;"> parent; }
}
TreeBin 用作樹的頭結點,只儲存root和first節點,不儲存節點的key、value值。
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock }
ForwardingNode在轉移的時候放在頭部的節點,是一個空節點
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
}
在ConcurrentHashMap中使用了unSafe方法,通過直接操作記憶體的方式來保證併發處理的安全性,使用的是硬體的安全機制。
/* * 用來返回節點陣列的指定位置的節點的原子操作 */ @SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }</span><span style="color: #008000;">/*</span><span style="color: #008000;"> * cas原子操作,在指定位置設定值 </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">final</span> <K,V> <span style="color: #0000ff;">boolean</span> casTabAt(Node<K,V>[] tab, <span style="color: #0000ff;">int</span><span style="color: #000000;"> i, Node</span><K,V> c, Node<K,V><span style="color: #000000;"> v) { </span><span style="color: #0000ff;">return</span> U.compareAndSwapObject(tab, ((<span style="color: #0000ff;">long</span>)i << ASHIFT) +<span style="color: #000000;"> ABASE, c, v); } </span><span style="color: #008000;">/*</span><span style="color: #008000;"> * 原子操作,在指定位置設定值 </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">final</span> <K,V> <span style="color: #0000ff;">void</span> setTabAt(Node<K,V>[] tab, <span style="color: #0000ff;">int</span> i, Node<K,V><span style="color: #000000;"> v) { U.putObjectVolatile(tab, ((</span><span style="color: #0000ff;">long</span>)i << ASHIFT) +<span style="color: #000000;"> ABASE, v); }</span></pre>
首先我們看看構造方法
//空的構造 public ConcurrentHashMapDebug() { } //如果在例項化物件的時候指定了容量,則初始化sizeCtl public ConcurrentHashMapDebug(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //當出入一個Map的時候,先設定sizeCtl為預設容量,在新增元素 public ConcurrentHashMapDebug(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }可以看到,在任何一個構造方法中,都沒有對儲存Map元素Node的table變數進行初始化。而是在第一次put操作的時候在進行初始化。
下面來看看陣列的初始化方法initTable
/** * 初始化陣列table, * 如果sizeCtl小於0,說明別的陣列正在進行初始化,則讓出執行權 * 如果sizeCtl大於0的話,則初始化一個大小為sizeCtl的陣列 * 否則的話初始化一個預設大小(16)的陣列 * 然後設定sizeCtl的值為陣列長度的3/4 */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //第一次put的時候,table還沒被初始化,進入while if ((sc = sizeCtl) < 0) //sizeCtl初始值為0,當小於0的時候表示在別的執行緒在初始化表或擴充套件表 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示當前物件的記憶體偏移量,sc表示期望值,-1表示要替換的值,設定為-1表示要初始化表了 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的時候就建立指定大小的Node陣列,否則建立指定大小(16)的Node陣列 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; //初始化後,sizeCtl長度為陣列長度的3/4 } break; } } return tab; }
下面看看put方法的原始碼
/* * 單純的額呼叫putVal方法,並且putVal的第三個引數設定為false * 當設定為false的時候表示這個value一定會設定 * true的時候,只有當這個key的value為空的時候才會設定 */ public V put(K key, V value) { return putVal(key, value, false); }再來看putVal
/* * 當新增一對鍵值對的時候,首先會去判斷儲存這些鍵值對的陣列是不是初始化了, * 如果沒有的話就初始化陣列 * 然後通過計算hash值來確定放在陣列的哪個位置 * 如果這個位置為空則直接新增,如果不為空的話,則取出這個節點來 * 如果取出來的節點的hash值是MOVED(-1)的話,則表示當前正在對這個陣列進行擴容,複製到新的陣列,則當前執行緒也去幫助複製 * 最後一種情況就是,如果這個節點,不為空,也不在擴容,則通過synchronized來加鎖,進行新增操作 * 然後判斷當前取出的節點位置存放的是連結串列還是樹 * 如果是連結串列的話,則遍歷整個連結串列,直到取出來的節點的key來個要放的key進行比較,如果key相等,並且key的hash值也相等的話, * 則說明是同一個key,則覆蓋掉value,否則的話則新增到連結串列的末尾 * 如果是樹的話,則呼叫putTreeVal方法把這個元素新增到樹中去 * 最後在新增完成之後,會判斷在該節點處共有多少個節點(注意是新增前的個數),如果達到8個以上了的話, * 則呼叫treeifyBin方法來嘗試將處的連結串列轉為樹,或者擴容陣列 */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();//K,V都不能為空,否則的話跑出異常 int hash = spread(key.hashCode()); //取得key的hash值 int binCount = 0; //用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹 for (Node<K,V>[] tab = table;;) { // Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //第一次put的時候table沒有初始化,則初始化table else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通過雜湊計算出一個表中的位置因為n是陣列的長度,所以(n-1)&hash肯定不會出現陣列越界 if (casTabAt(tab, i, null, //如果這個位置沒有元素的話,則通過cas的方式嘗試新增,注意這個時候是沒有加鎖的 new Node<K,V>(hash, key, value, null))) //建立一個Node新增到陣列中區,null表示的是下一個節點為空 break; // no lock when adding to empty bin } /* * 如果檢測到某個節點的hash值是MOVED,則表示正在進行陣列擴張的資料複製階段, * 則當前執行緒也會參與去複製,通過允許多執行緒複製的功能,一次來減少陣列的複製所帶來的效能損失 */ else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { /* * 如果在這個位置有元素的話,就採用synchronized的方式加鎖, * 如果是連結串列的話(hash大於0),就對這個連結串列的所有元素進行遍歷, * 如果找到了key和key的hash值都一樣的節點,則把它的值替換到 * 如果沒找到的話,則新增在連結串列的最後面 * 否則,是樹的話,則呼叫putTreeVal方法新增到樹中去 * * 在新增完之後,會對該節點上關聯的的數目進行判斷, * 如果在8個以上的話,則會呼叫treeifyBin方法,來嘗試轉化為樹,或者是擴容 */ V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //再次取出要儲存的位置的元素,跟前面取出來的比較 if (fh >= 0) { //取出來的元素的hash值大於0,當轉換為樹之後,hash值為-2 binCount = 1; for (Node<K,V> e = f;; ++binCount) { //遍歷這個連結串列 K ek; if (e.hash == hash && //要存的元素的hash,key跟要儲存的位置的節點的相同的時候,替換掉該節點的value即可 ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) //當使用putIfAbsent的時候,只有在這個key沒有設定值得時候才設定 e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //如果不是同樣的hash,同樣的key的時候,則判斷該節點的下一個節點是否為空, pred.next = new Node<K,V>(hash, key, //為空的話把這個要加入的節點設定為當前節點的下一個節點 value, null); break; } } } else if (f instanceof TreeBin) { //表示已經轉化成紅黑樹型別了 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //呼叫putTreeVal方法,將該元素新增到樹中去 value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //當在同一個節點的數目達到8個的時候,則擴張陣列或將給節點的資料轉為tree treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); //計數 return null; }在put方法的詳解中,我們可以看到,在同一個節點的個數超過8個的時候,會呼叫treeifyBin方法來看看是擴容還是轉化為一棵樹
同時在每次新增完元素的addCount方法中,也會判斷當前陣列中的元素是否達到了sizeCtl的量,如果達到了的話,則會進入transfer方法去擴容
/** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. * 當陣列長度小於64的時候,擴張陣列長度一倍,否則的話把連結串列轉為樹 */ private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { System.out.println("treeifyBin方\t==>陣列長:"+tab.length); if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 tryPresize(n << 1); // 陣列擴容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //使用synchronized同步器,將該節點出的連結串列轉為樹 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //hd:樹的頭(head) for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) //把Node組成的連結串列,轉化為TreeNode的連結串列,頭結點任然放在相同的位置 hd = p; //設定head else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的連結串列放入容器TreeBin中 } } } } }可以看到當需要擴容的時候,呼叫的時候tryPresize方法,看看trePresize的原始碼
/** * 擴容表為指可以容納指定個數的大小(總是2的N次方) * 假設原來的陣列長度為16,則在呼叫tryPresize的時候,size引數的值為16<<1(32),此時sizeCtl的值為12 * 計算出來c的值為64,則要擴容到sizeCtl≥為止 * 第一次擴容之後 陣列長:32 sizeCtl:24 * 第二次擴容之後 陣列長:64 sizeCtl:48 * 第二次擴容之後 陣列長:128 sizeCtl:94 --> 這個時候才會退出擴容 */ private final void tryPresize(int size) { /* * MAXIMUM_CAPACITY = 1 << 30 * 如果給定的大小大於等於陣列容量的一半,則直接使用最大容量, * 否則使用tableSizeFor算出來 * 後面table一直要擴容到這個值小於等於sizeCtrl(陣列長度的3/4)才退出擴容 */ int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // printTable(tab); 除錯用的 /* * 如果陣列table還沒有被初始化,則初始化一個大小為sizeCtrl和剛剛算出來的c中較大的一個大小的陣列 * 初始化的時候,設定sizeCtrl為-1,初始化完成之後把sizeCtrl設定為陣列長度的3/4 * 為什麼要在擴張的地方來初始化陣列呢?這是因為如果第一次put的時候不是put單個元素, * 而是呼叫putAll方法直接put一個map的話,在putALl方法中沒有呼叫initTable方法去初始化table, * 而是直接呼叫了tryPresize方法,所以這裡需要做一個是不是需要初始化table的判斷 */ if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //初始化tab的時候,把sizeCtl設為-1 try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } /* * 一直擴容到的c小於等於sizeCtl或者陣列長度大於最大長度的時候,則退出 * 所以在一次擴容之後,不是原來長度的兩倍,而是2的n次方倍 */ else if (c <= sc || n >= MAXIMUM_CAPACITY) { break; //退出擴張 } else if (tab == table) { int rs = resizeStamp(n); /* * 如果正在擴容Table的話,則幫助擴容 * 否則的話,開始新的擴容 * 在transfer操作,將第一個引數的table中的元素,移動到第二個元素的table中去, * 雖然此時第二個引數設定的是null,但是,在transfer方法中,當第二個引數為null的時候, * 會建立一個兩倍大小的table */ if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; /* * transfer的執行緒數加一,該執行緒將進行transfer的幫忙 * 在transfer的時候,sc表示在transfer工作的執行緒數 */ if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } /* * 沒有在初始化或擴容,則開始擴容 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) { transfer(tab, null); } } } }在tryPresize方法中,並沒有加鎖,允許多個執行緒進入,如果陣列正在擴張,則當前執行緒也去幫助擴容。
陣列擴容的主要方法就是transfer方法
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. * 把陣列中的節點複製到新的陣列的相同位置,或者移動到擴張部分的相同位置 * 在這裡首先會計算一個步長,表示一個執行緒處理的陣列長度,用來控制對CPU的使用, * 每個CPU最少處理16個長度的陣列元素,也就是說,如果一個數組的長度只有16,那只有一個執行緒會對其進行擴容的複製移動操作 * 擴容的時候會一直遍歷,知道複製完所有節點,沒處理一個節點的時候會在連結串列的頭部設定一個fwd節點,這樣其他執行緒就會跳過他, * 複製後在新陣列中的連結串列不是絕對的反序的 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用來控制不要佔用太多CPU stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 /* * 如果複製的目標nextTab為null的話,則初始化一個table兩倍長的nextTab * 此時nextTable被設定值了(在初始情況下是為null的) * 因為如果有一個執行緒開始了表的擴張的時候,其他執行緒也會進來幫忙擴張, * 而只是第一個開始擴張的執行緒需要初始化下目標陣列 */ if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; /* * 建立一個fwd節點,這個是用來控制併發的,當一個節點為空或已經被轉移之後,就設定為fwd節點 * 這是一個空的標誌節點 */ ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; //是否繼續向前查詢的標誌位 boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍陣列,看看有沒完成的沒 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) { advance = false; } else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //已經完成轉移 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); //設定sizeCtl為擴容後的0.75 return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { return; } finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) //陣列中把null的元素設定為ForwardingNode節點(hash值為MOVED[-1]) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { //加鎖操作 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { //該節點的hash值大於等於0,說明是一個Node節點 /* * 因為n的值為陣列的長度,且是power(2,x)的,所以,在&操作的結果只可能是0或者n * 根據這個規則 * 0--> 放在新表的相同位置 * n--> 放在新表的(n+原來位置) */ int runBit = fh & n; Node<K,V> lastRun = f; /* * lastRun 表示的是需要複製的最後一個節點 * 每當新節點的hash&n -> b 發生變化的時候,就把runBit設定為這個結果b * 這樣for迴圈之後,runBit的值就是最後不變的hash&n的值 * 而lastRun的值就是最後一次導致hash&n 發生變化的節點(假設為p節點) * 為什麼要這麼做呢?因為p節點後面的節點的hash&n 值跟p節點是一樣的, * 所以在複製到新的table的時候,它肯定還是跟p節點在同一個位置 * 在複製完p節點之後,p節點的next節點還是指向它原來的節點,就不需要進行復制了,自己就被帶過去了 * 這也就導致了一個問題就是複製後的連結串列的順序並不一定是原來的倒序 */ for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; //n的值為擴張前的陣列的長度 if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } /* * 構造兩個連結串列,順序大部分和原來是反的 * 分別放到原來的位置和新增加的長度的相同位置(i/n+i) */ for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) /* * 假設runBit的值為0, * 則第一次進入這個設定的時候相當於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同為0的節點)設定到舊的table的第一個hash計算後為0的節點下一個節點 * 並且把自己返回,然後在下次進來的時候把它自己設定為後面節點的下一個節點 */ ln = new Node<K,V>(ph, pk, pv, ln); else /* * 假設runBit的值不為0, * 則第一次進入這個設定的時候相當於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不為0的節點)設定到舊的table的第一個hash計算後不為0的節點下一個節點 * 並且把自己返回,然後在下次進來的時候把它自己設定為後面節點的下一個節點 */ hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //否則的話是一個樹節點 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } /* * 在複製完樹節點之後,判斷該節點處構成的樹還有幾個節點, * 如果≤6個的話,就轉回為一個連結串列 */ ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }到這裡,ConcurrentHashMap的put操作和擴容都介紹的差不多了,
下面的兩點一定要注意:
·複製之後的新連結串列不是舊連結串列的絕對倒序。
·在擴容的時候每個執行緒都有處理的步長,最少為16,在這個步長範圍內的陣列節點只有自己一個執行緒來處理
相比put操作,get操作就顯得很簡單了。廢話少說,直接上原始碼分析。
/* * 相比put方法,get就很單純了,支援併發操作, * 當key為null的時候回丟擲NullPointerException的異常 * get操作通過首先計算key的hash值來確定該元素放在陣列的哪個位置 * 然後遍歷該位置的所有節點 * 如果不存在的話返回null */ public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }前面分析了下ConcurrentHashMap的原始碼,那麼,對於一個對映集合來說,ConcurrentHashMap是如果來做到併發安全,又是如何做到高效的併發的呢?
首先是讀操作,從原始碼中可以看出來,在get操作中,根本沒有使用同步機制,也沒有使用unsafe方法,所以讀操作是支援併發操作的。
那麼寫操作呢?
分析這個之前,先看看什麼情況下會引起陣列的擴容,擴容是通過transfer方法來進行的。而呼叫transfer方法的只有trePresize、helpTransfer和addCount三個方法。
這三個方法又是分別在什麼情況下進行呼叫的呢?
·tryPresize是在treeIfybin和putAll方法中呼叫,treeIfybin主要是在put新增元素完之後,判斷該陣列節點相關元素是不是已經超過8個的時候,如果超過則會呼叫這個方法來擴容陣列或者把連結串列轉為樹。
·helpTransfer是在當一個執行緒要對table中元素進行操作的時候,如果檢測到節點的HASH值為MOVED的時候,就會呼叫helpTransfer方法,在helpTransfer中再呼叫transfer方法來幫助完成陣列的擴容
·addCount是在當對陣列進行操作,使得陣列中儲存的元素個數發生了變化的時候會呼叫的方法。
所以引起陣列擴容的情況如下:
·只有在往map中新增元素的時候,在某一個節點的數目已經超過了8個,同時陣列的長度又小於64的時候,才會觸發陣列的擴容。
·當陣列中元素達到了sizeCtl的數量的時候,則會呼叫transfer方法來進行擴容
那麼在擴容的時候,可以不可以對陣列進行讀寫操作呢?
事實上是可以的。當在進行陣列擴容的時候,如果當前節點還沒有被處理(也就是說還沒有設定為fwd節點),那就可以進行設定操作。
如果該節點已經被處理了,則當前執行緒也會加入到擴容的操作中去。
那麼,多個執行緒又是如何同步處理的呢?
在ConcurrentHashMap中,同步處理主要是通過Synchronized和unsafe兩種方式來完成的。
·在取得sizeCtl、某個位置的Node的時候,使用的都是unsafe的方法,來達到併發安全的目的
·當需要在某個位置設定節點的時候,則會通過Synchronized的同步機制來鎖定該位置的節點。
·在陣列擴容的時候,則通過處理的步長和fwd節點來達到併發安全的目的,通過設定hash值為MOVED
·當把某個位置的節點複製到擴張後的table的時候,也通過Synchronized的同步機制來保證現程安全
前面在講解tryifyBin的原始碼的時候講到過,如果在當個bin上的元素超過了8個的時候,就會嘗試去擴容陣列或者是將連結串列轉為紅黑樹。
原始碼:
private final void treeifyBin(Node<K,V>[] tab, int index) { System.out.println("當前執行緒:"+Thread.currentThread().getName()+"進入treeifyBin方法"); Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 tryPresize(n << 1); // 陣列擴容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //使用synchronized同步器,將該節點出的連結串列轉為樹 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //hd:樹的頭(head) for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) //把Node組成的連結串列,轉化為TreeNode的連結串列,頭結點任然放在相同的位置 hd = p; //設定head else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的連結串列放入容器TreeBin中 } } } } }首先將Node的連結串列轉化為一個TreeNode的連結串列,然後將TreeNode連結串列的頭結點來構造一個TreeBin。
下面是TreeBin構造方法的原始碼:
TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); //建立的TreeBin是一個空節點,hash值為TREEBIN(-2) this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; }// else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) {//x代表的是轉換為樹之前的順序遍歷到連結串列的位置的節點,r代表的是根節點 int dir, ph; K pk = p.key; if ((ph = p.hash) > h) // dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); //當key不可以比較,或者相等的時候採取的一種排序措施 TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) {//在這裡判斷要放的left/right是否為空,不為空繼續用left/right節點來判斷 x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); //每次插入一個元素的時候都呼叫balanceInsertion來保持紅黑樹的平衡 break; } } } } this.root = r; assert checkInvariants(root); }轉化的過程大概如下:
接下來,用連結串列頭部的TreeNode來構造一個TreeBin,在TreeBin容器中,將連結串列轉化為紅黑樹。
首先是構造一個如下的TreeBin空節點。
構造完TreeBin這個空節點之後,就開始構造紅黑樹,首先是第一個節點,左右子節點設定為空,作為紅黑樹的root節點,設定為黑色,父節點為空。
接下來遍歷連結串列的後續節點,沒新增一個元素的時候,都會通過判斷hash值來決定是放在根節點的左節點還是有節點,如果左/右節點不為空,則繼續以左/右節點來重複判斷,直到左/右節點為空,則新增到左/右位置。
然後在每次新增完一個節點之後,都會呼叫balanceInsertion方法來維持這是一個紅黑樹的屬性和平衡性。紅黑樹所有操作的複雜度都是O(logn),所以當元素量比較大的時候,效率也很高。