Java進階專題(二十五) 分散式鎖實現業務冪等
阿新 • • 發佈:2021-02-08
## 前言
現如今很多系統都會基於分散式或微服務思想完成對系統的架構設計。那麼在這一個系統中,就會存在若干個微服務,而且服務間也會產生相互通訊呼叫。那麼既然產生了服務呼叫,就必然會存在服務呼叫延遲或失敗的問題。當出現這種問題,服務端會進行重試等操作或客戶端有可能會進行多次點選提交。如果這樣請求多次的話,那最終處理的資料結果就一定要保證統一,如支付場景。此時就需要通過保證業務冪等性方案來完成。
## 什麼是冪等
冪等本身是一個數學概念。即 f(n) = 1^n ,無論n為多少,f(n)的值永遠為1。在程式設計開發中,對於冪等的定義為:無論對某一個資源操作了多少次,其影響都應是相同的。 換句話說就是:在介面重複呼叫的情況下,對系統產生的影響是一樣的,但是返回值允許不同,如查詢。
冪等性不僅僅只是一次或多次操作對資源沒有產生影響,還包括第一次操作產生影響後,以後多次操作不會再產生影響。並且冪等關注的是是否對資源產生影響,而不關注結果。
以SQL為例:
```sql
select * from table where id=1 。此SQL無論執行多少次,雖然結果有可能出現不同,都不會對資料產生
改變,具備冪等性。
insert into table(id,name) values(1,'heima') 。此SQL如果id或name有唯一性約束,多次操作只允許插
入一條記錄,則具備冪等性。如果不是,則不具備冪等性,多次操作會產生多條資料。
update table set score=100 where id = 1 。此SQL無論執行多少次,對資料產生的影響都是相同的。具備
冪等性。
update table set score=50+score where id = 1 。此SQL涉及到了計算,每次操作對資料都會產生影響。
不具備冪等性。
delete from table where id = 1 。此SQL多次操作,產生的結果相同,具備冪等性。
```
冪等性設計主要從兩個維度進行考慮:空間、時間。
空間:定義了冪等的範圍,如生成訂單的話,不允許出現重複下單。
時間:定義冪等的有效期。有些業務需要永久性保證冪等,如下單、支付等。而部分業務只要保證一段時間
冪等即可。
同時對於冪等的使用一般都會伴隨著出現鎖的概念,用於解決併發安全問題。
##介面冪等
對於冪等的考慮,主要解決兩點前後端互動與服務間互動。這兩點有時都要考慮冪等性的實現。從前端的思路解決
的話,主要有三種:**前端防重、PRG模式、Token機制**。
**2.1)前端防重**
通過前端防重保證冪等是最簡單的實現方式,前端相關屬性和JS程式碼即可完成設定。可靠性並不好,有經驗的人員可以通過工具跳過頁面仍能重複提交。主要適用於表單重複提交或按鈕重複點選。
**2.2)PRG模式**
PRG模式即POST-REDIRECT-GET。當用戶進行表單提交時,會重定向到另外一個提交成功頁面,而不是停留在原先的表單頁面。這樣就避免了使用者重新整理導致重複提交。同時防止了通過瀏覽器按鈕前進/後退導致表單重複提交。
是一種比較常見的前端防重策略。
**2.3)token機制**
2.3.1)方案介紹
通過token機制來保證冪等是一種非常常見的解決方案,同時也適合絕大部分場景。該方案需要前後端進行一定程度的互動來完成。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171214304-766976238.png)
1)服務端提供獲取token介面,供客戶端進行使用。服務端生成token後,如果當前為分散式架構,將token存放於redis中,如果是單體架構,可以儲存在jvm快取中。
2)當客戶端獲取到token後,會攜帶著token發起請求。
3)服務端接收到客戶端請求後,首先會判斷該token在redis中是否存在。如果存在,則完成進行業務處理,業務處理完成後,再刪除token。如果不存在,代表當前請求是重複請求,直接向客戶端返回對應標識。
但是現在有一個問題,當前是先執行業務再刪除token。在高併發下,很有可能出現第一次訪問時token存在,完成具體業務操作。但在還沒有刪除token時,客戶端又攜帶token發起請求,此時,因為token還存在,第二次請求也會驗證通過,執行具體業務操作。
對於這個問題的解決方案的思想就是並行變序列。會造成一定效能損耗與吞吐量降低。
第一種方案:對於業務程式碼執行和刪除token整體加執行緒鎖。當後續執行緒再來訪問時,則阻塞排隊。
第二種方案:藉助redis單執行緒和incr是原子性的特點。當第一次獲取token時,以token作為key,對其進行自增。
然後將token進行返回,當客戶端攜帶token訪問執行業務程式碼時,對於判斷token是否存在不用刪除,而是對其繼續incr。如果incr後的返回值為2。則是一個合法請求允許執行,如果是其他值,則代表是非法請求,直接返回。
那如果先刪除token再執行業務呢?其實也會存在問題,假設具體業務程式碼執行超時或失敗,沒有向客戶端返回明確結果,那客戶端就很有可能會進行重試,但此時之前的token已經被刪除了,則會被認為是重複請求,不再進行業務處理。
這種方案無需進行額外處理,一個token只能代表一次請求。一旦業務執行出現異常,則讓客戶端重新獲取令牌,重新發起一次訪問即可。推薦使用先刪除token方案
但是無論先刪token還是後刪token,都會有一個相同的問題。每次業務請求都回產生一個額外的請求去獲取token。但是,業務失敗或超時,在生產環境下,一萬個裡最多也就十個左右會失敗,那為了這十來個請求,讓其他九千九百多個請求都產生額外請求,就有一些得不償失了。雖然redis效能好,但是這也是一種資源的浪費。
## 服務冪等
###防重表
對於防止資料重複提交,還有一種解決方案就是通過防重表實現。防重表的實現思路也非常簡單。首先建立一張表
作為防重表,同時在該表中建立一個或多個欄位的唯一索引作為防重欄位,用於保證併發情況下,資料只有一條。
在向業務表中插入資料之前先向防重表插入,如果插入失敗則表示是重複資料。
對於防重表的解決方案,可能有人會說為什麼不使用悲觀鎖。悲觀鎖在使用的過程中也是會發生死鎖的。悲觀鎖是
通過鎖表的方式實現的。 假設現在一個使用者A訪問表A(鎖住了表A),然後試圖訪問表B; 另一個使用者B訪問表
B(鎖住了表B),然後試圖訪問表A。 這時對於使用者A來說,由於表B已經被使用者B鎖住了,所以使用者A必須等到用
戶B釋放表B才能訪問。 同時對於使用者B來說,由於表A已經被使用者A鎖住了,所以使用者B必須等到使用者A釋放表A才
能訪問。此時死鎖就已經產生了。
### Mysql樂觀鎖保證冪等
MySQL樂觀鎖是基於資料庫完成分散式鎖的一種實現,實現的方式有兩種:基於版本號、基於條件。但是實現思
想都是基於MySQL的行鎖思想來實現的。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171222950-1755678734.png)
通過版本號控制是一種非常常見的方式,適合於大多數場景。但現在庫存扣減的場景來說,通過版本號控制就是多
人併發訪問購買時,查詢時顯示可以購買,但最終只有一個人能成功,這也是不可以的。其實最終只要商品庫存不
發生超賣就可以。那此時就可以通過條件來進行控制。
mysql樂觀鎖更適用於一些需要計數的表上,而且在競爭不激烈,出現併發衝突機率較小時,推薦使用樂觀鎖。雖
然通過MySQL樂觀鎖可以完成併發控制,但鎖的操作是直接作用於資料庫上,這樣就會在一定程度上對資料庫效能產生影響。並且mysql的連線數量是有限的,如果出現大量鎖操作佔用連線時,也會造成MySQL的效能瓶頸。
### zookeeper分散式鎖
#### 實現思想
對於分散式鎖的實現,zookeeper天然攜帶的一些特效能夠很完美的實現分散式鎖。其內部主要是利用znode節點
特性和watch機制完成。
**在zookeeper中節點會分為四類,分別是:**
**持久節點:**一旦建立,則永久存在於zookeeper中,除非手動刪除。
**持久有序節點:**一旦建立,則永久存在於zookeeper中,除非手動刪除。同時每個節點都會預設存在節點序號,每個節點的序號都是有序遞增的。如demo000001、demo000002.....demo00000N。
**臨時節點:**當節點建立後,一旦伺服器重啟或宕機,則被自動刪除。
**臨時有序節點:**當節點建立後,一旦伺服器重啟或宕機,則被自動刪除。同時每個節點都會預設存在節點序號,每個節點的序號都是有序遞增的。如demo000001、demo000002.....demo00000N。
**watch監聽機制**
watch監聽機制主要用於監聽節點狀態變更,用於後續事件觸發,假設當B節點監聽A節點時,一旦A節點發生修
改、刪除、子節點列表發生變更等事件,B節點則會收到A節點改變的通知,接著完成其他額外事情。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171232165-722144073.png)
**實現原理**
其實現思想是當某個執行緒要對方法加鎖時,首先會在zookeeper中建立一個與當前方法對應的父節點,接著每個要
獲取當前方法的鎖的執行緒,都會在父節點下建立一個臨時有序節點,因為節點序號是遞增的,所以後續要獲取鎖的
執行緒在zookeeper中的序號也是逐次遞增的。根據這個特性,當前序號最小的節點一定是首先要獲取鎖的執行緒,因
此可以規定序號最小的節點獲得鎖。所以,每個執行緒再要獲取鎖時,可以判斷自己的節點序號是否是最小的,如果
是則獲取到鎖。當釋放鎖時,只需將自己的臨時有序節點刪除即可。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171240533-268182020.png)
在併發下,每個執行緒都會在對應方法節點下建立屬於自己的臨時節點,且每個節點都是臨時且有序的。
那麼zookeeper又是如何有序的將鎖分配給不同執行緒呢? 這裡就應用到了watch監聽機制。每當新增一個新的臨時
節點時,其都會基於watcher機制監聽著它本身的前一個節點等待前一個節點的通知,當前一個節點刪除時,就輪
到它來持有鎖了。然後依次類推。
**優缺點**
1)zookeeper是基於cp模式,能夠保證資料強一致性。
2)基於watch機制實現鎖釋放的自動監聽,鎖操作效能較好。
3)頻繁建立節點,對於zk伺服器壓力較大,吞吐量沒有redis強。
#### 原理剖析
**低效鎖思想**
在通過zookeeper實現分散式鎖時,有另外一種實現的寫法,這種也是非常常見的,但是它的效率並不高,此處可
以先對這種實現方式進行探討。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171249716-561277975.png)
此種實現方式,只會存在一個鎖節點。當建立鎖節點時,如果鎖節點不存在,則建立成功,代表當前執行緒獲取到
鎖,如果建立鎖節點失敗,代表已經有其他執行緒獲取到鎖,則該執行緒會監聽鎖節點的釋放。當鎖節點釋放後,則繼
續嘗試建立鎖節點加鎖。
這種方案的低效點就在於,只有一個鎖節點,其他執行緒都會監聽同一個鎖節點,一旦鎖節點釋放後,其他執行緒都會
收到通知,然後競爭獲取鎖節點。這種大量的通知操作會嚴重降低zookeeper效能,對於這種由於一個被watch的
znode節點的變化,而造成大量的通知操作,叫做**羊群效應**。
**高效鎖思想**
為了避免羊群效應的出現,業界內普遍的解決方案就是,讓獲取鎖的執行緒產生排隊,後一個監聽前一個,依次排
序。推薦使用這種方式實現分散式鎖
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171259372-385856202.png)
按照上述流程會在根節點下為每一個等待獲取鎖的執行緒建立一個對應的臨時有序節點,序號最小的節點會持有鎖,
並且後一個節點只監聽其前面的一個節點,從而可以讓獲取鎖的過程有序且高效。
**程式碼實現**
```java
public abstract class AbstractLock {
//zookeeper伺服器地址
public static final String ZK_SERVER_ADDR="192.168.200.131:2181";
//zookeeper超時時間
public static final int CONNECTION_TIME_OUT=30000;
public static final int SESSION_TIME_OUT=30000;
//建立zk客戶端
protected ZkClient zkClient = new
ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);
/**
* 獲取鎖
* @return
*/
public abstract boolean tryLock();
/**
* 等待加鎖
*/
public abstract void waitLock();
/**
* 釋放鎖
*/
public abstract void releaseLock();
public void getLock() {
String threadName = Thread.currentThread().getName();
if (tryLock()) {
System.out.println(threadName+": 獲取鎖成功");
}else {
System.out.println(threadName+": 獲取鎖失敗,等待中");
//等待鎖
waitLock();
getLock();
}
}
}
```
```java
public class HighLock extends AbstractLock{
private static final String PARENT_NODE_PATH="/high_lock";
//當前節點路徑
private String currentNodePath;
//前一個節點的路徑
private String preNodePath;
private CountDownLatch countDownLatch;
@Override
public boolean tryLock() {
//判斷父節點是否存在
if (!zkClient.exists(PARENT_NODE_PATH)){
//不存在
zkClient.createPersistent(PARENT_NODE_PATH);
}
//建立第一個臨時有序子節點
if (currentNodePath == null || "".equals(currentNodePath)){
//根節點下沒有節點資訊,將當前節點作為第一個子節點,型別:臨時有序
currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock");
}
//不是第一個子節點,獲取父節點下所有子節點
List childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);
//子節點升序排序
Collections.sort(childrenNodeList);
//判斷是否加鎖成功
if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){
//當前節點是序號最小的節點
return true;
}else {
//當前節點不是序號最小的節點,獲取其前面的節點名稱,並賦值
int length = PARENT_NODE_PATH.length();
int currentNodeNumber = Collections.binarySearch(childrenNodeList,
currentNodePath.substring(length + 1));
preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber‐1);
}
return false;
}
@Override
public void waitLock() {
IZkDataListener zkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null){
countDownLatch.countDown();
}
}
};
//監聽前一個節點的改變
zkClient.subscribeDataChanges(preNodePath,zkDataListener);
if (zkClient.exists(preNodePath)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
}
zkClient.unsubscribeDataChanges(preNodePath,zkDataListener);
}
@Override
public void releaseLock() {
zkClient.delete(currentNodePath);
zkClient.close();
}
}
```
### Redis分散式鎖
#### 原理&實現
分散式鎖的一個很重要的特性就是互斥性,同一時間內多個呼叫方加鎖競爭,只能有一個呼叫方加鎖成功。而redis是基於單執行緒模型的,可以利用這個特性讓呼叫方的請求排隊,對於併發請求,只會有一個請求能獲取到鎖。
redis實現分散式鎖也很簡單,基於客戶端的幾個API就可以完成,主要涉及三個核心API:
setNx():向redis中存key-value,只有當key不存在時才會設定成功,否則返回0。用於體現互斥性。
expire():設定key的過期時間,用於避免死鎖出現。
delete():刪除key,用於釋放鎖。
1)編寫工具類實現加鎖
通過jedis.set進行加鎖,如果返回值是OK,代表加鎖成功
如果加鎖失敗,則自旋不斷嘗試獲取鎖,同時在一定時間內如果仍沒有獲取到鎖,則退出自旋,不再嘗試獲取鎖。
requestId:用於標識當前每個執行緒自己持有的鎖標記
```java
public class SingleRedisLock {
JedisPool jedisPool = new JedisPool("192.168.200.128",6379);
//鎖過期時間
protected long internalLockLeaseTime = 30000;
//獲取鎖的超時時間
private long timeout = 999999;
/**
* 加鎖
* @param lockKey 鎖鍵
* @param requestId 請求唯一標識
* @return
*/
SetParams setParams = SetParams.setParams().nx().px(internalLockLeaseTime);
public boolean tryLock(String lockKey, String requestId){
String threadName = Thread.currentThread().getName();
Jedis jedis = this.jedisPool.getResource();
Long start = System.currentTimeMillis();
try{
for (;;){
String lockResult = jedis.set(lockKey, requestId, setParams);
if ("OK".equals(lockResult)){
System.out.println(threadName+": 獲取鎖成功");
return true;
}
//否則迴圈等待,在timeout時間內仍未獲取到鎖,則獲取失敗
System.out.println(threadName+": 獲取鎖失敗,等待中");
long l = System.currentTimeMillis() ‐ start;
if (l>=timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
jedis.close();
}
}
}
```
解鎖時,要避免當前執行緒將別人的鎖釋放掉。假設執行緒A加鎖成功,當過了一段時間執行緒A來解鎖,但執行緒A的鎖已
經過期了,在這個時間節點,執行緒B也來加鎖,因為執行緒A的鎖已經過期,所以執行緒B時可以加鎖成功的。此時,就
會出現問題,執行緒A將執行緒B的鎖給釋放了。
對於這個問題,就需要使用到加鎖時的requestId。當解鎖時要判斷當前鎖鍵的value與傳入的value是否相同,相
同的話,則代表是同一個人,可以解鎖。否則不能解鎖。
但是對於這個操作,有非常多的人,會先查詢做對比,接著相同則刪除。雖然思路是對的,但是忽略了一個問題,
原子性。判斷與刪除分成兩步執行,則無法保證原子性,一樣會出現問題。所以解鎖時不僅要保證加鎖和解鎖是同
一個人還要保證解鎖的原子性。因此結合lua指令碼完成查詢&刪除操作。
```java
/**
* 解鎖
* @param lockKey 鎖鍵
* @param requestId 請求唯一標識
* @return
*/
public boolean releaseLock(String lockKey,String requestId){
String threadName = Thread.currentThread().getName();
System.out.println(threadName+":釋放鎖");
Jedis jedis = this.jedisPool.getResource();
String lua =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
Object result = jedis.eval(lua, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
if("1".equals(result.toString())){
return true;
}
return false;
}finally {
jedis.close();
}
}
```
測試類
```java
public class LoclTest {
public static void main(String[] args) {
//模擬多個5個客戶端
for (int i=0;i<5;i++) {
Thread thread = new Thread(new LockRunnable());
thread.start();
}
}
private static class LockRunnable implements Runnable {
@Override
public void run() {
SingleRedisLock singleRedisLock = new SingleRedisLock();
String requestId = UUID.randomUUID().toString();
boolean lockResult = singleRedisLock.tryLock("lock", requestId);
if (lockResult){
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
singleRedisLock.releaseLock("lock",requestId);
}
}
}
```
此時可以發現,多執行緒會競爭同一把鎖,且沒有獲取獲取到鎖的執行緒會自旋不斷嘗試去獲取鎖。每當一個執行緒將鎖
釋放後,則會有另外一個執行緒持有鎖。依次類推。
#### 存在的問題
鎖續期
當對業務進行加鎖時,鎖的過期時間,絕對不能想當然的設定一個值。假設執行緒A在執行某個業務時加鎖成功
並設定鎖過期時間。但該業務執行時間過長,業務的執行時間超過了鎖過期時間,那麼在業務還沒執行完
時,鎖就自動釋放了。接著後續執行緒就可以獲取到鎖,又來執行該業務。就會造成執行緒A還沒執行完,後續線
程又來執行,導致同一個業務邏輯被重複執行。因此對於鎖的超時時間,需要結合著業務執行時間來判斷,
讓鎖的過期時間大於業務執行時間。
上面的方案是一個基礎解決方案,但是仍然是有問題的。
業務執行時間的影響因素太多了,無法確定一個準確值,只能是一個估值。無法百分百保證業務執行期間,
鎖只能被一個執行緒佔有。
如想保證的話,可以在建立鎖的同時建立一個守護執行緒,同時定義一個定時任務每隔一段時間去為未釋放的
鎖增加過期時間。當業務執行完,釋放鎖後,再關閉守護執行緒。 這種實現思想可以用來解決鎖續期。
服務單點&叢集問題
在單點redis雖然可以完成鎖操作,可一旦redis服務節點掛掉了,則無法提供鎖操作。
在生產環境下,為了保證redis高可用,會採用非同步複製方法進行主從部署。當主節點寫入資料成功,會非同步的將
資料複製給從節點,並且當主節點宕機,從節點會被提升為主節點繼續工作。假設主節點寫入資料成功,在沒有將
資料複製給從節點時,主節點宕機。則會造成提升為主節點的從節點中是沒有鎖資訊的,其他執行緒則又可以繼續加
鎖,導致互斥失效。
### Redisson分散式鎖
redisson是redis官網推薦實現分散式鎖的一個第三方類庫。其內部完成的功能非常強大,對各種鎖都有實現,同
時對於使用者來說非常簡單,讓使用者能夠將更多的關注點放在業務邏輯上。此處重點利用Redisson解決單機
Redis鎖產生的兩個問題。
####單機Redisson實現
依賴
```xml
org.apache.commons
commons‐pool2
org.redisson
redisson‐spring‐boot‐starter
3.13.1
```
配置檔案
```yaml
server:
redis:
host: 192.168.200.150
port: 6379
database: 0
jedis:
pool:
max‐active: 500
max‐idle: 1000
min‐idle: 4
```
啟動類
```java
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient redissonClient(){
RedissonClient redissonClient;
Config config = new Config();
String url = "redis://" + host + ":" + port;
config.useSingleServer().setAddress(url);
try {
redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
```
鎖工具
```java
@Component
public class RedissonLock {
@Autowired
private RedissonClient redissonClient;
/**
* 加鎖
* @param lockKey
* @return
*/
public boolean addLock(String lockKey){
try {
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
//設定鎖超時時間為5秒,到期自動釋放
lock.lock(5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+": 獲取到鎖");
//加鎖成功
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean releaseLock(String lockKey){
try{
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
System.out.println(Thread.currentThread().getName()+": 釋放鎖");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
}
```
測試類
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedissonLockTest {
@Autowired
private RedissonLock redissonLock;
@Test
public void easyLock(){
//模擬多個10個客戶端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new LockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class LockRunnable implements Runnable {
@Override
public void run() {
redissonLock.addLock("demo");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
redissonLock.releaseLock("demo");
}
}
}
```
根據執行效果可知,多執行緒併發獲取所時,當一個執行緒獲取到鎖,其他執行緒則獲取不到,並且其內部會不斷嘗試獲
取鎖,當持有鎖的執行緒將鎖釋放後,其他執行緒則會繼續去競爭鎖。
#### 原始碼分析
**lock()原始碼分析**
當獲取到RLock物件後,呼叫其內部的lock()執行加鎖操作。根據原始碼描述,當執行緒獲取鎖時,如果沒有獲取到
鎖,則會讓其進入自旋,直到獲取到鎖。 如果獲取到鎖,則會一直保留到呼叫unLock()手動釋放或根據傳入的
leaseTime時間自動釋放。
當前傳入兩個引數值:鎖超時時間,時間單位。主要用於避免死鎖的出現,假設持有鎖的redis節點宕機,到期後
鎖可以自動釋放。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171309082-1832578123.png)
lock()方法中還會呼叫lock()的另外一個過載方法,需要傳入三個引數:過期時間、時間單位、是否中斷。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171318152-598391062.png)
在三個引數的lock()過載方法中,首先會獲取當前執行緒id,接著呼叫tryAcquire()方法嘗試獲取鎖,如果返回值為
null,代表獲取到鎖。 如果返回值不是null,則根據當前執行緒id建立非同步任務並放入執行緒池中,接著進入自旋,在
自旋過程中,嘗試呼叫tryAcquire()獲取鎖,如果獲取到則退出自旋。否則會不斷的嘗試獲取鎖。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171338740-1492500716.png)
在lock()方法中,最核心的是tryAcquire()。其內部核心實現會呼叫tryAcquireAsync(),並傳入過期時間、時間單位
和當前執行緒id,進行鎖的獲取。如果leaseTime不為-1,代表設定了有效時間,接著呼叫tryAcquireAsync()去獲取
鎖。如果是-1的話,則預設把永不過期改為30秒過期,並且建立非同步任務,如果沒有獲取到鎖,則什麼都不做。如果獲取到了鎖,則呼叫scheduleExpirationRenewal()對當前執行緒id的鎖進行延時。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171346570-165719589.png)
最終的tryLockInnerAsync()則是獲取鎖的具體實現。可以看到,其內部是基於lua指令碼語言完成鎖獲取的。因為獲
取鎖的過程涉及到了多步,為了保證執行過程的原子性,所以使用了lua,最核心的就是要理解這段lua指令碼的執行
過程。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171355976-400126795.png)
對於這款lua指令碼來說,KEYS[1]代表需要加鎖的key,ARGV[1]代表鎖的超時時間,ARGV[2]代表鎖的唯一標識。
對於這段lua指令碼,簡單來說:
1)檢查鎖key是否被佔用了,如果沒有則設定鎖key和唯一標識,初始值為1,並且設定鎖key的過期時間。
2)如果鎖key存在,並且value也匹配,表示是當前執行緒持有的鎖,那麼重入次數加1,並且設定失效時間。
3)返回鎖key的失效時間毫秒數。
**unLock()原始碼分析**
在釋放鎖時,unlock()內部會呼叫unlockAsync()對當前執行緒持有的鎖進行釋放。其內部最終會執行unlockInnerAsync()方法完成鎖釋放並返回結果。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171405303-1619089041.png)
在unlockInnerAsync()中仍然是結合lua指令碼完成釋放鎖操作。
相關引數:
KEYS[1]:當前鎖key。
KEYS[2]:redis訊息的ChannelName,每個鎖對應唯一的一個 channelName。
ARGV[1]:redis訊息體,用於標記redis的key已經解鎖,用於通知其他執行緒申請鎖。
ARGV[2]:鎖超時時間。
ARGV[3]:鎖的唯一標識。
1)判斷鎖key和鎖的唯一標識是否匹配,如果不匹配,表示鎖已經被佔用,那麼直接返回。
2)如果是當前執行緒持有鎖,則value值-1,用於重入操作。
3)如果-1後的值大於0,則對鎖設定過期時間。
4)如果-1後的值為0,則刪除鎖key,併發布訊息,該鎖已被釋放。用於通知其他執行緒申請鎖。
![](https://img2020.cnblogs.com/blog/874710/202102/874710-20210208171415057-1984559905.png)
#### 鎖續期
對於鎖續期問題,在單點redis實現分散式鎖時已經介紹過了,用於防止業務執行超時或宕機而引起的業務被重複
執行。
根據對lock方法的解析,可以發現,當設定完過期時間後,當前鎖的過期時間就已經被設定了,不會發生改變,鎖
到期後則會被自動釋放,因此在業務執行中,通過lock()方法加鎖會造成隱患。
#### 紅鎖
當在單點redis中實現redis鎖時,一旦redis伺服器宕機,則無法進行鎖操作。因此會考慮將redis配置為主從結
構,但在主從結構中,資料複製是非同步實現的。假設在主從結構中,master會非同步將資料複製到slave中,一旦某
個執行緒持有了鎖,在還沒有將資料複製到slave時,master宕機。則slave會被提升為master,但被提升為slave的
master中並沒有之前執行緒的鎖資訊,那麼其他執行緒則又可以重新加鎖
**redlock演算法**
redlock是一種基於多節點redis實現分散式鎖的演算法,可以有效解決redis單點故障的問題。官方建議搭建五臺
redis伺服器對redlock演算法進行實現。
在redis官網中,對於redlock演算法的實現思想也做了詳細的介紹。地址:https://redis.io/topics/distlock。整個實
現過程分為五步:
1)記錄獲取鎖前的當前時間
2)使用相同的key,value獲取所有redis例項中的鎖,並且設定獲取鎖的時間要遠遠小於鎖自動釋放的時間。假設
鎖自動釋放時間是10秒,則獲取時間應在5-50毫秒之間。通過這種方式避免客戶端長時間等待一個已經關閉的實
例,如果一個例項不可用了,則嘗試獲取下一個例項。
3)客戶端通過獲取所有例項的鎖後的時間減去第一步的時間,得到的差值要小於鎖自動釋放時間,避免拿到一個
已經過期的鎖。並且要有超過半數的redis例項成功獲取到鎖,才算最終獲取鎖成功。如果不是超過半數,有可能
出現多個客戶端重複獲取到鎖,導致鎖失效。
4)當已經獲取到鎖,那麼它的真正失效時間應該為:過期時間-第三步的差值。
5)如果客戶端獲取鎖失敗,則在所有redis例項中釋放掉鎖。為了保證更高效的獲取鎖,還可以設定重試策略,在
一定時間後重新嘗試獲取鎖,但不能是無休止的,要設定重試次數。
雖然通過redlock能夠更加有效的防止redis單點問題,但是仍然是存在隱患的。假設redis沒有開啟持久化,
clientA獲取鎖後,所有redis故障重啟,則會導致clientA鎖記錄消失,clientB仍然能夠獲取到鎖。這種情況雖然發
生機率極低,但並不能保證肯定不會發生。
保證的方案就是開始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒內重啟,仍然資料丟失。使
用always又會造成效能急劇下降。
官方推薦使用預設的AOF策略即每秒同步,且在redis停掉後,要在ttl時間後再重啟。 缺點就是ttl時間內redis無法
對外提供服務。
**實現**
redisson對於紅鎖的實現已經非常完善,通過其內部提供的api既可以完成紅鎖的操作。
```java
@Configuration
public class RedissonRedLockConfig {
public RedissonRedLock initRedissonClient(String lockKey){
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
Config config4 = new Config();
config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
RedissonClient redissonClient4 = Redisson.create(config4);
Config config5 = new Config();
config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
RedissonClient redissonClient5 = Redisson.create(config5);
RLock rLock1 = redissonClient1.getLock(lockKey);
RLock rLock2 = redissonClient2.getLock(lockKey);
RLock rLock3 = redissonClient3.getLock(lockKey);
RLock rLock4 = redissonClient4.getLock(lockKey);
RLock rLock5 = redissonClient5.getLock(lockKey);
RedissonRedLock redissonRedLock = new
RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
return redissonRedLock;
}
}
```
測試類
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {
@Autowired
private RedissonRedLockConfig redissonRedLockConfig;
@Test
public void easyLock(){
//模擬多個10個客戶端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new RedLockTest.RedLockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class RedLockRunnable implements Runnable {
@Override
public void run() {
RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
try {
boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
if (lockResult){
System.out.println("獲取鎖成功");
TimeUnit.SECONDS.sleep(3);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redissonRedLock.unlock();
System.out.println("釋放鎖");
}
}
}
}
```
**redissonRedLock加鎖原始碼分析**
```java
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
{
long newLeaseTime = ‐1;
if (leaseTime != ‐1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = ‐1;
if (waitTime != ‐1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允許加鎖失敗節點個數限制(N‐(N/2+1)),當前假設五個節點,則允許失敗節點數為2
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍歷所有節點執行lua加鎖,用於保證原子性
*/
List acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.對節點嘗試加鎖
*/
try {
if (waitTime == ‐1 && leaseTime == ‐1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果丟擲這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 丟擲異常表示獲取鎖失敗
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果獲取到鎖則新增到已獲取鎖集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N‐(N/2+1))
* 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從後面的節點申請了
* 因為 Redlock 演算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功
4)訊息冪等
*/
if (locks.size() ‐ acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == ‐1 && leaseTime == ‐1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit‐‐;
}
}
/**
* 6.計算從各個節點獲取鎖已經消耗的總時間,如果已經等於最大等待時間,則申請鎖失敗,返回false
*/
if (remainTime != ‐1) {
remainTime ‐= System.currentTimeMillis() ‐ time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != ‐1) {
List> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture future = ((RedissonLock)
rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果邏輯正常執行完則認為最終申請鎖成功,返回true
*/
return true;
}
```