讀《分布式一致性原理》JAVA客戶端API操作2
創建節點
通過客戶端API來創建一個數據節點,有一下兩個接口:
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx)
這兩個接口分別是同步和異步的方式創建節點
需要註意的是無論是同步還是異步創建節點,zookeeper都不支持遞歸創建,即在不存在父節點的情況下創建一個子節點
。另外如果一個節點已經存在了,那麽再創建同名節點時會拋出異常:NodeExistException
目前,節點的內容只支持byte[]數組類型,也就是說zookeeper不負責對象序列化,需要開發者自己講內容進行序列化與反序列化。
對已字符串直接調用getByte就行。對於其他復雜對象,可以使用序列化工具來進行。
關於權限控制,如果你的應用場景中沒有復雜的權限要求,那麽直接調用I Ids.OPEN_ACL_UNSAFE,這表明之後對這個節點的任何操作不受權限控制。
使用API創建一個節點:
package znode;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import session.CreateZookeeper;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class CreateZnode implements Watcher{
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
System.out.println("receive watched event:"+event);
if (KeeperState.SyncConnected==event.getState()) {
connectedSemaphore.countDown();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper());
connectedSemaphore.await();
String path1 = zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode:"+path1);
String path2 = zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("success create znode:"+path2);
}
}
上面兩個片段使用同步方式創建節點:可以看出創建臨時節點返回值就是傳入的路勁
使用臨時順序節點返回值會自動加上一個數字
使用異步API創建節點
package znode;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import session.CreateZookeeper;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class CreateZnode2 implements Watcher{
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
System.out.println("receive watched event:"+event);
if (KeeperState.SyncConnected==event.getState()) {
connectedSemaphore.countDown();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.64.60", 5000, new CreateZookeeper());
connectedSemaphore.await();
zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL
,new IStringCallback(),"I am context");
zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL
,new IStringCallback(),"I am context");
zooKeeper.create("/zk-test-ephemera-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL
,new IStringCallback(),"I am context");
}
}
class IStringCallback implements AsyncCallback.StringCallback{
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// TODO Auto-generated method stub
System.out.println("create path result: ["+rc+","+path+","+ctx+","+"real path name:"+name+"]");
}
}
和同步接口最大的區別在於,節點在創建的過程(包含網絡通信和服務端的創建過程),是異步的。而且我們需要註意的是
同步創建過程時我們需要關註接口拋出的異常,而在異步接口中,是不會拋出異常的,所有的異常都會在回調函數中通過Result Code來體現。
刪除節點
public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
Object ctx)
這裏列出的兩個API是同步和異步的刪除接口,API方法的參數說明如表5-5所示。
刪除節點和更新節點的操作非常相似,在zookeeper中只允許刪除葉子節點。也就是說,如果一個節點存在子節點的話
那麽這個節點將無法直接刪除,必須先刪除其所有子節點。
讀取數據
讀取數據,包含子節點列表的獲取和節點數據的獲取。
1.getChildren
首先我們先看看註冊watcher。如果zookeeper客戶端獲取到指定節點的子節點列表後,還需要訂閱這個子節點列表的變化通知,
那麽就可以通過註冊一個Watcher來實現。當有子節點添加或刪除時,服務端就會向客戶端發送一個NodeChildrenChange的事件。
需要註意的是服務端向客戶端發送事件通知時是不包含最新的節點列表的。是需要客戶端主動重新獲取的。
Stat,stat記錄一個節點的基本屬性信息。創建時的事務ID(cZxid),最後一次修改的事務ID(mZxid)和節點數據內容的長度
dataLength,我們可以將一個舊的stat變量傳入,該stat會在執行過程中,被來自服務端響應的心的stat的替換掉。
package getchildren;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetChildren1 implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}else if (event.getType()==EventType.NodeChildrenChanged) {
try {
System.out.println("ReGetChild:"+zk.getChildren(event.getPath(), true));
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-book";
zk = new ZooKeeper("192.168.64.60:2181", 5000, new GetChildren1());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
List<String> children = zk.getChildren(path, true);
System.out.println(children);
zk.create(path+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);;
}
}
使用異步API獲取子節點列表
package getchildren;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetChildren2 implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}else if (event.getType()==EventType.NodeChildrenChanged) {
try {
System.out.println("ReGetChild:"+zk.getChildren(event.getPath(), true));
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-book";
zk = new ZooKeeper("192.168.64.60:2181", 5000, new GetChildren2());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.getChildren(path, true, new IChildren2Callback(),"i am context");
zk.create(path+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);;
}
}
class IChildren2Callback implements AsyncCallback.Children2Callback{
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// TODO Auto-generated method stub
System.out.println("Get Children znode result: "+rc+","+path+","+ctx+","+children+","+stat);
}
}
getData
getData接口和上下文中的getChildren接口的用法相同,Watcher註冊後,一旦節點的內容狀態發生改變,zookeeper服務端會
向客戶端發送一個NodeDataChanged的事件。API返回的結果類型時byte[].
使用同步AIP獲取數據節點內容
package getdata;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import getchildren.GetChildren1;
public class GetData1 implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}else if (event.getType()==EventType.NodeDataChanged) {
try {
byte[] data = zk.getData(event.getPath(), true, stat);
System.out.println(new String(data));
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-book";
zk = new ZooKeeper("192.168.64.60:2181", 5000, new GetData1());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(zk.getData(path, true, stat));
zk.setData(path, "456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);;
}
}
數據內容或是數據版本發生變化,都胡出發服務端的NodeDataChanged通知。
異步API獲取
package getdata;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class GetData2 implements Watcher {
public static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected==event.getState()) {
if (EventType.None.getIntValue()==event.getState().getIntValue()&&null==event.getPath()) {
connectedSemaphore.countDown();
}else if (event.getType()==EventType.NodeDataChanged) {
zk.getData(event.getPath(), true, new IDataback(),null);
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-book";
zk = new ZooKeeper("192.168.64.60:2181", 5000, new GetData2());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL );
zk.getData(path, true,new IDataback(),null);
zk.setData(path, "456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);;
}
}
class IDataback implements AsyncCallback.DataCallback{
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(new String(data));
System.out.println(stat.getCzxid());
System.out.println(stat.getMzxid());
System.out.println(stat.getVersion());
}
}
讀《分布式一致性原理》JAVA客戶端API操作2