Zookeeper的java客戶端API使用方法(五)
阿新 • • 發佈:2018-12-24
前面幾篇博文,我們簡單的介紹了一下zookeeper,如何安裝zookeeper叢集,以及如何使用命令列等。這篇博文我們重點來看下Zookeeper的java客戶端API使用方式。
建立會話
客戶端可以通過建立一個Zookeeper(org.apache.zookeeper.ZooKeeper)例項來連線ZooKeeper伺服器。給大家推薦一篇博文,構造方法和引數都介紹的非常的詳細,看一下就好。
我們看一下建立會話的程式碼就好了。
public class CreateSession implements Watcher{
private static ZooKeeper zookeeper;
//建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊)
public static void main(String[] args) throws IOException, InterruptedException {
zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zookeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
private void doSomething(){
System.out.println("do something" );
}
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if(event.getState() == KeeperState.SyncConnected){
System.out.println("ZooKeeper session established.");
doSomething();
}
}
}
需要我們注意就一點:ZooKeeper 允許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端傳送一個事件通知來實現分散式的通知功能。
zookeeper的所有的API,都有同步和非同步兩種方式,使用非同步API時,client可為每個operation設定callback,在operation被執行後,zookeeper會執行對應的callback。
下面給大家展示一下兩個方式建立節點的不同。
同步建立節點
public class CreateNodeSync implements Watcher{
private static ZooKeeper zookeeper;
//建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊)
public static void main(String[] args) throws IOException, InterruptedException {
zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNodeSync());
System.out.println(zookeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 建立Znode
* CreateMode:
* PERSISTENT (持續的,相對於EPHEMERAL,不會隨著client的斷開而消失)
* PERSISTENT_SEQUENTIAL(持久的且帶順序的)
* EPHEMERAL (短暫的,生命週期依賴於client session)
* EPHEMERAL_SEQUENTIAL (短暫的,帶順序的)
*/
private void doSomething(){
try {
String path = zookeeper.create("/node_2", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("return path:" + path);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("do something");
}
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if(event.getState() == KeeperState.SyncConnected){
doSomething();
}
}
}
非同步建立節點
public class CreateNodeASync implements Watcher {
private static ZooKeeper zookeeper;
//建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊)
public static void main(String[] args) throws IOException, InterruptedException {
zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNodeASync());
System.out.println(zookeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
private void doSomething() {
zookeeper.create("/node_1", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new IStringCallback(), "this is content");
System.out.println("do something");
}
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if (event.getState() == KeeperState.SyncConnected) {
doSomething();
}
}
static class IStringCallback implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
System.out.println("name:" + name);
}
}
}
他們的執行結果就不再給大家展示了,篇幅會比較大,我對API對zookeeper節點增刪改查,都做了同步和非同步的編寫,大家可以在下面的連結地址中下載。
下載
總結:
- 在這裡api對zookeeper節點進行增刪改查,有同步和非同步的方
- zookeeper不支援遞迴建立子節點(也就是說在父節點不存在的情況下,不允許建立子節點)
- zookeeper不支援遞迴刪除(也就是說在父節點有子節點的情況下,不允許直接刪除父節點)
下篇博文,我們進行zookeeper的實戰部分。