Java 併發程式設計(六)併發容器和框架
傳統 Map 的侷限性
HashMap
-
JDK 1.7 的
HashMap
JDK 1.7 中
HashMap
的實現只是單純的 “陣列 + 連結串列 ” 的組合方式,具體的組成如下:[1]
在 JDK 1.7 的實現中,HashMap 內部會維護一個數組,陣列中的每個元素都是一個單向連結串列。這是因為不同的物件可能會有相同的
hashCode
HashMap
關鍵的幾個屬性 :-
capacity
:表示當前陣列的容量,是中為 2^n,可以對陣列進行擴容,擴容後的大小為原來陣列長度的兩倍 -
loadFactor
:負載因子,預設為 0.75,用於衡量當前陣列的填充情況 -
threshold
put
方法的實現,對應的原始碼:public V put(K key, V value) { // 處理 key 為 null 的情況,null 表示一個特殊的物件 if (key == null) return putForNullKey(value); /* 獲取這個 key 物件的 hashCode,這個獲取的過程很複雜, 只需要知道它能夠儘可能地獲取一個不會發生衝突的 code 就行了 */ int hash = hash(key); /* 通過得到的 hashCode 得到 key 在陣列中所處的位置索引 在 JDK 1.7 中就是和 (table.length - 1) 執行按位與的操作 */ int i = indexFor(hash, table.length); /* 遍歷這個位置的不連結串列,檢查是否這個 key 物件是否已經存在連結串列中了(這裡需要使用到 equals 方法) 如果這個 key 已經存在於連結串列中了,那麼只需要更新這個 key 所在的 Entry 的 value 即可 */ for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } // 執行到這裡就說明 key 物件不在原有的 HashMap 中 modCount++; addEntry(hash, key, value, i); // 將這個鍵值對新增到這個陣列位置對應的連結串列中 return null; }
比較關係的是
addEntry
方法的實現,具體的程式碼如下所示:void addEntry(int hash, K key, V value, int bucketIndex) { /* 為了能夠得到更好的效果,雜湊表的陣列的大小一般是元素的兩倍,但是由於當前使用的 “陣列 + 連結串列” 的組合結構 只要不超過當前的閾值即可達到近似的效果,因此噹噹前 HashMap 的元素個數已經達到了閾值時, 需要進行擴容以達到更好的效果 */ if ((size >= threshold) && (null != table[bucketIndex])) { resize(2 * table.length); // 擴容,在此略過 /* 參考上文提到的生成 hashCode 的操作,當前陣列的長度已經變了, 那麼 key 的 hashCode 也是需要重新計算的 */ hash = (null != key) ? hash(key) : 0; // 下標也是需要重新計算的 bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } // 插入新的 Entry void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; // 記錄當前連結串列的頭指標 // 很明顯,這裡使用到的是 “頭插法” 的方式插入的元素 table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
現在,分析一下 JDK 1.7 中
HashMap
在併發情況下可能會發生的一些問題,主要問題發生在 “插入新的 Entry” 這裡,當存在多個執行緒同時插入元素時,由於沒有任何同步機制來確保操作的可見性,因此可能會使得部分新插入的資料丟失,因此它不是執行緒安全的;除了這個問題之外,在極端的情況下,可能由於 “頭插法” 的插入方式,使得多個執行緒同時修改後續結點的連線關係,這種情況下,很有可能導致這些節點之間的連結存在 “環”,是的HashMap
的某些操作無法正常執行。
-
-
JDK 1.8 的
HashMap
JDK 1.8 的
HashMap
與 JDK 1.7 最大的不同點在與使用的資料結構,JDK 1.8 繼承了原有的 “陣列 + 連結串列” 的組成結構,同時也引入了 “紅黑樹” 的資料結構來處理在元素個數較多的情況。現在 JDK 1.8
HashMap
的一般結構如下:[1]
“陣列 + 連結串列” 和 “陣列 + 紅黑樹” 在同一時刻只能有一種組合是有效的,
HashMap
會自動根據當前的元素的數量自動地實現這兩種組合之間的轉換相關的比較重要的屬性已經在 JDK 1.7 的
HashMap
中提及到了,在此不做過多的贅述
對於併發場景,主要關心的是
put
方法,對應的原始碼如下:public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } // putVal final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { /* JDK 1.8 使用 Node 替換掉了 Entry,但是如果使用 “陣列 + 紅黑樹” 的組合結構的話, 那麼將使用 TreeNode 作為 Entry 的實現 */ Node<K,V>[] tab; Node<K,V> p; int n, i; if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); else { Node<K,V> e; K k; if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; else if (p instanceof TreeNode) /* 在 “陣列 + 紅黑樹” 的組合情況下插入新的節點,相當於在紅黑樹中插入或者更新一個新的節點 同樣是缺少同步機制的 */ e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { /* 重點在這,使用 “陣列 + 連結串列” 的組合情況下,插入節點, 結合這裡的條件,不難發現,這裡的插入方式是 “尾插法” */ p.next = newNode(hash, key, value, null); if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }
JDK 1.8 中 “陣列 + 連結串列” 的插入方式改為了 “尾插法”,解決了 JDK 1.7 中可能會出現環的問題,但是由於缺少同步機制,不管是使用何種組合方式,都無法保證這個插入操作是執行緒安全的。
Hashtable
Hashtable
通過使得每個操作都加上 synchronized
修飾,使得每個操作都只能通過單個執行緒來訪問,從根源上解決了併發帶來的問題。但是缺點也很明顯,每個操作都只能由單個執行緒訪問,因此完全無法得到多執行緒帶來的效能提升。
在實際使用過程中,由於 JVM
的鎖消除機制,大部分的情況下 HashTable
的同步機制實際上是沒有效果的,而且由於 Hashtable
無法得到多執行緒帶來的效能提升,實際上很少會使用到,一般情況下都會直接使用 HashMap
,如果要保證執行緒安全,那麼一般會使用 ConcurrentHashMap
,從而能夠得到多執行緒帶來的效能的提升
ConcurrentHashMap
如果想要使得 HashMap
成為一個執行緒安全的 Map
結構,可以通過 Collections.synchronizedMap(map)
將 HashMap
轉換成為一個 SynchronizedMap
,這種方式與使用 Hashtable
並沒有什麼不同,更好的選擇方案是使用 ConcurrentHashMap
。
Java 7 的 ConcurrentHashMap
JDK 1.7 中的 ConcurrentHashMap
通過引入分段鎖的方式來提高併發操作的能力,在 JDK 1.7 中,ConcurrentHashMap
的結構如下:
[1]
JDK 1.7 中ConcurrentHashMap
中常量欄位如下:
// 預設初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 預設負載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 預設併發級別
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
static final int MAXIMUM_CAPACITY = 1 << 30;
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
static final int MAX_SEGMENTS = 1 << 16;
// 重試次數
static final int RETRIES_BEFORE_LOCK = 2;
JDK 1.7 中 ConcurrentHashMap
的建構函式如下:
// 最終都會走到這個建構函式
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
/*
通過 concurrencyLevel 計算並行級別 ssize,ssize 應當是大於或者等於
concurrencyLevel 的最小的 2 的 n 次方
*/
int sshift = 0;
int ssize = 1; // segment 陣列的長度
while (ssize < concurrencyLevel) {
++sshift;
/*
為了能夠通過按位於的雜湊演算法來定位 segments 陣列的索引,
必須保證 segments 陣列額的長度是 2 的 n 次方
*/
ssize <<= 1;
}
// 計算並行級別結束
this.segmentShift = 32 - sshift; // 段偏移量,預設為 28
this.segmentMask = ssize - 1; // 段掩碼,預設為 15
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// segment 內部陣列的初始化容量,預設為 2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1; // 需要保證 segment 內部陣列的長度也是 2 的 n 次方
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
put
方法:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
/*
第一次計算 hash 值,用於定位當前 key 在 segment 陣列中的位置
*/
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask; // 定位到segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); // 第一次訪問segment時,建立segment物件(確保 segment 是存在的)
return s.put(key, hash, value, false); // 委託給特定的段
}
Segment
物件中的 put
方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
/*
tryLock()加鎖,不成功則執行scanAndLockForPut,先完成節點例項化
*/
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 第二次 hash 用於獲取在 Segment 陣列中的索引
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) { // 進行元素定位即更新操作
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { // key 已存在
oldValue = e.value;
if (!onlyIfAbsent) { // onlyIfAbsent決定是否更新值
e.value = value;
++modCount; // 修改次數
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first); // 頭插法, node已經在嘗試獲取鎖的時候例項化過了
else
node = new HashEntry<K,V>(hash, key, value, first); // 頭插法, 未例項化過
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴容, 只對當前segment 擴容, 和 hashmap 擴容 類似
else
/*
設定 node 在 segment 中元素陣列的位置,由於 table 是 volatile 變數,
因此對於 table 的寫入操作對於其它執行緒來講是可見的
*/
setEntryAt(tab, index, node); // 頭插法
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
get
方法對應的原始碼:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 定位到segment
/*
table、next 等共享變數都是 volatile 變數,因此對於它們的寫入都會發生在讀操作之前,
因此在這裡就不需要再加鎖
*/
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
// 定位到在 segment 陣列中的位置
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
size
方法對應的原始碼:
public int size() {
/*
先嚐試兩次不對Segment加鎖方式來統計count值,如果統計過程中count發生變化了,再加鎖。
如果沒有改變,則返回size。
*/
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation 兩次嘗試失敗了,加鎖
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
/*
嘗試成功(通過判斷兩次的總修改次數沒有變化) 或者是 加鎖之後肯定滿足條件
*/
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
Java 8 的 ConcurrentHashMap
JDK 1.8 與 JDK 1.7 相比,具有以下的不同:
- 放棄分段鎖
- 使用
Node
+CAS
+syncronized
來保證執行緒安全性 - 底層使用 “陣列 + 連結串列” 和 “陣列 + 紅黑樹” 的方式來儲存元素
原始碼分析在此略過。。。。。。
ConcurrentLinkedQueue
ConcurrentLinkedQueue
具有以下特點:
- 基於連結節點的先進先出的無界執行緒安全佇列
- 使用
CAS
來保證執行緒安全性
以 JDK 1.7 為例,介紹一下關於 ConcurrentLinkedQueue
的入隊和出隊操作
入隊的 offer()
方法:
// 通過對 tailer (volatile 變數)的讀來減少對 tailer 的寫入次數
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final Node<E> n = new Node<E>(e); // 入隊前,建立一個入隊節點
retry:
// 使得入隊的操作在 CAS 中最終能夠成功
for (;;) {
// 建立一個指向tail節點的引用
Node<E> t = tail;
// p用來表示佇列的尾節點,預設情況下等於tail節點。
Node<E> p = t;
/*
只有 tailer 距離新插入的節點長度大於 hops 時,才考慮更新 tailer,
這樣就能夠使得對 tailer 的寫入次數減少,從而提高了效能
這種方式的缺點在於現在 tailer 的位置已經不再是實際尾節點了,
因此每次都需要再遍歷 hops 次才能得到實際的尾節點
*/
for (int hops = 0; ; hops++) {
// 獲得p節點的下一個節點。
Node<E> next = succ(p);
// next 節點不為空,說明 p 不是尾節點,需要更新 p 後在將它指向 next 節點
if (next != null) {
/*
如果已經至少越過了兩個節點,且 tail 被修改
(tail 被修改,說明其他執行緒向佇列添加了新的節點,且更新 tail 成功 )
*/
if (hops > HOPS && t != tail)
continue retry; // 跳出內層迴圈,重新開始迭代(因為 tail 剛剛被其他執行緒更新了)。
p = next;
}
// 如果 p 是尾節點,則設定 p 節點的 next 節點為入隊節點。
else if (p.casNext(null, n)) {
/*
如果已經至少越過了一個節點(此時,tail 至少滯後尾節點兩個節點)才去更新尾節點。
更新失敗了也沒關係,因為失敗了表示有其他執行緒成功更新了 tail 節點
*/
if (hops >= HOPS)
casTail(t, n); // 更新 tail 節點,允許失敗
return true;
}
// CAS 更新失敗,p 有 next 節點, 則重新設定 p 節點
else {
p = succ(p);
}
}
}
}
// succ 的實現
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
/*
如果 p 節點的 next 域連結到自身(p 節點是哨兵節點, 也就是被出隊的節點)
就跳轉到 head,從 head 開始繼續遍歷,否則向後推進到下一個節點
從上面的 updateHead 方法 可以看出
*/
return (p == next) ? head : next;
}
出隊操作的 poll
方法:
// 與 offer 方法類似,head 也不會在每次出隊時就更新,這樣就可以減少對 head 的寫入操作從而提高效能
public E poll() {
Node<E> h = head;
// p表示頭節點,需要出隊的節點
Node<E> p = h;
for (int hops = 0;; hops++) {
// 獲取p節點的元素
E item = p.item;
/*
如果p節點的元素不為空,使用CAS設定p節點引用的元素為null,
如果成功則返回p節點的元素。
*/
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) { // 通過 hops 變數來減少對 head 的寫入次數
// 將p節點下一個節點設定成head節點
Node<E> q = p.next;
updateHead(h, (q != null) ? q:p);
}
return item;
}
/*
如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外
一個執行緒修改了。那麼獲取p節點的下一個節點
*/
Node<E> next = succ(p);
// 如果p的下一個節點也為空,說明這個佇列已經空了
if (next == null) {
// 更新頭節點。
updateHead(h, p);
break;
}
// 如果下一個元素不為空,則將頭節點的下一個節點設定成頭節點
p = next;
}
return null;
}
由於篇幅原因,JDK 1.8 的原始碼在此略過。。。。
阻塞佇列
阻塞佇列的類結構圖如下所示:
阻塞佇列的阻塞體現在當佇列已滿或者佇列為空時,對於佇列的入隊和出隊操作都將導致當前的訪問執行緒阻塞,直到滿足對應操作的基本條件
方法的規範如下:
常見的阻塞佇列的實現類:
-
ArrayBlockingQueue
由陣列組成的有界阻塞佇列,預設情況下不保證執行緒公平地訪問佇列
-
LinkedBlockingQueue
由連結串列組成的有界阻塞佇列
-
PriorityBlockingQueue
帶有優先順序的有界阻塞佇列
-
DelayQueue
使用優先佇列實現的無界的延時阻塞佇列,支援延時獲取元素。使用場景:快取系統的設計;定時任務排程
-
SynchronousQueue
不儲存元素的阻塞佇列,表示每個
put
操作都需要等待一個get
操作,預設情況下不使用公平地訪問佇列使用場景:適合傳遞性的使用場景
-
LinkedTransferQueue
由連結串列組成的無界阻塞佇列
-
LinkedBlockingDeque
由連結串列結構組成的雙向阻塞佇列
Fork/Join 框架
Fork
將一個大的任務拆分成多個小任務,使得小任務能夠併發地執行;Join
則是將多個子任務合併起來,彙總小任務的結果得到大任務的最終結果
處理結果如下圖所示:
基本使用
-
建立任務
需要通過定一個類,繼承
java.util.concurrent.ForkJoinTask
來定義具體的任務,但是一般情況下,只需要繼承java.util.concurrent.RecursiveTask
或java.util.concurrent.RecursiveAction
即可,兩者分別處理含有返回結果和不包含處理結果的任務現在,使用
Fork/Join
框架來處理從 \(1+2+3+……+100\) 的任務首先,定義一個任務,用於分割任務和實際處理任務:
static class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = Runtime.getRuntime().availableProcessors(); // 閾值 private static volatile int unit = -1; private final int start; private final int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= unit; if (canCompute) { // 如果任務足夠小就計算任務 for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大於閾值,就分裂成多個子任務計算 CountTask[] tasks = new CountTask[THRESHOLD]; int[] res = new int[THRESHOLD]; // 第一次訪問時設定 unit if (unit < 0) unit = (int) Math.ceil((end - start) * 1.0 / THRESHOLD); for (int i = 0; i < THRESHOLD; ++i) { if (i == 0) tasks[i] = new CountTask(start, start + (i + 1) * unit); else tasks[i] = new CountTask(start + i * unit + 1, Math.min(start + (i + 1) * unit, end)); } // 啟動所有的子任務 for (CountTask task : tasks) task.fork(); // 等待子任務執行完,並得到其結果 for (int i = 0; i < res.length; ++i) { res[i] = tasks[i].join(); } // 合併子任務 for (int val : res) sum += val; } return sum; } }
-
執行任務
首先,任務執行要之
ForkJoinPool
中才能執行,因此首先需要建立一個ForkJoinPool
:ForkJoinPool pool = new ForkJoinPool();
隨後,定義任務。並且獲取結果:
CountTask task = new CountTask(1, 1000); Future<Integer> res = pool.submit(task); if (task.isCompletedAbnormally()) { task.getException().printStackTrace(); } System.out.println(res.get());
工作竊取演算法
某個已經執行完自身小任務的執行緒從其它執行緒那裡獲取未執行的任務來執行,這樣可以儘可能地得到多執行緒帶來的效能提升
具體的處理情況如下:
參考: