Consul的分散式鎖實現
阿新 • • 發佈:2018-11-14
構建分散式系統的時候,經常需要控制對共享資源的互斥訪問,就涉及到分散式鎖(也稱為全域性鎖)的實現,基於目前的各種工具,我們已經有了大量的實現方式,比如:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul的Key/Value儲存來實現分散式鎖以及訊號量的方法。
分散式鎖實現
-
基於Consul的分散式鎖主要利用Key/Value儲存API中的acquire和release操作來實現。acquire和release操作是類似Check-And-Set的操作:
-
acquire操作只有當鎖不存在持有者時才會返回true,並且會set設定Value值,同時執行操作的session會持有對該Key的鎖,否則就返回false
release操作則是使用指定的session來釋放某個Key的鎖,如果指定的session無效,那麼會返回false,否則就會set設定Value值,並返回true
基本流程
程式碼實現 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Consul distributed-lock example project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.didispace</groupId>
    <artifactId>consul-distributed-lock</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Dependency/plugin versions and the Java source/target level -->
    <properties>
        <version.consul-api>1.2.1</version.consul-api>
        <version.slf4j>1.7.21</version.slf4j>
        <version.slf4j-log4j>1.7.21</version.slf4j-log4j>
        <version.log4j>1.2.17</version.log4j>
        <version.maven-compile-plugin>3.5.1</version.maven-compile-plugin>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Consul HTTP API client -->
        <dependency>
            <groupId>com.ecwid.consul</groupId>
            <artifactId>consul-api</artifactId>
            <version>${version.consul-api}</version>
        </dependency>

        <!-- Logging: SLF4J API, its log4j 1.2 binding, and log4j itself -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${version.slf4j}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${version.slf4j-log4j}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${version.log4j}</version>
        </dependency>

        <!-- Lombok annotations used by the sources (e.g. @Slf4j, @AllArgsConstructor) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <!-- JUnit, test scope only -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin bound to the source/target level declared in the properties -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${version.maven-compile-plugin}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
程式碼實現
package com.didispace.lock.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.kv.model.PutParams;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

/**
 * Mutual-exclusion lock built on Consul's Key/Value store.
 *
 * <p>The lock is taken by writing the key with an "acquire" session and given
 * back by writing it with a "release" session. Session creation/destruction and
 * the {@code consulClient}, {@code keyPath}, {@code sessionId} and
 * {@code checkTtl} state are inherited from {@link BaseLock}.
 *
 * <p>Usage contract: every call to {@code lock(...)} should be paired with a
 * call to {@link #unlock()} (typically in a {@code finally} block), including
 * when {@code lock(...)} returned {@code false} or threw, because the session
 * created for the attempt is only destroyed by {@link #unlock()}.
 */
@Slf4j
public class Lock extends BaseLock {

    /** Prefix put in front of every lock key so that locks are grouped together in the KV store. */
    private static final String KEY_PREFIX = "lock/";

    /**
     * Creates a lock for the given key.
     *
     * @param consulClient client used to talk to Consul
     * @param lockKey      key path of the lock in Consul's KV store; {@code "lock/"} is
     *                     prepended automatically to make the locks easy to find
     * @param checkTtl     TTL check associated with the lock session; may be {@code null}
     */
    public Lock(ConsulClient consulClient, String lockKey, CheckTtl checkTtl) {
        super(consulClient, KEY_PREFIX + lockKey, checkTtl);
    }

    /**
     * Tries to acquire the lock, retrying every 500 ms without an attempt limit
     * when {@code block} is {@code true}.
     *
     * @param block whether to keep retrying until the lock is acquired
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     * @throws IllegalStateException if this instance already holds a session
     */
    public Boolean lock(boolean block) throws InterruptedException {
        return lock(block, 500L, null);
    }

    /**
     * Tries to acquire the lock.
     *
     * <p>A new session is created for the attempt. If the attempt fails, that session
     * is kept until {@link #unlock()} is called, so call {@link #unlock()} before
     * trying to lock again with the same instance.
     *
     * @param block        whether to keep retrying until the lock is acquired
     * @param timeInterval only used when {@code block} is {@code true}: pause in
     *                     milliseconds between attempts; {@code null} means no pause
     * @param maxTimes     only used when {@code block} is {@code true}: maximum number
     *                     of attempts; {@code null} means unlimited
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     * @throws IllegalStateException if this instance already holds a session
     */
    public Boolean lock(boolean block, Long timeInterval, Integer maxTimes) throws InterruptedException {
        if (sessionId != null) {
            throw new IllegalStateException(sessionId + " - Already locked!");
        }
        sessionId = createSession("lock-" + this.keyPath);

        // The acquire parameters do not change between attempts.
        PutParams acquireParams = new PutParams();
        acquireParams.setAcquireSession(sessionId);

        int attempts = 1;
        while (true) {
            Boolean acquired = consulClient
                    .setKVValue(keyPath, "lock:" + LocalDateTime.now(), acquireParams)
                    .getValue();
            // Null-safe: a missing response value counts as "not acquired" instead of an NPE.
            if (Boolean.TRUE.equals(acquired)) {
                return true;
            }
            if (!block) {
                return false;
            }
            if (maxTimes != null && attempts >= maxTimes) {
                return false;
            }
            attempts++;
            if (timeInterval != null) {
                Thread.sleep(timeInterval);
            }
        }
    }

    /**
     * Releases the lock and destroys the session created by {@code lock(...)}.
     *
     * <p>The TTL check (if any) is always stopped. If no session exists, nothing is
     * written to Consul and {@code false} is returned. The session is destroyed even
     * when the release request itself fails.
     *
     * @return {@code true} if Consul reported the release as successful, {@code false} otherwise
     */
    public Boolean unlock() {
        if (checkTtl != null) {
            checkTtl.stop();
        }
        if (sessionId == null) {
            // Without a session there is nothing to release; sending the PUT anyway would
            // not carry a release session and would only overwrite the key's value.
            return false;
        }
        try {
            PutParams releaseParams = new PutParams();
            releaseParams.setReleaseSession(sessionId);
            Boolean released = consulClient
                    .setKVValue(keyPath, "unlock:" + LocalDateTime.now(), releaseParams)
                    .getValue();
            return Boolean.TRUE.equals(released);
        } finally {
            // Always clean up the session, even if the release request threw.
            destroySession();
        }
    }
}
測試程式碼
import com.didispace.lock.consul.CheckTtl;
import com.didispace.lock.consul.Lock;
import com.ecwid.consul.v1.ConsulClient;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.Random;
/**
 * Manual integration test: starts five {@link LockRunner} threads that compete
 * for the same distributed lock. It talks to Consul through a default-constructed
 * {@link ConsulClient}, so a reachable Consul agent is required.
 */
public class TestLock {

    /** Number of competing worker threads. */
    private static final int WORKER_COUNT = 5;

    /** Upper bound on how long the test waits for the workers, in milliseconds. */
    private static final long MAX_WAIT_MILLIS = 30000L;

    /**
     * Starts the workers and waits until all of them have finished or the
     * overall wait limit has elapsed, whichever comes first.
     */
    @Test
    public void testLock() throws Exception {
        ConsulClient consulClient = new ConsulClient();

        Thread[] workers = new Thread[WORKER_COUNT];
        for (int i = 0; i < workers.length; i++) {
            int id = i + 1;
            // Each worker gets its own TTL check, named "lock-1" .. "lock-5".
            workers[i] = new Thread(new LockRunner(id, new CheckTtl("lock-" + id, consulClient)));
            workers[i].start();
        }

        // Wait for the workers instead of sleeping a fixed time, but never longer
        // than the overall limit so a stuck worker cannot hang the test.
        long deadlineNanos = System.nanoTime() + MAX_WAIT_MILLIS * 1_000_000L;
        for (Thread worker : workers) {
            long remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                // join(0) would wait forever, so stop waiting once the budget is used up.
                break;
            }
            worker.join(remainingMillis);
        }
    }
}
@Slf4j
@AllArgsConstructor
class LockRunner implements Runnable {
private int flag;
private CheckTtl checkTtl;
@Override
public void run() {
Lock lock = new Lock(new ConsulClient(), "lock-key", checkTtl);
try {
// 獲取分散式互斥鎖(引數含義:阻塞模式、每次嘗試獲取鎖的間隔500ms、嘗試n次)
if (lock.lock(true, 500L, null)) {
log.info("Thread {} start!", flag);
// 處理業務邏輯
Thread.sleep(new Random().nextInt(5000));
log.info("Thread {} end!", flag);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
原始碼下載