實現自己的 RPC 框架(二)
阿新 • • 發佈:2019-12-31
前段時間自己搞了個 RPC 的輪子,不過相對來說比較簡單,最近在原來的基礎上加以改造,使用 Zookeeper 實現了 provider 自動定址以及消費者的簡單負載均衡,對之前的感興趣的請轉 造個輪子---RPC動手實現。
RPC 模型
在原來使用 TCP 直連的基礎上實現基於 Zookeeper 的服務的註冊與發現,改造後的依賴關係是這樣的。
怎麼用
話不多說,我們來看下如何釋出和引用服務。 服務端我們將服務的 IP 和埠號基礎資訊註冊到 Zookeeper 上。
/**
* @author wuhaifei 2019-08-02
*/
public class ZookeeperServerMainTest {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer)
.setHost("172.16.30.114")
.setPort(5201)
.setRef(HelloServiceImpl.class.getName())
.setRegister(true )
.setInterfaceId(HelloService.class.getName());
RegistryConfig registryConfig = new RegistryConfig().setAddress("127.0.0.1:2181")
.setSubscribe(true)
.setRegister(true)
.setProtocol(RpcConstants.ZOOKEEPER);
ServerProxy serverProxy = new ServerProxy(new NettyServerAbstract())
.setServerConfig(serverConfig)
.setRegistryConfig(registryConfig);
try {
serverProxy.export();
while (true){
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製程式碼
通過 Zookeeper 引用註冊在其上的服務。
/**
* @author wuhaifei 2019-08-02
*/
public class ZookeeperClientMainTest {
public static void main(String[] args) {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProtocol(RpcConstants.ZOOKEEPER)
.setTimeoutMillis(100000)
.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer);
RegistryConfig registryConfig = new RegistryConfig()
.setAddress("127.0.0.1:2181")
.setProtocol(RpcConstants.ZOOKEEPER)
.setRegister(true)
.setSubscribe(true);
ClientProxy<HelloService> clientProxy = new ClientProxy(clientConfig,new NettyClientAbstract(),HelloService.class)
.setRegistryConfig(registryConfig);
for (int i = 0; i < 10; i++) {
HelloService helloService = clientProxy.refer();
System.out.println(helloService.sayHi());
}
}
}
複製程式碼
執行結果就不一一貼出了,感興趣的小夥伴可以檢視樓主傳到 github 上的原始碼這是一個rpc的輪子。
服務的釋出與訂閱
樓主在原來程式碼的基礎上添加了 Zookeeper 的註冊的邏輯,原來的程式碼相關介紹請轉 造個輪子---RPC動手實現。
服務的釋出
/**
* 釋出服務
*/
public void export() {
try {
Object serviceBean = Class.forName((String) serverConfig.getRef()).newInstance();
RpcInvokerHandler.serviceMap.put(serverConfig.getInterfaceId(),serviceBean);
this.childServer.start(this.getServerConfig());
if (serverConfig.isRegister()) {
// 將服務註冊到zookeeper
register();
}
} catch (Exception e) {
// 取消服務註冊
unregister();
if (e instanceof ChildRpcRuntimeException) {
throw (ChildRpcRuntimeException) e;
} else {
throw new ChildRpcRuntimeException("Build provider proxy error!",e);
}
}
exported = true;
}
/**
* 註冊服務
*/
protected void register() {
if (serverConfig.isRegister()) {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();
try {
registry.register(this.serverConfig);
} catch (ChildRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = serverConfig.getInterfaceId();
LOGGER.info(appName,"Catch exception when register to registry: "
+ registryConfig.getId(),e);
}
}
}
複製程式碼
服務的訂閱
/**
* 服務的引用.
*/
public T refer() {
try {
if (config.isSubscribe()) {
subscribe();
}
childClient.init(this.clientConfig);
return invoke();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 訂閱zk的服務列表.
*/
private void subscribe() {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();
this.clientConfig = (ClientConfig) config;
List<String> providerList = registry.subscribe(this.clientConfig);
if (null == providerList) {
throw new ChildRpcRuntimeException("無可用服務供訂閱!");
}
// 使用隨機演演算法,隨機選擇一個provider
int index = ThreadLocalRandom.current().nextInt(providerList.size());
String providerInfo = providerList.get(index);
String[] providerArr = providerInfo.split(":");
clientConfig = (ClientConfig) this.config;
clientConfig.setHost(providerArr[0]);
clientConfig.setPort(Integer.parseInt(providerArr[1]));
}
複製程式碼
上面程式碼比較簡單,就是在原來直連的基礎上新增 zk 的操作,在釋出服務的時候將 provider 的 IP 和埠號基礎資訊註冊到 zk 上,在引用服務的時候使用隨機演演算法從 zk 上選取可用的 provider 資訊,然後進行 invoke 呼叫。
小結
RPC(Remote procedure call)底層邏輯相對來說比較簡單,樓主在實現的過程中參考了其他 RPC 框架的部分程式碼,受益匪淺~