1. 程式人生 > >zookeeper分散式鎖程式碼實現(一)

zookeeper分散式鎖程式碼實現(一)

利用zookeeper的臨時節點實現分散式鎖,這種方法簡單,斷開連線後能自動刪除臨時節點,相當於已獲得鎖的呼叫者掛掉後自動釋放鎖,但當呼叫者太多,會出現“驚群”現象。

/**
 * zookeeper鎖實現
 * @author skymr
 *
 */
public class ZookeeperLock implements Lock, Watcher{
	
	
	public ZookeeperLock(String url, int sessionTimeOut, String path){
		this.path = path;
		try {
			//url是zookepper伺服器的地址
			zk = new ZooKeeper(url, sessionTimeOut, this);
			latch.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	//zk客戶端
	private ZooKeeper zk;
	
	//結點路徑
	private String path;
	
	//用於初始化zk的,zk連線是非同步的,但連線成功後才能進行呼叫
	private CountDownLatch latch = new CountDownLatch(1);

	public void lock() {
		if(!tryLock()){
			try {
				Stat s = zk.exists(path, false);
				if(s == null){
					//建立節點失敗,但節點現在這不存在了
					lock();
					return;
				}
			} catch (Exception e1) {
			}
			//如果嘗試加鎖失敗,則進入等待
			synchronized(zk){
				System.out.println(Thread.currentThread().getName() +" lock失敗,進入等待");
				try {
					zk.wait();
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName() +" lock等待完成");
			}
			//等待別人釋放鎖後,自己再去加鎖
			lock();
		}
		else{
			System.out.println(Thread.currentThread().getName() +" lock成功");
		}
	}

	public void lockInterruptibly() throws InterruptedException {
		
	}

	public boolean tryLock() {
		try {
			//加鎖程式碼是建立一個節點
			zk.create(path, "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			//不拋異常就表示建立成功啦
			return true;
		} catch (Exception e) {
			try {
				//建立失敗,那就監聽此節點的動態, 當此節點刪除後要重新加鎖的
				zk.getChildren(path, this);
                //這裡不應該用zk.exists(path, true)嗎,不曉得為什麼getChildren也可以
			} catch (Exception e1) {} 
		}
		finally{
		}
		return false;
	}

	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return false;
	}

	public void unlock() {
		try {
			//釋放鎖,刪除節點
			zk.delete(path, -1);
		} catch (Exception e) {
		}
	}

	public Condition newCondition() {
		return null;
	}

	public void process(WatchedEvent event) {
		System.out.println(event);
		if(event.getType() == EventType.None){
			latch.countDown();
		}
		if(event.getType() == EventType.NodeDeleted && event.getPath().equals(path)){
			synchronized(zk){
				zk.notifyAll();
			}
			System.out.println(Thread.currentThread().getName() +" 通知Lock等待中的執行緒重試加鎖");
		}
	}

}

說明:

只實現了Lock介面的 trylock, lock,與unlock方法

測試:

建立maven專案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>club.skymr</groupId>
  <artifactId>zklock</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>zklock</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
	<dependency>
	    <groupId>org.apache.zookeeper</groupId>
	    <artifactId>zookeeper</artifactId>
	    <version>3.4.13</version>
	</dependency>

  </dependencies>
</project>

測試類:


public class LockTest {

//	private static Lock lock = new ReentrantLock();
	
//	private static Lock lock = new ZookeeperLock("localhost", 3000, "/node");
	
	public static void main(String[] args) throws Exception{
		
		for(int i = 0; i < 2; i++){
			new Thread(){
				public void run(){
					Lock lock = new ZookeeperLock("localhost", 3000, "/node");
					try{
						lock.lock();
						System.out.println(Thread.currentThread().getName() + "開始執行");
						try {
							Thread.sleep(2000);
						} catch (InterruptedException e) {
						}
						System.out.println(Thread.currentThread().getName() + "執行完成 ");
					}
					finally{
						lock.unlock();
					}
				}
			}.start();
		}
	}
}

測試結果:

WatchedEvent state:SyncConnected type:None path:null
WatchedEvent state:SyncConnected type:None path:null
Thread-0 lock成功
Thread-0開始執行
Thread-1 lock失敗,進入等待
Thread-0執行完成 
WatchedEvent state:SyncConnected type:NodeDeleted path:/node
Thread-1 lock等待完成
Thread-1-EventThread 通知Lock等待中的執行緒重試加鎖
Thread-1 lock成功
Thread-1開始執行
Thread-1執行完成 

程式碼比較粗糙,但已實現基本的分散式鎖