zookeeper curator使用caches實現各種監聽
1、篇首語
curator是zookeeper的一個高階api開發包。封裝了zookeeper眾多的recipes,並且實現了一些新的recipes原語,最重要的是基於zookeeper提供的各種機制實現了更健壯的連線和異常處理。
本文將其中比較常用的一種recipe,就是cache。
2、各種Caches
cache是一種快取機制,可以藉助cache實現監聽。
簡單來說,cache在客戶端快取了znode的各種狀態,當感知到zk叢集的znode狀態變化,會觸發event事件,註冊的監聽器會處理這些事件。是不是很簡單。
curator支援的cache種類有3種Path Cache,Node Cache,Tree Cache
1)Path Cache
Path Cache用來觀察ZNode的子節點並快取狀態,如果ZNode的子節點被建立,更新或者刪除,那麼Path Cache會更新快取,並且觸發事件給註冊的監聽器。
Path Cache是通過PathChildrenCache類來實現的,監聽器註冊是通過PathChildrenCacheListener。
2)Node Cache
Node Cache用來觀察ZNode自身,如果ZNode節點本身被建立,更新或者刪除,那麼Node Cache會更新快取,並觸發事件給註冊的監聽器。
Node Cache是通過NodeCache類來實現的,監聽器對應的介面為NodeCacheListener。
3)Tree Cache
可以看做是上兩種的合體,Tree Cache觀察的是所有節點的所有資料。
3、下面給出一個例子。
1)這是在springboot中使用curator,先給出curator依賴pom
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.9.1</version> </dependency>
2)三種cache的實現
package com.dqa.prometheus.client.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
public class ZkClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private CuratorFramework client;
private NodeCache nodeCache;
private PathChildrenCache pathChildrenCache;
private TreeCache treeCache;
private String zookeeperServer;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private int baseSleepTimeMs;
private int maxRetries;
public void setZookeeperServer(String zookeeperServer) {
this.zookeeperServer = zookeeperServer;
}
public String getZookeeperServer() {
return zookeeperServer;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
public int getConnectionTimeoutMs() {
return connectionTimeoutMs;
}
public void setBaseSleepTimeMs(int baseSleepTimeMs) {
this.baseSleepTimeMs = baseSleepTimeMs;
}
public int getBaseSleepTimeMs() {
return baseSleepTimeMs;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
public int getMaxRetries() {
return maxRetries;
}
public void init() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
client.start();
}
public void stop() {
if (client != null) CloseableUtils.closeQuietly(client);
if (pathChildrenCache != null) CloseableUtils.closeQuietly(pathChildrenCache);
if (nodeCache != null) CloseableUtils.closeQuietly(nodeCache);
if (treeCache != null) CloseableUtils.closeQuietly(treeCache);
}
public CuratorFramework getClient() {
return client;
}
/*
* 設定Path Cache, 監控本節點的子節點被建立,更新或者刪除,注意是子節點, 子節點下的子節點不能遞迴監控
* 事件型別有3個, 可以根據不同的動作觸發不同的動作
* 本例子只是演示, 所以只是列印了狀態改變的資訊, 並沒有在PathChildrenCacheListener中實現複雜的邏輯
* @Param path 監控的節點路徑, cacheData 是否快取data
* 可重入監聽
* */
public void setPathCacheListener(String path, boolean cacheData) {
try {
pathChildrenCache = new PathChildrenCache(client, path, cacheData);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
logger.info("子節點增加, path={}, data={}", data.getPath(), data.getData());
break;
case CHILD_UPDATED:
logger.info("子節點更新, path={}, data={}", data.getPath(), data.getData());
break;
case CHILD_REMOVED:
logger.info("子節點刪除, path={}, data={}", data.getPath(), data.getData());
break;
default:
break;
}
}
};
pathChildrenCache.getListenable().addListener(childrenCacheListener);
pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT);
} catch (Exception e) {
logger.error("PathCache監聽失敗, path=", path);
}
}
/*
* 設定Node Cache, 監控本節點的新增,刪除,更新
* 節點的update可以監控到, 如果刪除會自動再次建立空節點
* 本例子只是演示, 所以只是列印了狀態改變的資訊, 並沒有在NodeCacheListener中實現複雜的邏輯
* @Param path 監控的節點路徑, dataIsCompressed 資料是否壓縮
* 不可重入監聽
* */
public void setNodeCacheListener(String path, boolean dataIsCompressed) {
try {
nodeCache = new NodeCache(client, path, dataIsCompressed);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData childData = nodeCache.getCurrentData();
logger.info("ZNode節點狀態改變, path={}", childData.getPath());
logger.info("ZNode節點狀態改變, data={}", childData.getData());
logger.info("ZNode節點狀態改變, stat={}", childData.getStat());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
} catch (Exception e) {
logger.error("建立NodeCache監聽失敗, path={}", path);
}
}
/*
* 設定Tree Cache, 監控本節點的新增,刪除,更新
* 節點的update可以監控到, 如果刪除不會自動再次建立
* 本例子只是演示, 所以只是列印了狀態改變的資訊, 並沒有在NodeCacheListener中實現複雜的邏輯
* @Param path 監控的節點路徑, dataIsCompressed 資料是否壓縮
* 可重入監聽
* */
public void setTreeCacheListener(final String path) {
try {
treeCache = new TreeCache(client, path);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data != null){
switch (event.getType()) {
case NODE_ADDED:
logger.info("[TreeCache]節點增加, path={}, data={}", data.getPath(), data.getData());
break;
case NODE_UPDATED:
logger.info("[TreeCache]節點更新, path={}, data={}", data.getPath(), data.getData());
break;
case NODE_REMOVED:
logger.info("[TreeCache]節點刪除, path={}, data={}", data.getPath(), data.getData());
break;
default:
break;
}
}else{
logger.info("[TreeCache]節點資料為空, path={}", data.getPath());
}
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
} catch (Exception e) {
logger.error("建立TreeCache監聽失敗, path={}", path);
}
}
}
3)configuration
init方法是初始化zookeeper client的操作
stop是停止zookeeper是的清理動作
package com.dqa.prometheus.configuration;
import com.xiaoju.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZkConfiguration {
@Value("${zookeeper.server}")
private String zookeeperServer;
@Value(("${zookeeper.sessionTimeoutMs}"))
private int sessionTimeoutMs;
@Value("${zookeeper.connectionTimeoutMs}")
private int connectionTimeoutMs;
@Value("${zookeeper.maxRetries}")
private int maxRetries;
@Value("${zookeeper.baseSleepTimeMs}")
private int baseSleepTimeMs;
@Bean(initMethod = "init", destroyMethod = "stop")
public ZkClient zkClient() {
ZkClient zkClient = new ZkClient();
zkClient.setZookeeperServer(zookeeperServer);
zkClient.setSessionTimeoutMs(sessionTimeoutMs);
zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
zkClient.setMaxRetries(maxRetries);
zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
return zkClient;
}
}
3)zk配置檔案
其中最重要的應該是會話超時和重試機制了。
============== zookeeper ===================
zookeeper.server=10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
zookeeper.sessionTimeoutMs=6000
zookeeper.connectionTimeoutMs=6000
zookeeper.maxRetries=3
zookeeper.baseSleepTimeMs=1000