1. 程式人生 > >分散式鎖基於Redis和Zookeeper的實現方案

分散式鎖基於Redis和Zookeeper的實現方案

一,為什麼要用分散式鎖?

如果不同的系統之間共享了一組資源,那麼訪問這組資源的時候,往往就需要通過一些互斥的手段來防止彼此間的干擾,以保證資料的一致性。

如圖1所示,在單機部署的系統中,使用執行緒鎖來解決高併發的問題,多執行緒訪問共享變數的問題達到資料一致性,例如使用synchornized,的的ReentrantLock等。

2,在多機部署的系統中,是在不同的JVM虛擬機器中執行的,就無法使用JDK提供的阿比來解決併發的問題,這個時候就需要使用分散式鎖來應對。

對於分散式的實現方案,目前有幾種比較流行的做法:

1,基於Redis的的實現分散式鎖

2,基於資料庫實現分散式鎖

3,基於動物園管理員實現分散式鎖

這裡主要講解基於Redis的的,動物園管理員的實現方案

二,基於Redis的的的實現方案

2.1,單節點的實現方案

2.1.1,獲取鎖

客戶端向Redis的的節點發送如下命令:

SET resource_name my_random_value NX PX 30000

RESOURCE_NAME:關鍵值

my_random_value:這個必須是唯一的隨機字串。

NX:表示鍵必須不存在,才可以設定成功,用於新增;這保證了只有第一個請求的客戶端才能獲得鎖,而其它客戶端在鎖被釋放之前都無法獲得鎖。

PX 30000:表示為鍵設定毫秒級的過期時間,這個鎖有一個30秒的自動過期時間。這裡可以根據業務場景選擇合適的過期時間。

這裡有兩點需要注意的是:

1,這裡my_random_value為啥必須是唯一的隨機字串,而不是一個固定的值考慮一種情況?

如圖1所示,客戶端1首先獲取到了鎖

2,客戶端1長時間的GC暫停

3,客戶端1的過期時間到了釋放了鎖

如圖4所示,此時客戶端2套成功獲取到同一資源的鎖

5,客戶端1從從阻塞種恢復過來,執行業務邏輯完成後,釋放了客戶端的鎖

上面的這種情況明顯的體現了my_random_value不能設定為一個固定值,因為這樣客戶端在訪問共享資源時,無法保證鎖可以提供正確的訪問保證

2,將獲取鎖的命令分成兩步操作

Long result = jedis.setnx("key", "value");
if (result == 1) {
   jedis.expire("key"
, 3000);
}

為什麼說這裡會有問題呢?

如圖1所示,客戶端1執行SETNX成功後崩潰了,後續也就談不上執行到期設定超時時間了,就會導致客戶端1一直持有鎖

2,setnx和expire雖然可以起到和SET resource_name my_random_value NX PX 30000相同的作用,但卻不是原子的

2.1.2:釋放鎖

執行下面的Redis Lua指令碼來釋放鎖,Lua指令碼在Redis中是原子執行的,執行過程中間不會插入其他命令

if redis.call("get",KEYS[1]) == ARGV[1] then
   return redis.call("del",KEYS[1])
else
   return 0
end

KEYS [1]:RESOURCE_NAME作為KEYS [1]的值傳入

ARGV [1]:my_random_value作為ARGV [1]的值傳入

到這裡基於單節點的redis的的分散式鎖就結束了,那麼問題來了,如果這個節點掛了,服務也就不可用了,客戶端也就無法獲取到鎖了,此時就要解決不高可用的問題,就需要部署redis叢集,叢集的部署可以參見:Redis三主三從叢集搭建

下面提供一個完整的獲取,釋放鎖的程式碼

public class RedisLock {

   private static final String ACQUIRE_SUCCESS="ok";

   private static final Long RELEASE_SUCCESS = 1L;

   private static final String KET_NOT_EXSIT="NX";

   private static final String KET_EXPIRE_TIME="PX";

   private JedisCluster jedis;


   /**獲取鎖
    *
    * @param key
    * @param randomValue
    * @param expireTime
    * @return
    */

   public boolean acquireLock(String key,String randomValue,long expireTime){

       String result = jedis.set(key, randomValue, KET_NOT_EXSIT, KET_EXPIRE_TIME, expireTime);
       if (ACQUIRE_SUCCESS.equals(result)){
           return true;
       }
       return false;

   }

   /**
    * 釋放鎖
    * @param key
    * @param randomValue
    * @return
    */

   public boolean releaseLock(String key,String randomValue){

       String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
       Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(randomValue));
       if (RELEASE_SUCCESS.equals(result)){
           return true;
       }
       return false;
   }
}

其實這裡還是有問題的,參考下面的場景:

如圖1所示,客戶端1執行acquireLock獲取到鎖。

2,客戶端1因為某種原因導致長時間的GC暫停,這個時間大於鎖的過期時間

3,此時客戶端2執行acquireLock獲取到了相同的鎖。

如圖4所示,客戶端1從長時間的GC暫停中恢復過來,此時它並不知道自己持有的鎖過期了它依然向共享資源發起了寫資料請求。

5,這時鎖實際上被客戶端2持有,因此兩個客戶端的寫請求就有可能衝突,鎖的互斥作用失效了。

redis的的作者antirez提出了新的分散式演算法Redlock,上述說的GC暫停的問題都可以在這裡解決,但是會存在時鐘跳躍的問題會導致分散式鎖不安全的問題。關於時鐘問題,Redis的分散式鎖是否安全的問題,大家可以搜尋微信公眾號:鐵勒,部落格,檢視張鐵蕾蕾哥的基於Redis的的分散式鎖到底安全嗎的博文有詳細解釋,相當好!

三,基於動物園管理員的分散式鎖實現方案

這裡主要講基於的ZooKeeper的實現排他鎖,共享鎖

3.1,排他鎖

假設我們需要給你一個商品加鎖,基本思路如下:

1,建立ZK的一個臨時節點,模擬給某個商品ID加鎖。

2,ZK會保證只會建立一個臨時節點,其他請求過來如果再要建立臨時節點,就會NodeExistsException異常。

3,如果臨時節點建立成功了,那麼說明我們成功加鎖了,此時就可以去執行對應的業務邏輯。

如圖4所示,如果臨時節點建立失敗了,說明有人已經在拿到鎖了,那麼就不斷的等待,直到自己可以獲取到鎖為止。

5,釋放一個分散式鎖,刪除掉那個臨時節點就可以了,就代表釋放了一個鎖,那麼此時其他的機器就可以成功建立臨時節點,獲取到鎖。

這種分散式鎖的做法比較簡單但是很實用,可以滿足大部分的業務場景,程式碼如下:

public class ZooKeeperSession {

   private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

   private ZooKeeper zookeeper;

   public ZooKeeperSession() {
       // 連線zookeeper server,建立會話的時候,是非同步去進行的,所以要給一個監聽器告訴我們何時才真正完成了跟zk server的連線
       try {
           this.zookeeper = new ZooKeeper(
                   "192.168.31.193:2181,192.168.31.160:2181,192.168.31.114:2181",
                   40000,
                   new ZooKeeperWatcher());
           // 給一個狀態CONNECTING,連線中
           System.out.println(zookeeper.getState());

           try {

               // 如果數字減到0,那麼之前所有在await的執行緒,都會逃出阻塞的狀 繼續向下執行
               connectedSemaphore.await();
           } catch(InterruptedException e) {
               e.printStackTrace();
           }

           System.out.println("ZooKeeper session established......");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   /**
    * 獲取分散式鎖
    * @param productId
    */

   public void acquireDistributedLock(Long productId) {
       String path = "/product-lock-" + productId;

       try {
           zookeeper.create(path, "".getBytes(),
                   Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
           System.out.println("success to acquire lock for product[id=" + productId + "]");
       } catch (Exception e) {
           // 如果商品對應的鎖的node已經存在了,那麼就會報異常,說明鎖已經被其他執行緒獲取了
           int count = 0;
           while(true) {
               try {
                   Thread.sleep(20);
                   zookeeper.create(path, "".getBytes(),
                           Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
               } catch (Exception e2) {
                   e2.printStackTrace();
                   count++;
                   continue;
               }
               System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");
               break;
           }
       }
   }

   /**
    * 釋放掉一個分散式鎖
    * @param productId
    */

   public void releaseDistributedLock(Long productId) {
       String path = "/product-lock-" + productId;
       try {
           zookeeper.delete(path, -1);
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   /**
    * 建立zk session的watcher
    *
    */

   private class ZooKeeperWatcher implements Watcher {

       public void process(WatchedEvent event) {
           System.out.println("Receive watched event: " + event.getState());
           if(KeeperState.SyncConnected == event.getState()) {
               connectedSemaphore.countDown();
           }
       }

   }

   /**
    * 封裝單例的靜態內部類
    *
    */

   private static class Singleton {

       private static ZooKeeperSession instance;

       static {
           instance = new ZooKeeperSession();
       }

       public static ZooKeeperSession getInstance() {
           return instance;
       }

   }

   /**
    * 獲取單例
    * @return
    */

   public static ZooKeeperSession getInstance() {
       return Singleton.getInstance();
   }

   /**
    * 初始化單例的便捷方法
    */

   public static void init() {
       getInstance();
   }
}

3.2,共享鎖

。動物園管理員提供的臨時順序節點節點就可以實現分散式共享鎖對於共享鎖:

1,讀請求:如果沒有比自己序號小的子節點,或是所有比自己序號小的子節點都是讀請求,那麼表明可以成功獲取到共享鎖;如果比自己序號小的子節點中有寫請求,則進入等待。

2,寫請求:如果自己不是序號最小的子節點,那麼就需要進入等待。

大致的思路如下:

如圖1所示,根據業務場景對於讀的場景的路徑字首為:product_read_,寫場景的路徑字首為product_wirte_。

2,建立一個臨時順序節點。

3,呼叫的的getChildren()介面來獲取所有已經建立的子節點列表。

如圖4所示,判斷當前建立的節點是否為第一個節點,如果是則獲取分散式鎖,返回真。

5,如果第四步不成立,則對於讀請求:所有比自己序號小的子節點都是讀請求,那麼表明可以成功獲取到共享鎖;如果比自己序號小的子節點中有寫請求,則進入等待對於寫請求: