1. 程式人生 > >【分散式鎖】02-使用Redisson實現公平鎖原理

【分散式鎖】02-使用Redisson實現公平鎖原理

前言

前面分析了Redisson可重入鎖的原理,主要是通過lua指令碼加鎖及設定過期時間來保證鎖執行的原子性,然後每個執行緒獲取鎖會將獲取鎖的次數+1,釋放鎖會將當前鎖次數-1,如果為0則表示釋放鎖成功。

可重入原理和JDK中的可重入鎖都是一致的。

Redisson公平鎖原理

JDK中也有公平鎖和非公平鎖,所謂公平鎖,就是保證客戶端獲取鎖的順序,跟他們請求獲取鎖的順序,是一樣的。公平鎖需要排隊,誰先申請獲取這把鎖,誰就可以先獲取到這把鎖,是按照請求的先後順序來的。

Redisson實現公平鎖原始碼分析

非公平鎖使用也很簡單:

1RLock lock = redisson.getFairLock("anyLock");
2lock.lock();
3lock.unlock();

核心lua指令碼程式碼:

 1<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
2    internalLockLeaseTime = unit.toMillis(leaseTime);
3
4    long currentTime = System.currentTimeMillis();    
5    if (command == RedisCommands.EVAL_LONG) {
6        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
7                // remove stale threads
8                "while true do "
9                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
10                + "if firstThreadId2 == false then "
11                    + "break;"
12                + "end; "
13                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
14                + "if timeout <= tonumber(ARGV[4]) then "
15                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
16                    + "redis.call('lpop', KEYS[2]); "
17                + "else "
18                    + "break;"
19                + "end; "
20              + "end;"
21
22                  + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
23                        + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
24                        "redis.call('lpop', KEYS[2]); " +
25                        "redis.call('zrem', KEYS[3], ARGV[2]); " +
26                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
27                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
28                        "return nil; " +
29                    "end; " +
30                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
31                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
32                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
33                        "return nil; " +
34                    "end; " +
35
36                    "local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
37                    "local ttl; " + 
38                    "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + 
39                        "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
40                    "else "
41                      + "ttl = redis.call('pttl', KEYS[1]);" + 
42                    "end; " + 
43
44                    "local timeout = ttl + tonumber(ARGV[3]);" + 
45                    "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
46                        "redis.call('rpush', KEYS[2], ARGV[2]);" +
47                    "end; " +
48                    "return ttl;", 
49                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), 
50                                internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
51    }
52
53    throw new IllegalArgumentException();
54}

KEYS/ARGV引數分析

KEYS = Arrays.asList(getName(), threadsQueueName, timeoutSetName)

 

  • KEYS1 = getName() = 鎖的名字,“anyLock”
  • KEYS[2] = threadsQueueName = redisson_lock_queue:{anyLock},基於redis的資料結構實現的一個佇列
  • KEYS[3] = timeoutSetName = redisson_lock_timeout:{anyLock},基於redis的資料結構實現的一個Set資料集合,有序集合,可以自動按照你給每個資料指定的一個分數(score)來進行排序

ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime,
currentTime

  • ARGV1 = 30000毫秒
  • ARGV[2] = UUID:threadId
  • ARGV[3] = 當前時間(10:00:00) + 5000毫秒 = 10:00:05
  • ARGV[4] = 當前時間(10:00:00)

模擬不同執行緒獲取鎖步驟

  1. 客戶端A thread01 10:00:00 獲取鎖(第一次加鎖)
  2. 客戶端B thread02 10:00:10 獲取鎖
  3. 客戶端C therad03 10:00:15 獲取鎖

lua指令碼原始碼分析

客戶端A thread01 加鎖分析

thread01 在10:00:00 執行加鎖邏輯,下面開始一點點分析lua指令碼執行程式碼:

1"while true do "
2+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
3+ "if firstThreadId2 == false then "
4    + "break;"

lindex redisson_lock_queue:{anyLock} 0,就是從redisson_lock_queue:{anyLock}這個佇列中彈出來第一個元素,剛開始,佇列是空的,所以什麼都獲取不到,此時就會直接退出while true死迴圈

1"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
2+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
3"redis.call('lpop', KEYS[2]); " +
4"redis.call('zrem', KEYS[3], ARGV[2]); " +
5"redis.call('hset', KEYS[1], ARGV[2], 1); " +
6"redis.call('pexpire', KEYS[1], ARGV[1]); " +
7"return nil; " +
8"end; " +

這段程式碼判斷邏輯的意思是:

  1. exists anyLock,鎖不存在,也就是沒人加鎖,剛開始確實是沒人加鎖的,這個條件肯定是成立的;
  2. 或者是exists redisson_lock_queue:{anyLock},這個佇列不存在
  3. 或者是lindex
    redisson_lock_queue:{anyLock} 0,佇列的第一個元素是UUID:threadId,或者是這個佇列存在,但是排在隊頭的第一個元素,是當前這個執行緒

那麼這個條件整體就可以成立了
anyLock和佇列,都是不存在的,所以這個條件肯定會成立。接著執行if中的具體邏輯:

  • lpop redisson_lock_queue:{anyLock},彈出佇列的第一個元素,現在佇列是空的,所以什麼都不會幹
  • zrem redisson_lock_timeout:{anyLock} UUID:threadId,從set集合中刪除threadId對應的元素,此時因為這個set集合是空的,所以什麼都不會幹
  • hset anyLock UUID:threadId_01 1,加鎖成功:
    anyLock: {
    "UUID_01:threadId_01": 1
    }
  • pexpire anyLock 30000,將這個鎖key的生存時間設定為30000毫秒

返回一個nil,在外層程式碼中,就會認為是加鎖成功,此時就會開啟一個watchdog看門狗定時排程的程式,每隔10秒判斷一下,當前這個執行緒是否還對這個鎖key持有著鎖,如果是,則重新整理鎖key的生存時間為30000毫秒 (看門狗的具體流程上一篇文章有講述)

客戶端B thread02 加鎖分析

此時thread01 已經獲取到了鎖,如果thread02 在10:00:10分來執行加鎖邏輯,具體的程式碼邏輯是怎樣執行的呢?

1"while true do "
2+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
3+ "if firstThreadId2 == false then "
4    + "break;"

進入while true死迴圈,lindex redisson_lock_queue:{anyLock} 0,獲取佇列的第一個元素,此時佇列還是空的,所以獲取到的是false,直接退出while true死迴圈

1"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
2+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
3"redis.call('lpop', KEYS[2]); " +
4"redis.call('zrem', KEYS[3], ARGV[2]); " +
5"redis.call('hset', KEYS[1], ARGV[2], 1); " +
6"redis.call('pexpire', KEYS[1], ARGV[1]); " +
7"return nil; " +
8"end; " +

此時anyLock這個鎖key已經存在了,說明已經有人加鎖了,這個條件首先就肯定不成立了;

接著往下執行,看下另外的邏輯:

1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
2    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
3    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
4    "return nil; " +
5"end; " +

判斷一下,此時這個第二個客戶端是UUID_02,threadId_02,此時會判斷一下,hexists anyLock
UUID_02:threadId_02,判斷一下在anyLock這個map中,是否存在UUID_02:threadId_02這個key?這個條件也不成立

繼續執行後續程式碼:

 1"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
2"local ttl; " + 
3"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + 
4    "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
5"else "
6  + "ttl = redis.call('pttl', KEYS[1]);" + 
7"end; " + 
8
9"local timeout = ttl + tonumber(ARGV[3]);" + 
10"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
11    "redis.call('rpush', KEYS[2], ARGV[2]);" +
12"end; " +
13"return ttl;", 

tonumber() 是lua中自帶的函式,tonumber會嘗試將它的引數轉換為數字。

lindex redisson_lock_queue:{anyLock} 0,從佇列中獲取第一個元素,此時佇列是空的,所以什麼都不會有

因為我們是在10:00:10 分請求的,因為anyLock預設過期時間是30s,所以在thread02請求的時候ttl還剩下20s

ttl = pttl anyLock = 20000毫秒,獲取anyLock剩餘的生存時間,ttl假設這裡就被設定為了20000毫秒

timeout = ttl + 當前時間 + 5000毫秒 = 20000毫秒 + 10:00:00 + 5000毫秒 = 10:00:25

接著執行:
zadd redisson_lock_timeout:{anyLock} 10:00:25 UUID_02:threadId_02

在set集合中插入一個元素,元素的值是UUID_02:threadId_02,他對應的分數是10:00:25(會用這個時間的long型的一個時間戳來表示這個時間,時間越靠後,時間戳就越大),sorted set,有序set集合,他會自動根據你插入的元素的分數從小到大來進行排序

繼續執行:
rpush redisson_lock_queue:{anyLock} UUID_02:theadId_02

這個指令就是將UUID_02:threadId_02,插入到佇列的頭部去

返回的是ttl,也就是anyLock剩餘的生存時間,如果拿到的返回值是ttl是一個數字的話,那麼此時客戶端B而言就會進入一個while true的死迴圈,每隔一段時間都嘗試去進行加鎖,重新執行這段lua指令碼

簡單畫圖總結如下:

image.png

客戶端C thread03 加鎖分析

此時thread03 在10:00:15來加鎖,分析一下執行原理:

 1"while true do "
2+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
3+ "if firstThreadId2 == false then "
4    + "break;"
5+ "end; "
6+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
7+ "if timeout <= tonumber(ARGV[4]) then "
8    + "redis.call('zrem', KEYS[3], firstThreadId2); "
9    + "redis.call('lpop', KEYS[2]); "
10+ "else "
11    + "break;"
12+ "end; "
13+ "end;"

while true死迴圈,lindex redisson_lock_queue:{anyLock} 0,獲取佇列中的第一個元素,UUID_02:threadId_02,代表的是這個客戶端02正在佇列裡排隊

zscore redisson_lock_timeout:{anyLock} UUID_02:threadId_02,從有序集合中獲取UUID_02:threadId_02對應的分數,timeout = 10:00:25

判斷:timeout <= 10:00:15?,這個條件不成立,退出死迴圈

 1"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
2"local ttl; " + 
3"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + 
4    "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
5"else "
6  + "ttl = redis.call('pttl', KEYS[1]);" + 
7"end; " + 
8
9"local timeout = ttl + tonumber(ARGV[3]);" + 
10"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
11    "redis.call('rpush', KEYS[2], ARGV[2]);" +
12"end; " +
13"return ttl;", 

firstThreadId獲取到的是佇列中的第一個元素:UUID_02:thread_02

ttl = 10:00:25 - 10:00:15 = 5000毫秒
timeout = 5000毫秒 + 10:00:15 + 5000毫秒 = 10:00:30

將客戶端C放入到對列和有序集合中:
zadd redisson_lock_timeout:{anyLock} 10:00:30 UUID_03:threadId_03
rpush redisson_lock_queue:{anyLock} UUID_03:theadId_03

最終執行完後 如下圖:

image.png

Redisson依次加鎖邏輯

上面已經知道了,多個執行緒加鎖過程中實際會進行排隊,根據加鎖的時間來作為獲取鎖的優先順序,如果此時客戶端A釋放了鎖,來看下客戶端B、C是如果獲取鎖的

當客戶端A釋放鎖
客戶端B請求獲取鎖

直接看核心邏輯:

1+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
2+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
3"redis.call('lpop', KEYS[2]); " +
4"redis.call('zrem', KEYS[3], ARGV[2]); " +
5"redis.call('hset', KEYS[1], ARGV[2], 1); " +
6"redis.call('pexpire', KEYS[1], ARGV[1]); " +
7"return nil; " +
8"end; " +

if中的判斷:
exists anyLock 是否不存在,此時客戶端A已經釋放鎖,所以這個條件成立。

然後判斷佇列不存在,或者佇列中第一個元素為空,此時條件不成立,但是後面是or關聯的判斷,接著判斷佇列中的第一個元素是否為當前請求的UUID_02:threadId_02, 如果判斷成功則開始加鎖。

這裡就是公平鎖依次加鎖的核心邏輯。

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

&n