zookeeper分散式鎖程式碼實現(一)
阿新 • • 發佈:2019-02-11
利用zookeeper的臨時節點實現分散式鎖,這種方法簡單,斷開連線後能自動刪除臨時節點,相當於已獲得鎖的呼叫者掛掉後自動釋放鎖,但當呼叫者太多,會出現“驚群”現象。
/** * zookeeper鎖實現 * @author skymr * */ public class ZookeeperLock implements Lock, Watcher{ public ZookeeperLock(String url, int sessionTimeOut, String path){ this.path = path; try { //url是zookepper伺服器的地址 zk = new ZooKeeper(url, sessionTimeOut, this); latch.await(); } catch (Exception e) { e.printStackTrace(); } } //zk客戶端 private ZooKeeper zk; //結點路徑 private String path; //用於初始化zk的,zk連線是非同步的,但連線成功後才能進行呼叫 private CountDownLatch latch = new CountDownLatch(1); public void lock() { if(!tryLock()){ try { Stat s = zk.exists(path, false); if(s == null){ //建立節點失敗,但節點現在這不存在了 lock(); return; } } catch (Exception e1) { } //如果嘗試加鎖失敗,則進入等待 synchronized(zk){ System.out.println(Thread.currentThread().getName() +" lock失敗,進入等待"); try { zk.wait(); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() +" lock等待完成"); } //等待別人釋放鎖後,自己再去加鎖 lock(); } else{ System.out.println(Thread.currentThread().getName() +" lock成功"); } } public void lockInterruptibly() throws InterruptedException { } public boolean tryLock() { try { //加鎖程式碼是建立一個節點 zk.create(path, "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //不拋異常就表示建立成功啦 return true; } catch (Exception e) { try { //建立失敗,那就監聽此節點的動態, 當此節點刪除後要重新加鎖的 zk.getChildren(path, this); //這裡不應該用zk.exists(path, true)嗎,不曉得為什麼getChildren也可以 } catch (Exception e1) {} } finally{ } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void unlock() { try { //釋放鎖,刪除節點 zk.delete(path, -1); } catch (Exception e) { } } public Condition newCondition() { return null; } public void process(WatchedEvent event) { System.out.println(event); if(event.getType() == EventType.None){ latch.countDown(); } if(event.getType() == EventType.NodeDeleted && event.getPath().equals(path)){ synchronized(zk){ zk.notifyAll(); } System.out.println(Thread.currentThread().getName() +" 通知Lock等待中的執行緒重試加鎖"); } } }
說明:
只實現了Lock介面的 trylock, lock,與unlock方法
測試:
建立maven專案
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>club.skymr</groupId> <artifactId>zklock</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>zklock</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency> </dependencies> </project>
測試類:
public class LockTest { // private static Lock lock = new ReentrantLock(); // private static Lock lock = new ZookeeperLock("localhost", 3000, "/node"); public static void main(String[] args) throws Exception{ for(int i = 0; i < 2; i++){ new Thread(){ public void run(){ Lock lock = new ZookeeperLock("localhost", 3000, "/node"); try{ lock.lock(); System.out.println(Thread.currentThread().getName() + "開始執行"); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName() + "執行完成 "); } finally{ lock.unlock(); } } }.start(); } } }
測試結果:
WatchedEvent state:SyncConnected type:None path:null
WatchedEvent state:SyncConnected type:None path:null
Thread-0 lock成功
Thread-0開始執行
Thread-1 lock失敗,進入等待
Thread-0執行完成
WatchedEvent state:SyncConnected type:NodeDeleted path:/node
Thread-1 lock等待完成
Thread-1-EventThread 通知Lock等待中的執行緒重試加鎖
Thread-1 lock成功
Thread-1開始執行
Thread-1執行完成
程式碼比較粗糙,但已實現基本的分散式鎖