同步工具類(鎖、閉鎖、柵欄、訊號量)
1 AQS
state
AQS 的關鍵
CAS
Compare And Swap 修改 state
Unsafe
使用 LockSupport 的 park()、unpark() 掛起和喚醒執行緒
同步佇列
sync queue
條件佇列
condition queue
等待佇列
2 ReentrantLock
2.1 使用場景
2.2 AQS 實現
ReadWriteReentrantLock
StampLock
2.3 分散式實現
參考 分散式鎖
3 CountDownLatch
閉鎖,允許N個執行緒一直等待,直到其他執行緒執行的操作全部完成。
CountDownLatch(int count)
count 表示計數器。
3.1 使用場景
資料庫脫敏,每批次查詢 1000 條,提交給執行緒池,執行緒在脫敏完成後 countDown,然後繼續。
主執行緒 await 阻塞,等待脫敏完 1000 條記錄後返回,並批量更新資料庫。
3.2 AQS 實現
- 主執行緒構造 CountDownLatch,設定 AQS state 為 count
- 主執行緒呼叫 await(),當 state > 0 時進入 AQS 阻塞佇列
- 子執行緒呼叫 countDown() 原子遞減 state,當 state == 0 時喚醒所有呼叫await()方法阻塞的執行緒
3.3 分散式實現
setCount()
EXISTS key,如果不存在則 SET count
countDown()
DECR key,如果返回值 <= 0,則 DEL key
await()
在 while 迴圈中 GET key,如果返回值 == 0,則結束迴圈
4 CylicBarrier
柵欄,讓一組執行緒互相等待,直到所有執行緒都到達一個同步點。Cylic 表示可以迴圈利用。
CyclicBarrier(int parties, Runnable barrierAction)
parties 表示預期到達柵欄的執行緒數;barrierAction 表示所有執行緒到達柵欄後執行的回撥
4.1 使用場景
資料庫脫敏,每批次查詢 1000 條,提交給執行緒池,執行緒在脫敏完成後 await。
當 1000 個執行緒 await 時,CylicBarrier 喚醒其中一個執行緒執行批量更新。
4.2 AQS 實現
- 主執行緒構造 CylicBarrier,設定 AQS state 為 parties
- 子執行緒呼叫 await(),獲取排斥鎖,遞減 state,進入阻塞佇列,釋放鎖
- 當 state == 0 時,喚醒其中一個執行緒執行構造器中設定的回撥,並重置 state 為 parties,迴圈利用
4.3 分散式實現
5 Semaphore
訊號量,控制資源可被同時訪問的執行緒個數。
Semaphore(int permits, boolean fair)
permits 表示許可個數;fair 表示公平競爭,預設 false。
5.1 使用場景
限流。
5.2 AQS 實現
- 主執行緒構造 Semaphore,設定 AQS state 為 permits
- 主執行緒呼叫 acquire(N),用 state - N,如果小於0,則進入阻塞佇列,大於0則通過CAS設定當前訊號量為剩餘值,同時返回剩餘值
- 子執行緒呼叫 release(N),用 state + N,同時不停的嘗試因為呼叫acquire()進入阻塞的執行緒
5.3 分散式實現
setPermits()
GET key,如果返回值 == 0,則 SET key
acquire()
GET key,如果返回值 > 0,則 INCRBY permits
release()
INCRBY permits