1. 程式人生 > 實用技巧 >redis分散式鎖實現

redis分散式鎖實現

說實話,如果在公司裡落地生產環境用分散式鎖的時候,一定是會用開源類庫的,比如Redis分散式鎖,一般就是用Redisson框架就好了,非常的簡便易用。

大家如果有興趣,可以去看看Redisson的官網,看看如何在專案中引入Redisson的依賴,然後基於Redis實現分散式鎖的加鎖與釋放鎖。

下面給大家看一段簡單的使用程式碼片段,先直觀的感受一下:

怎麼樣,上面那段程式碼,是不是感覺簡單的不行!

此外,人家還支援redis單例項、redis哨兵、redis cluster、redis master-slave等各種部署架構,都可以給你完美實現。

二、Redisson實現Redis分散式鎖的底層原理

好的,接下來就通過一張手繪圖,給大家說說Redisson這個開源框架對Redis分散式鎖的實現原理。

(1)加鎖機制

咱們來看上面那張圖,現在某個客戶端要加鎖。如果該客戶端面對的是一個redis cluster叢集,他首先會根據hash節點選擇一臺機器。

這裡注意,僅僅只是選擇一臺機器!這點很關鍵!

緊接著,就會發送一段lua指令碼到redis上,那段lua指令碼如下所示:

為啥要用lua指令碼呢?

因為一大坨複雜的業務邏輯,可以通過封裝在lua指令碼中傳送給redis,保證這段複雜業務邏輯執行的原子性。

那麼,這段lua指令碼是什麼意思呢?

KEYS[1]代表的是你加鎖的那個key,比如說:

RLock lock = redisson.getLock("myLock");

這裡你自己設定了加鎖的那個鎖key就是“myLock”。

ARGV[1]代表的就是鎖key的預設生存時間,預設30秒。

ARGV[2]代表的是加鎖的客戶端的ID,類似於下面這樣:

8743c9c0-0795-4907-87fd-6c719a6b4586:1

給大家解釋一下,第一段if判斷語句,就是用“exists myLock”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖。

如何加鎖呢?很簡單,用下面的命令:

hset myLock

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通過這個命令設定一個hash資料結構,這行命令執行後,會出現一個類似下面的資料結構:

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”這個客戶端對“myLock”這個鎖key完成了加鎖。

接著會執行“pexpire myLock 30000”命令,設定myLock這個鎖key的生存時間是30秒。

好了,到此為止,ok,加鎖完成了。

(2)鎖互斥機制

那麼在這個時候,如果客戶端2來嘗試加鎖,執行了同樣的一段lua指令碼,會咋樣呢?

很簡單,第一個if判斷會執行“exists myLock”,發現myLock這個鎖key已經存在了。

接著第二個if判斷,判斷一下,myLock鎖key的hash資料結構中,是否包含客戶端2的ID,但是明顯不是的,因為那裡包含的是客戶端1的ID。

所以,客戶端2會獲取到pttl myLock返回的一個數字,這個數字代表了myLock這個鎖key的剩餘生存時間。比如還剩15000毫秒的生存時間。

此時客戶端2會進入一個while迴圈,不停的嘗試加鎖。

(3)watch dog自動延期機制

客戶端1加鎖的鎖key預設生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎麼辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個後臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。

(4)可重入加鎖機制

那如果客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎麼樣呢?

比如下面這種程式碼:

這時我們來分析一下上面那段lua指令碼。

第一個if判斷肯定不成立,“exists myLock”會顯示鎖key已經存在了。

第二個if判斷會成立,因為myLock的hash資料結構中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此時就會執行可重入加鎖的邏輯,他會用:

incrby myLock

8743c9c0-0795-4907-87fd-6c71a6b4586:1 1

通過這個命令,對客戶端1的加鎖次數,累加1。

此時myLock資料結構變為下面這樣:

大家看到了吧,那個myLock的hash資料結構中的那個客戶端ID,就對應著加鎖的次數

(5)釋放鎖機制

如果執行lock.unlock(),就可以釋放分散式鎖,此時的業務邏輯也是非常簡單的。

其實說白了,就是每次都對myLock資料結構中的那個加鎖次數減1。

如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:

“del myLock”命令,從redis裡刪除這個key。

然後呢,另外的客戶端2就可以嘗試完成加鎖了。

這就是所謂的分散式鎖的開源Redisson框架的實現機制。

一般我們在生產系統中,可以用Redisson框架提供的這個類庫來基於redis進行分散式鎖的加鎖與釋放鎖。

(6)上述Redis分散式鎖的缺點

其實上面那種方案最大的問題,就是如果你對某個redis master例項,寫入了myLock這種鎖key的value,此時會非同步複製給對應的master slave例項。

但是這個過程中一旦發生redis master宕機,主備切換,redis slave變為了redis master。

接著就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖。

此時就會導致多個客戶端對一個分散式鎖完成了加鎖。

這時系統在業務語義上一定會出現問題,導致各種髒資料的產生。

所以這個就是redis cluster,或者是redis master-slave架構的主從非同步複製導致的redis分散式鎖的最大缺陷:在redis master例項宕機的時候,可能導致多個客戶端同時完成加鎖。

文章轉載自

https://mp.weixin.qq.com/s?__biz=MzU0OTk3ODQ3Ng==&mid=2247483893&idx=1&sn=32e7051116ab60e41f72e6c6e29876d9&chksm=fba6e9f6ccd160e0c9fa2ce4ea1051891482a95b1483a63d89d71b15b33afcdc1f2bec17c03c&mpshare=1&scene=1&srcid=0416Kx8ryElbpy4xfrPkSSdB&key=1eff032c36dd9b3716bab5844171cca99a4ea696da85eed0e4b2b7ea5c39a665110b82b4c975d2fd65c396e91f4c7b3e8590c2573c6b8925de0df7daa886be53d793e7f06b2c146270f7c0a5963dd26a&ascene=1&uin=MTg2ODMyMTYxNQ%3D%3D&devicetype=Windows+10&version=62060739&lang=zh_CN&pass_ticket=y1D2AijXbuJ8HCPhyIi0qPdkT0TXqKFYo%2FmW07fgvW%2FXxWFJiJjhjTsnInShv0ap

手動實現redis分散式鎖的正確姿勢

package com.shuangyueliao.shuangcloud.redislock;

import redis.clients.jedis.Jedis;

import java.util.Collections;

public class RedisTool {

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

/**
 * 嘗試獲取分散式鎖
 *
 * @param jedis      Redis客戶端
 * @param lockKey    鎖
 * @param requestId  請求標識
 * @param expireTime 超期時間
 * @return 是否獲取成功
 */
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}

private static final Long RELEASE_SUCCESS = 1L;

/**
 * 釋放分散式鎖
 *
 * @param jedis     Redis客戶端
 * @param lockKey   鎖
 * @param requestId 請求標識
 * @return 是否釋放成功
 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

}
分析了一下網上的不少手寫實現redis分式鎖,都有不少相同的錯誤,下面分析一下錯誤的姿勢

先來看一下redis加鎖

/**
*
* @param acquireTimeout 獲取鎖的超時時間
* @param timeOut 鎖的過期時間
* @return
*/
public String getRedisLock(Long acquireTimeout, Long timeOut) {
Jedis conn = null;
try {
conn = jedisPool.getResource();
String identifierValue = UUID.randomUUID().toString();
int expireLock = (int) (timeOut / 1000);
Long endTime = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < endTime) {
if (conn.setnx(redislockKey, identifierValue) == 1) {
conn.expire(redislockKey, expireLock);
return identifierValue;
}
}
} catch (Exception e) {
e.printStackTrace();
if (conn != null) {
conn.close();
}
}
return null;
}
注意問題:

1、要為鎖的value設定一個唯一的值,這樣就避免了任意執行緒都能釋放鎖,因為如果業務時間小於鎖的過期時間,鎖被釋放而業務沒有執行完,另一執行緒獲得鎖,但會因第一個執行緒最後的釋放鎖而受到影響

2、conn.setnx和con.expire應該用lua指令碼,保證其原子操作,上述程式碼就明顯錯誤了,如果conn.setnx執行完後,redis伺服器宕機了那麼會導致鎖永遠無法釋放

接著看一下釋放鎖

/**
*
* @param identifierValue 鎖的value
*/
public void unRedisLock(String identifierValue) {
Jedis conn = null;
conn = jedisPool.getResource();
try {
if (conn.get(redislockKey).equals(identifierValue)) {
System.out.println("釋放鎖" + Thread.currentThread().getName() + ",identifierValue: " + identifierValue);
conn.del(redislockKey);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
}
此處同樣要用lua指令碼來保證conn.get和conn.del的原子性操作,如果執行到conn.get後剛好鎖過期了,而另一執行緒獲得鎖,但conn.del會把鎖刪掉,雖然判斷了鎖的value後再刪除仍會出現一個執行緒刪除了另一執行緒獲得的鎖