Dubbo原始碼解析(十八)遠端通訊——Zookeeper
遠端通訊——Zookeeper
目標:介紹基於zookeeper的來實現的遠端通訊、介紹dubbo-remoting-zookeeper內的原始碼解析。
前言
對於zookeeper我相信肯定不陌生,在之前的文章裡面也有講到zookeeper來作為註冊中心。在這裡,基於zookeeper來實現遠端通訊,duubo封裝了zookeeper client,來和zookeeper server通訊。
下面是類圖:
原始碼分析
(一)ZookeeperClient
public interface ZookeeperClient {
/**
* 建立client
* @param path
* @param ephemeral
*/
void create(String path,boolean ephemeral);
/**
* 刪除client
* @param path
*/
void delete(String path);
/**
* 獲得子節點集合
* @param path
* @return
*/
List<String> getChildren(String path);
/**
* 向zookeeper的該節點發起訂閱,獲得該節點所有
* @param path
* @param listener
* @return
*/
List<String> addChildListener(String path,ChildListener listener);
/**
* 移除該節點的子節點監聽器
* @param path
* @param listener
*/
void removeChildListener(String path,ChildListener listener);
/**
* 新增狀態監聽器
* @param listener
*/
void addStateListener(StateListener listener);
/**
* 移除狀態監聽
* @param listener
*/
void removeStateListener(StateListener listener);
/**
* 判斷是否連線
* @return
*/
boolean isConnected();
/**
* 關閉客戶端
*/
void close();
/**
* 獲得url
* @return
*/
URL getUrl();
}
複製程式碼
該介面是基於zookeeper的客戶端介面,其中封裝了客戶端的一些方法。
(二)AbstractZookeeperClient
該類實現了ZookeeperClient介面,是客戶端的抽象類,它實現了一些公共邏輯,把具體的doClose、createPersistent等方法抽象出來,留給子類來實現。
1.屬性
/**
* url物件
*/
private final URL url;
/**
* 狀態監聽器集合
*/
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
/**
* 客戶端監聽器集合
*/
private final ConcurrentMap<String,ConcurrentMap<ChildListener,TargetChildListener>> childListeners = new ConcurrentHashMap<String,TargetChildListener>>();
/**
* 是否關閉
*/
private volatile boolean closed = false;
複製程式碼
2.create
@Override
public void create(String path,boolean ephemeral) {
// 如果不是臨時節點
if (!ephemeral) {
// 判斷該客戶端是否存在
if (checkExists(path)) {
return;
}
}
// 獲得/的位置
int i = path.lastIndexOf('/');
if (i > 0) {
// 建立客戶端
create(path.substring(0,i),false);
}
// 如果是臨時節點
if (ephemeral) {
// 建立臨時節點
createEphemeral(path);
} else {
// 遞迴建立節點
createPersistent(path);
}
}
複製程式碼
該方法是建立客戶端的方法,其中createEphemeral和createPersistent方法都被抽象出來。具體看下面的類的介紹。
3.addStateListener
@Override
public void addStateListener(StateListener listener) {
// 狀態監聽器加入集合
stateListeners.add(listener);
}
複製程式碼
該方法就是增加狀態監聽器。
4.close
@Override
public void close() {
if (closed) {
return;
}
closed = true;
try {
// 關閉
doClose();
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
}
複製程式碼
該方法是關閉客戶端,其中doClose方法也被抽象出。
/**
* 關閉客戶端
*/
protected abstract void doClose();
/**
* 遞迴建立節點
* @param path
*/
protected abstract void createPersistent(String path);
/**
* 建立臨時節點
* @param path
*/
protected abstract void createEphemeral(String path);
/**
* 檢測該節點是否存在
* @param path
* @return
*/
protected abstract boolean checkExists(String path);
/**
* 建立子節點監聽器
* @param path
* @param listener
* @return
*/
protected abstract TargetChildListener createTargetChildListener(String path,ChildListener listener);
/**
* 為子節點新增監聽器
* @param path
* @param listener
* @return
*/
protected abstract List<String> addTargetChildListener(String path,TargetChildListener listener);
/**
* 移除子節點監聽器
* @param path
* @param listener
*/
protected abstract void removeTargetChildListener(String path,TargetChildListener listener);
複製程式碼
上述的方法都是被抽象的,又它的兩個子類來實現。
(三)ZkclientZookeeperClient
該類繼承了AbstractZookeeperClient,是zk客戶端的實現類。
1.屬性
/**
* zk客戶端包裝類
*/
private final ZkClientWrapper client;
/**
* 連線狀態
*/
private volatile KeeperState state = KeeperState.SyncConnected;
複製程式碼
該類有兩個屬性,其中client就是核心所在,幾乎所有方法都呼叫了client的方法。
2.建構函式
public ZkclientZookeeperClient(URL url) {
super(url);
// 新建一個zkclient包裝類
client = new ZkClientWrapper(url.getBackupAddress(),30000);
// 增加狀態監聽
client.addListener(new IZkStateListener() {
/**
* 如果狀態改變
* @param state
* @throws Exception
*/
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
// 如果狀態變為了斷開連線
if (state == KeeperState.Disconnected) {
// 則修改狀態
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
// 狀態變為重連
stateChanged(StateListener.RECONNECTED);
}
});
// 啟動客戶端
client.start();
}
複製程式碼
該方法是構造方法,同時在裡面也做了建立客戶端和啟動客戶端的操作。其他方法都是實現了父類抽象的方法,並且呼叫的是client方法,為舉個例子:
@Override
public void createPersistent(String path) {
try {
// 遞迴建立節點
client.createPersistent(path);
} catch (ZkNodeExistsException e) {
}
}
複製程式碼
該方法是遞迴場景節點,呼叫的就是client.createPersistent(path)。
(四)CuratorZookeeperClient
該類是Curator框架提供的一套高階API,簡化了ZooKeeper的操作,從而對客戶端的實現。
1.屬性
/**
* 框架式客戶端
*/
private final CuratorFramework client;
複製程式碼
2.構造方法
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 工廠建立者
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1,1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest",authority.getBytes());
}
// 建立客戶端
client = builder.build();
// 新增監聽器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client,ConnectionState state) {
// 如果為狀態為lost,則改變為未連線
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
// 改變狀態為連線
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
// 改變狀態為未連線
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 啟動客戶端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(),e);
}
}
複製程式碼
該方法是構造方法,同樣裡面也包含了客戶端建立和啟動的邏輯。
其他的方法也一樣是實現了父類的抽象方法,舉個列子:
@Override
public void createPersistent(String path) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(),e);
}
}
複製程式碼
(五)ZookeeperTransporter
@SPI("curator")
public interface ZookeeperTransporter {
/**
* 連線伺服器
* @param url
* @return
*/
@Adaptive({Constants.CLIENT_KEY,Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
複製程式碼
該方法是zookeeper的資訊交換介面。同樣也是一個可擴充套件介面,預設實現CuratorZookeeperTransporter類。
(六)ZkclientZookeeperTransporter
public class ZkclientZookeeperTransporter implements ZookeeperTransporter {
@Override
public ZookeeperClient connect(URL url) {
// 新建ZkclientZookeeperClient例項
return new ZkclientZookeeperClient(url);
}
}
複製程式碼
該類實現了ZookeeperTransporter,其中就是建立了ZkclientZookeeperClient例項。
(七)CuratorZookeeperTransporter
public class CuratorZookeeperTransporter implements ZookeeperTransporter {
@Override
public ZookeeperClient connect(URL url) {
// 建立CuratorZookeeperClient例項
return new CuratorZookeeperClient(url);
}
}
複製程式碼
該介面實現了ZookeeperTransporter,是ZookeeperTransporter預設的實現類,同樣也是建立了;對應的CuratorZookeeperClient例項。
(八)ZkClientWrapper
該類是zk客戶端的包裝類。
1.屬性
/**
* 超時事件
*/
private long timeout;
/**
* zk客戶端
*/
private ZkClient client;
/**
* 客戶端狀態
*/
private volatile KeeperState state;
/**
* 客戶端執行緒
*/
private ListenableFutureTask<ZkClient> listenableFutureTask;
/**
* 是否開始
*/
private volatile boolean started = false;
複製程式碼
2.構造方法
public ZkClientWrapper(final String serverAddr,long timeout) {
this.timeout = timeout;
listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
@Override
public ZkClient call() throws Exception {
// 建立zk客戶端
return new ZkClient(serverAddr,Integer.MAX_VALUE);
}
});
}
複製程式碼
設定了超時時間和客戶端執行緒。
3.start
public void start() {
// 如果客戶端沒有開啟
if (!started) {
// 建立連線執行緒
Thread connectThread = new Thread(listenableFutureTask);
connectThread.setName("DubboZkclientConnector");
connectThread.setDaemon(true);
// 開啟執行緒
connectThread.start();
try {
// 獲得zk客戶端
client = listenableFutureTask.get(timeout,TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!",t);
}
started = true;
} else {
logger.warn("Zkclient has already been started!");
}
}
複製程式碼
該方法是客戶端啟動方法。
4.addListener
public void addListener(final IZkStateListener listener) {
// 增加監聽器
listenableFutureTask.addListener(new Runnable() {
@Override
public void run() {
try {
client = listenableFutureTask.get();
// 增加監聽器
client.subscribeStateChanges(listener);
} catch (InterruptedException e) {
logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly,which may cause unpredictable exception!");
} catch (ExecutionException e) {
logger.error("Got an exception when trying to create zkclient instance,can not connect to zookeeper server,please check!",e);
}
}
});
}
複製程式碼
該方法是為客戶端新增監聽器。
其他方法都是對於 客戶端是否還連線的檢測,可自行檢視程式碼。
(九)ChildListener
public interface ChildListener {
/**
* 子節點修改
* @param path
* @param children
*/
void childChanged(String path,List<String> children);
}
複製程式碼
該介面是子節點的監聽器,當子節點變化的時候會用到。
(十)StateListener
public interface StateListener {
int DISCONNECTED = 0;
int CONNECTED = 1;
int RECONNECTED = 2;
/**
* 狀態修改
* @param connected
*/
void stateChanged(int connected);
}
複製程式碼
該介面是狀態監聽器,其中定義了一個狀態更改的方法以及三種狀態。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了基於zookeeper的來實現的遠端通訊、介紹dubbo-remoting-zookeeper內的原始碼解析,關鍵需要對zookeeper有所瞭解。該篇之後,遠端通訊的原始碼解析就先到這裡了,其實大家會發現,如果能夠對講解api系列的文章瞭解透了,那麼後面的文章九很簡單,就好像軌道鋪好,可以直接順著軌道往後,根本沒有阻礙。接下來我將開始對rpc模組進行講解。