J.U.C 之ConcurrentHashMap(JDK1.7)
Hashmap多執行緒會導致HashMap的Entry連結串列形成環形資料結構,一旦形成環形資料結構,Entry的next節點永遠不為空,就會產生死迴圈獲取Entry。
HashTable使用synchronized來保證執行緒安全,但線上程競爭激烈的情況下HashTable的效率非常低下。因為當一個執行緒訪問HashTable的同步方法,其他執行緒也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如執行緒1使用put進行元素新增,執行緒2不但不能使用put方法新增元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
原理和實現
分段鎖技術
HashTable容器在競爭激烈的併發環境下表現出效率低下的原因,是因為所有訪問HashTable的執行緒都必須競爭同一把鎖。
那假如容器裡有多把鎖,每一把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效的提高併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將資料分成一段一段的儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖訪問其中一個段資料的時候,其他段的資料也能被其他執行緒訪問。
另外,ConcurrentHashMap可以做到讀取資料不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持地儘量地小,不用對整個ConcurrentHashMap加鎖。
ConcurrentHashMap的內部結構
ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。
Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裡扮演鎖的角色,HashEntry則用於儲存鍵值對資料。
一個ConcurrentHashMap裡包含一個Segment陣列,Segment的結構和HashMap類似,是一種陣列和連結串列結構,
一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素,
每個Segment守護著一個HashEntry陣列裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得它對應的Segment鎖。
結構圖如下:
從上面的結構我們可以瞭解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的連結串列的頭部,因此,這一種結構的帶來的副作用是Hash的過程要比普通的HashMap要長,但是帶來的好處是寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支援Segment數量大小的寫操作(剛好這些寫操作都非常平均地分佈在所有的Segment上),所以,通過這一種結構,ConcurrentHashMap的併發能力可以大大的提高。
ConcurrentHashMap原始碼分析
Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;
}
複製程式碼
- count:Segment中元素的數量
- modCount:對table的大小造成影響的操作的數量(比如put或者remove操作)
- threshold:閾值,Segment裡面元素的數量超過這個值依舊就會對Segment進行擴容
- table:連結串列陣列,陣列中的每一個元素代表了一個連結串列的頭部
- loadFactor:負載因子,用於確定threshold
count用來統計該段資料的個數,它是volatile變數,它用來協調修改和讀取操作,以保證讀取操作能夠讀取到幾乎最新的修改。協調方式是這樣的,每次修改操作做了結構上的改變,如增加/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操作開始都要讀取count的值。這利用了 Java 5中對volatile語義的增強,對同一個volatile變數的寫和讀存在happens-before關係。
modCount統計段結構改變的次數,主要是為了檢測對多個段進行遍歷過程中某個段是否發生改變。
threashold用來表示需要進行rehash的界限值。
table陣列儲存段中節點,每個陣列元素是個hash鏈,用HashEntry表示。table也是volatile,這使得能夠讀取到最新的 table值而不需要同步。loadFactor表示負載因子。
HashEntry
Segment中的元素是以HashEntry的形式存放在連結串列陣列中的,看一下HashEntry的結構:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
複製程式碼
可以看到HashEntry的一個特點,除了value以外,其他的幾個變數都是final的,這意味著不能從hash鏈的中間或尾部新增或刪除節點,因為這需要修改next 引用值,所有的節點的修改只能從頭部開始(頭插法)。
對於put操作,可以一律新增到Hash鏈的頭部。
但是對於remove操作,可能需要從中間刪除一個節點,這就需要將要刪除節點的前面所有節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。。為了確保讀操作能夠看到最新的值,將value設定成volatile,這避免了加鎖。
ConcurrentHashMap的成員變數
...
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的載入因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的併發等級,表示當前更新執行緒的估計數
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment數量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment數量
static final int MAX_SEGMENTS = 1 << 16;
//
static final int RETRIES_BEFORE_LOCK = 2;
// segments 的掩碼值,key 的雜湊碼的高位用來選擇具體的 segment
final int segmentMask;
// 偏移量
final int segmentShift;
final Segment<K,V>[] segments;
複製程式碼
ConcurrentHashMap的初始化
// 建立一個帶有指定初始容量、載入因子和併發級別的新的空對映
public ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 尋找最佳匹配引數(不小於給定引數的最接近的 2^n)
int sshift = 0; // 用來記錄向左按位移動的次數
int ssize = 1; // 用來記錄Segment陣列的大小
// 計算並行級別 ssize,因為要保持並行級別是 2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//用於定位元素所在segment。
//segmentShift表示偏移位數,通過前面的int型別的位的描述我們可以得知,int型別的數字在變大的過程中,
//低位總是比高位先填滿的,為保證元素在segment級別分佈的儘量均勻,計算元素所在segment時,
//總是取hash值的高位進行計算。segmentMask作用就是為了利用位運算中取模的操作:
//a % (Math.pow(2,n)) 等價於 a&( Math.pow(2,n)-1)
// 若為預設值,concurrencyLevel 為 16,sshift 為 4
// 那麼計算出 segmentShift 為 28,segmentMask 為 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 記錄每個 Segment 上要放置多少個元素
int c = initialCapacity / ssize;
// 假如有餘數,則Segment數量加1
if (c * ssize < initialCapacity)
++c;
//保證每個Segment中tabel陣列的大小,一定為2的冪,初始化的三個引數取預設值時,table陣列大小為2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化Segment陣列,並實際只填充Segment陣列的第0個元素。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor,(int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss,SBASE,s0); // ordered write of segments[0]
this.segments = ss;
}
複製程式碼
CurrentHashMap的初始化一共有三個引數:
- 一個initialCapacity,表示初始的容量,
- 一個loadFactor,表示負載引數,
- 最後一個是concurrentLevel,代表ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中連結串列陣列的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment裡面的元素做一次rehash就可以了。
整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrentLevel來new出Segment,這裡Segment的數量是不大於concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操作來進行hash,加快hash的過程。
接下來就是根據intialCapacity確定Segment的容量的大小,每一個Segment的容量大小也是2的指數,同樣使為了加快hash的過程。
這邊需要特別注意一下兩個變數,分別是segmentShift和segmentMask,這兩個變數在後面將會起到很大的作用,假設建構函式確定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。
當用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:
- Segment 陣列長度為 16,不可以擴容
- Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
- 這裡初始化了 segment[0],其他位置還是 null
- 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單 翻譯 為移位數和掩碼,這兩個值馬上就會用到
hash()方法
private int hash(Object k) {
int h = hashSeed;
//如果Key是字串型別,則使用專門為字串設計的Hash方法,否則使用一連串的異或操作增加hash隨機性
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
複製程式碼
初始化Segment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。
這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss,u)) == null) {
// 這裡看到為什麼之前要初始化 segment[0] 了,
// 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k]
// 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 內部的陣列
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,u))
== null) { // recheck Segment[k] 是否被其它執行緒初始化了
Segment<K,V> s = new Segment<K,V>(lf,threshold,tab);
// 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出
while ((seg = (Segment<K,u))
== null) {
if (UNSAFE.compareAndSwapObject(ss,u,null,seg = s))
break;
}
}
}
return seg;
}
複製程式碼
put過程分析
當執行put方法插入資料時,根據key的hash值,在Segment陣列中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行賦值,接著執行Segment物件的put方法通過加鎖機制插入資料
場景:執行緒A和執行緒B同時執行相同Segment物件的put方法
1. 執行緒A執行tryLock()方法成功獲取鎖,則把HashEntry物件插入到相應的位置;
2. 執行緒B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起執行緒B;
3. 當執行緒A執行完插入操作時,會通過unlock()方法釋放鎖,接著喚醒執行緒B繼續執行;
複製程式碼
put 方法的過程:
-
判斷value是否為null,如果為null,直接丟擲異常。注:不允許key或者value為null
-
通過雜湊演演算法定位到Segment(key通過一次hash運算得到一個hash值,將得到hash值向右按位移動segmentShift位,然後再與segmentMask做&運算得到segment的索引j)。
-
使用Unsafe的方式從Segment陣列中獲取該索引對應的Segment物件
-
向這個Segment物件中put值
注:對共享變數進行寫入操作為了執行緒安全,在操作共享變數時必須得加鎖,持有段鎖(鎖定整個segment)的情況下執行的。修改資料是不能併發進行的
判斷該值的插入是否會導致該 segment 的元素個數超過閾值,以確保容量不足時能夠rehash擴容,再插值。
注:rehash 擴容 segment 陣列不能擴容,擴容的是 segment 陣列某個位置內部的陣列 HashEntry[] 擴容為原來的 2 倍。先進行擴容,再插值
查詢是否存在同樣一個key的結點,存在直接替換這個結點的值。否則建立一個新的結點並新增到hash鏈的頭部,修改modCount和count的值,修改count的值一定要放在最後一步。
public V put(K key,V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 根據 hash 值找到 Segment 陣列中的位置 j
// hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
// 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是槽的陣列下標
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
// ensureSegment(j) 對 segment[j] 進行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments,(j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 插入新值到 槽 s 中
return s.put(key,hash,value,false);
}
複製程式碼
Segment 內部是由 陣列+連結串列 組成的。
final V put(K key,int hash,V value,boolean onlyIfAbsent) {
// 先獲取該 segment 的獨佔鎖
// 每一個Segment進行put時,都會加鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key,value);
V oldValue;
try {
// segment 內部的陣列
HashEntry<K,V>[] tab = table;
// 利用 hash 值,求應該放置的陣列下標
int index = (tab.length - 1) & hash;
// 陣列該位置處的連結串列的表頭
HashEntry<K,V> first = entryAt(tab,index);
for (HashEntry<K,V> e = first;;) {
// 如果鏈頭不為 null
if (e != null) {
K k;
//如果在該鏈中找到相同的key,則用新值替換舊值,並退出迴圈
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//如果沒有和key相同的,一直遍歷到鏈尾,鏈尾的next為null,進入到else
e = e.next;
}
else {
// node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。
// 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash,key,first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 沒有達到閾值,將 node 放到陣列 tab 的 index 位置,
// 其實就是將新的節點設定成原連結串列的表頭
setEntryAt(tab,index,node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}
複製程式碼
get()方法
-
計算 hash 值,找到 segment 陣列中的具體位置,使用Unsafe獲取對應的Segment
-
根據 hash 找到陣列中具體的位置
-
從連結串列頭開始遍歷整個連結串列(因為Hash可能會有碰撞,所以用一個連結串列儲存),如果找到對應的key,則返回對應的value值,否則返回null。
注:get操作不需要鎖,由於其中涉及到的共享變數都使用volatile修飾,volatile可以保證記憶體可見性,所以不會讀取到過期資料。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments,u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab,((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
複製程式碼
remove操作
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過程,然後再呼叫Segment的remove方法:
final V remove(Object key,Object value) {
if (!tryLock())
scanAndLock(key,hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab,index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab,next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
複製程式碼
首先remove操作也是確定需要刪除的元素的位置,不過這裡刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了,前面已經說過HashEntry中的next是final的,一經賦值以後就不可修改,在定位到待刪除元素的位置以後,程式就將待刪除元素前面的那一些元素全部複製一遍,然後再一個一個重新接到連結串列上去,看一下下面這一幅圖來瞭解這個過程:
假設連結串列中原來的元素如上圖所示,現在要刪除元素3,那麼刪除元素3以後的連結串列就如下圖所示:
注意:圖1和2的元素順序相反了,為什麼這樣,不防再仔細看看原始碼或者再讀一遍上面remove的分析過程,元素複製是從待刪除元素位置起將前面的元素逐一複製的,然後再將後面的連結起來。
size 操作
size操作需要遍歷所有的Segment才能算出整個Map的大小。先採用不加鎖的方式,迴圈所有的Segment(通過Unsafe的getObjectVolatile()以保證原子讀語義)連續計算元素的個數,最多計算3次:
- 如果前後兩次計算結果相同,則說明計算出來的元素個數是準確的;
- 如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;
注:在put,remove和clean方法裡操作元素前都會將變數modCount進行加1,那麼在統計size前後比較modCount是否發生變化,從而得知容器的大小是否發生變化。