1. 程式人生 > >Consul的分散式鎖實現

Consul的分散式鎖實現

構建分散式系統的時候,經常需要控制對共享資源的互斥訪問,就涉及到分散式鎖(也稱為全域性鎖)的實現,基於目前的各種工具,我們已經有了大量的實現方式,比如:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul 的Key/Value儲存來實現分散式鎖以及訊號量的方法。
分散式鎖實現

  • 基於Consul的分散式鎖主要利用Key/Value儲存API中的acquire和release操作來實現。acquire和release操作是類似Check-And-Set的操作:

  • acquire操作只有當鎖不存在持有者時才會返回true,並且set設定的Value值,同時執行操作的session會持有對該Key的鎖,否則就返回false
    release操作則是使用指定的session來釋放某個Key的鎖,如果指定的session無效,那麼會返回false,否則就會set設定Value值,並返回true

基本流程
在這裡插入圖片描述

程式碼實現 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.didispace</groupId>
    <artifactId>consul-distributed-lock</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <version.consul-api>1.2.1</version.consul-api>
        <version.slf4j>1.7.21</version.slf4j>
        <version.slf4j-log4j>1.7.21</version.slf4j-log4j>
        <version.log4j>1.2.17</version.log4j>
        <version.maven-compile-plugin>3.5.1</version.maven-compile-plugin>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.ecwid.consul</groupId>
            <artifactId>consul-api</artifactId>
            <version>${version.consul-api}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${version.slf4j}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${version.slf4j-log4j}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${version.log4j}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${version.maven-compile-plugin}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

程式碼實現

package com.didispace.lock.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.kv.model.PutParams;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

/**
 * 基於Consul的互斥鎖
 *
 */
@Slf4j
public class Lock extends BaseLock {

    private static final String prefix = "lock/";  // 同步鎖引數字首

    /**
     * @param consulClient
     * @param lockKey       同步鎖在consul的KV儲存中的Key路徑,會自動增加prefix字首,方便歸類查詢
     * @param checkTtl      對鎖Session的TTL
     */
    public Lock(ConsulClient consulClient, String lockKey, CheckTtl checkTtl) {
        super(consulClient, prefix + lockKey, checkTtl);
    }

    /**
     * 獲取同步鎖
     *
     * @param block            是否阻塞,直到獲取到鎖為止,預設嘗試間隔時間為500ms。
     * @return
     */
    public Boolean lock(boolean block) throws InterruptedException {
        return lock(block, 500L, null);
    }


    /**
     * 獲取同步鎖
     *
     * @param block            是否阻塞,直到獲取到鎖為止
     * @param timeInterval     block=true時有效,再次嘗試的間隔時間
     * @param maxTimes         block=true時有效,最大嘗試次數
     * @return
     */
    public Boolean lock(boolean block, Long timeInterval, Integer maxTimes) throws InterruptedException {
        if (sessionId != null) {
            throw new RuntimeException(sessionId + " - Already locked!");
        }
        sessionId = createSession("lock-" + this.keyPath);
        int count = 1;
        while(true) {
            PutParams putParams = new PutParams();
            putParams.setAcquireSession(sessionId);
            if(consulClient.setKVValue(keyPath, "lock:" + LocalDateTime.now(), putParams).getValue()) {
                return true;
            } else if(block) {
                if(maxTimes != null && count >= maxTimes) {
                    return false;
                } else {
                    count ++;
                    if(timeInterval != null)
                        Thread.sleep(timeInterval);
                    continue;
                }
            } else {
                return false;
            }
        }
    }

    /**
     * 釋放同步鎖
     *
     * @return
     */
    public Boolean unlock() {
        if(checkTtl != null) {
            checkTtl.stop();
        }

        PutParams putParams = new PutParams();
        putParams.setReleaseSession(sessionId);
        boolean result = consulClient.setKVValue(keyPath, "unlock:" + LocalDateTime.now(), putParams).getValue();

        destroySession();
        return result;
    }


}

測式程式碼

import com.didispace.lock.consul.CheckTtl;
import com.didispace.lock.consul.Lock;
import com.ecwid.consul.v1.ConsulClient;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.Random;

/**
 * 測試
 */

public class TestLock {

    @Test
    public void testLock() throws Exception  {
        ConsulClient consulClient = new ConsulClient();
        CheckTtl checkTtl = new CheckTtl("lock-1", consulClient);
        new Thread(new LockRunner(1, new CheckTtl("lock-1", consulClient))).start();
        new Thread(new LockRunner(2, new CheckTtl("lock-2", consulClient))).start();
        new Thread(new LockRunner(3, new CheckTtl("lock-3", consulClient))).start();
        new Thread(new LockRunner(4, new CheckTtl("lock-4", consulClient))).start();
        new Thread(new LockRunner(5, new CheckTtl("lock-5", consulClient))).start();
        Thread.sleep(30000L);
    }


}

@Slf4j
@AllArgsConstructor
class LockRunner implements Runnable {

    private int flag;
    private CheckTtl checkTtl;

    @Override
    public void run() {
        Lock lock = new Lock(new ConsulClient(), "lock-key", checkTtl);
        try {
            // 獲取分散式互斥鎖(引數含義:阻塞模式、每次嘗試獲取鎖的間隔500ms、嘗試n次)
            if (lock.lock(true, 500L, null)) {
                log.info("Thread {} start!", flag);
                // 處理業務邏輯
                Thread.sleep(new Random().nextInt(5000));
                log.info("Thread {} end!", flag);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}

原始碼下載