基於Curator操作ZooKeeper(一)-基本操作
Java原生API操作ZooKeeper可參看:
相關內容:
基於Curator操作ZooKeeper(二)-Watcher操作
基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache
Java原生操作API有以下幾個不足之處:
- 超時重連,不支援自動,必須要手動實現;
- Watcher註冊一次後只能使用一次;
- 不支援遞迴建立節點;
一般企業操作ZooKeeper的客戶端都會使用Apache Curator。和ZkClient一樣,Curator解決了很ZooKeeper客戶端非常底層的細節開發工作,包括連線重連、反覆註冊Watcher和NodeExistsException異常等,目前已經成為了Apache的頂級專案,是全世界範圍內使用最廣泛的ZooKeeper客戶端之一。
除了封裝一些開發人員不需要特別關注的底層細節之外,Curator還在ZooKeeperAPI的基礎上進行了包裝,提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。除此之外,Curator中還提供了ZooKeeper各種應用場景(Recipe,如共享鎖服務、Master選舉機制和分散式計數器等)的抽象封裝。
引入相關的依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.11</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
建立客戶端
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorOperator { public CuratorFramework client = null; public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181"; /** * 例項化zk客戶端 */ public CuratorOperator() { /** * 同步建立zk示例,原生api是非同步的 * * curator連結zookeeper的重試策略: * * 1>ExponentialBackoffRetry【推薦】 * baseSleepTimeMs:初始sleep時間(ms) * maxRetries:最大重試次數,超過時間就不連結了 * maxSleepMs:最大重試時間(ms) * * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間: 當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1))) 可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外, maxRetries引數控制了最大重試次數,以避免無限制的重試。 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); /** * curator連結zookeeper的策略: * 2>RetryNTimes【推薦】 * n:重試的次數 * sleepMsBetweenRetries:每次重試間隔的時間(ms) */ // RetryPolicy retryPolicy = new RetryNTimes(3, 5000); /** * curator連結zookeeper的策略: * 3>RetryOneTime * sleepMsBetweenRetry:只重試一次,重試間隔的時間 */ // RetryPolicy retryPolicy2 = new RetryOneTime(3000); /** * 4> * 永遠重試,不推薦使用 */ // RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs) /** * curator連結zookeeper的策略: * 5>RetryUntilElapsed * maxElapsedTimeMs:最大重試時間 * sleepMsBetweenRetries:每次重試間隔 * 重試時間超過maxElapsedTimeMs後,就不再重試 */ // RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000); //建立客戶端 client = CuratorFrameworkFactory.builder() //builder .connectString(zkServerPath) .sessionTimeoutMs(10000) //session超時時間 .retryPolicy(retryPolicy) //重試策略 .build(); /** * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫 CuratorFramework的sta rt)方法來完成會話的建立。 */ client.start(); } /** * * @Description: 關閉zk客戶端連線 */ public void closeZKClient() { if (client != null) { this.client.close(); } } public static void main(String[] args) throws Exception { // 例項化 CuratorOperator cto = new CuratorOperator(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉")); Thread.sleep(3000); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉")); } }
執行結果:
namespace
在3.2.0及其之後版本的ZooKeeper中,添加了“Chroot”特性,該特性允許每個客戶端為自己設定一個名稱空間 ( Namespace )。如果一個ZooKeeper客戶端設定了Chroot,那麼該客戶端對伺服器的任何操作,都將會被限制在其自己的名稱空間下。
舉個例子來說,如果我們希望為應用X分配/apps/X下的所有子節點,那麼該應用可以將其所有ZooKeeper客戶端的Chroot設定為/apps/X的。一旦設定了Chroot之後,那麼對這個客戶端來說,所有的節點路徑都以/apps/X為根節點,它和ZooKeeper發起的所有請求中相關的節點路徑,都將是一個相對路徑—相對於/apps/X的路徑。例如通過ZooKeeper客戶端API建立點/test_chroot,那麼實際上在服務端被建立的節點是/apps/X/test_ chroot,通過設定Chroot,我們能夠將一個客戶端應用與ZooKeeper服務端
的一棵子樹相對應,在那些多個應用共用一個ZooKeeper叢集的場景下,這對於實現不同應用之間的相互隔離非常有幫助。
節點的增刪改查(同步)
新增節點:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的重試策略:
*
* 1>ExponentialBackoffRetry【推薦】
* baseSleepTimeMs:初始sleep時間(ms)
* maxRetries:最大重試次數,超過時間就不連結了
* maxSleepMs:最大重試時間(ms)
*
* 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
maxRetries引數控制了最大重試次數,以避免無限制的重試。
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:
* 2>RetryNTimes【推薦】
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間(ms)
*/
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:
* 3>RetryOneTime
* sleepMsBetweenRetry:只重試一次,重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 4>
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:
* 5>RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
//建立客戶端
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace
.namespace("testCRUD")
.build();
/**
* CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
CuratorFramework的sta rt)方法來完成會話的建立。
*/
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
//建立節點
String nodePath = "/dongguabai/test";
byte[] data = "abcd".getBytes();
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,data); //不指定內容,則內容為空
Thread.sleep(3000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
有個要注意的地方是:
由於在ZooKeeper中規定了所有非葉子節點必須為持久節點,呼叫上面這個API之後,只有path引數對應的資料節點是臨時節點,其父節點均為持久節點。
查詢某個節點下的資料:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的重試策略:
*
* 1>ExponentialBackoffRetry【推薦】
* baseSleepTimeMs:初始sleep時間(ms)
* maxRetries:最大重試次數,超過時間就不連結了
* maxSleepMs:最大重試時間(ms)
*
* 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
maxRetries引數控制了最大重試次數,以避免無限制的重試。
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:
* 2>RetryNTimes【推薦】
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間(ms)
*/
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:
* 3>RetryOneTime
* sleepMsBetweenRetry:只重試一次,重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 4>
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:
* 5>RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
//建立客戶端
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
/**
* CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
CuratorFramework的sta rt)方法來完成會話的建立。
*/
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
//建立節點
String nodePath = "/dongguabai/test";
byte[] data = "abcd".getBytes();
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,data); //不指定內容,則內容為空
//獲取節點
byte[] bytes = cto.client.getData().forPath(nodePath);
System.out.println("第一次獲取節點資料為:"+new String(bytes));
Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("第二次獲取節點資料為:"+new String(bytes1));
System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));
Thread.sleep(3000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
輸出結果:
查詢子節點:
資料準備:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.List;
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的重試策略:
*
* 1>ExponentialBackoffRetry【推薦】
* baseSleepTimeMs:初始sleep時間(ms)
* maxRetries:最大重試次數,超過時間就不連結了
* maxSleepMs:最大重試時間(ms)
*
* 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
maxRetries引數控制了最大重試次數,以避免無限制的重試。
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:
* 2>RetryNTimes【推薦】
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間(ms)
*/
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:
* 3>RetryOneTime
* sleepMsBetweenRetry:只重試一次,重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 4>
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:
* 5>RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
//建立客戶端
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
/**
* CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
CuratorFramework的sta rt)方法來完成會話的建立。
*/
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
//建立節點
/* byte[] data = "abcd".getBytes();
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,data); //不指定內容,則內容為空*/
//獲取節點
/* byte[] bytes = cto.client.getData().forPath(nodePath);
System.out.println("第一次獲取節點資料為:"+new String(bytes));
Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("第二次獲取節點資料為:"+new String(bytes1));
System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));
*/
//獲取子節點
List<String> list = cto.client.getChildren().forPath(nodePath);
System.out.println("開始列印子節點:");
list.forEach(result-> System.out.println(result));
System.out.println("列印結束!");
//修改節點
/* Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));
Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/
//刪除節點
/* Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("獲取節點資料為:"+new String(bytes1));
cto.client.delete()
.guaranteed() //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功
.deletingChildrenIfNeeded() //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除
.withVersion(stat.getVersion())
.forPath(nodePath);*/
Thread.sleep(3000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
修改節點:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的重試策略:
*
* 1>ExponentialBackoffRetry【推薦】
* baseSleepTimeMs:初始sleep時間(ms)
* maxRetries:最大重試次數,超過時間就不連結了
* maxSleepMs:最大重試時間(ms)
*
* 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
maxRetries引數控制了最大重試次數,以避免無限制的重試。
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:
* 2>RetryNTimes【推薦】
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間(ms)
*/
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:
* 3>RetryOneTime
* sleepMsBetweenRetry:只重試一次,重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 4>
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:
* 5>RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
//建立客戶端
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
/**
* CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
CuratorFramework的sta rt)方法來完成會話的建立。
*/
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/test";
//建立節點
/* byte[] data = "abcd".getBytes();
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,data); //不指定內容,則內容為空*/
//獲取節點
/* byte[] bytes = cto.client.getData().forPath(nodePath);
System.out.println("第一次獲取節點資料為:"+new String(bytes));
Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("第二次獲取節點資料為:"+new String(bytes1));
System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));*/
//修改節點
Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));
Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));
Thread.sleep(3000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
輸出結果:
刪除節點:
執行程式之前:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的重試策略:
*
* 1>ExponentialBackoffRetry【推薦】
* baseSleepTimeMs:初始sleep時間(ms)
* maxRetries:最大重試次數,超過時間就不連結了
* maxSleepMs:最大重試時間(ms)
*
* 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
maxRetries引數控制了最大重試次數,以避免無限制的重試。
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:
* 2>RetryNTimes【推薦】
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間(ms)
*/
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:
* 3>RetryOneTime
* sleepMsBetweenRetry:只重試一次,重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 4>
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:
* 5>RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
//建立客戶端
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
/**
* CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
CuratorFramework的sta rt)方法來完成會話的建立。
*/
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/test";
//建立節點
/* byte[] data = "abcd".getBytes();
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,data); //不指定內容,則內容為空*/
//獲取節點
/* byte[] bytes = cto.client.getData().forPath(nodePath);
System.out.println("第一次獲取節點資料為:"+new String(bytes));
Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("第二次獲取節點資料為:"+new String(bytes1));
System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));*/
//修改節點
/* Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));
Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/
//刪除節點
Stat stat = new Stat();
byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("獲取節點資料為:"+new String(bytes1));
cto.client.delete()
.guaranteed() //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功
.deletingChildrenIfNeeded() //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除
.withVersion(stat.getVersion())
.forPath(nodePath);
Thread.sleep(3000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
執行結果:
判斷節點是否存在:
其他相關資料:
https://blog.csdn.net/en_joker/article/details/78778917
https://www.cnblogs.com/LiZhiW/p/4926385.html?utm_source=tuicool&utm_medium=referral