1. 程式人生 > 程式設計 >實現自己的 RPC 框架(二)

實現自己的 RPC 框架(二)

前段時間自己搞了個 RPC 的輪子,不過相對來說比較簡單,最近在原來的基礎上加以改造,使用 Zookeeper 實現了 provider 自動定址以及消費者的簡單負載均衡,對之前的感興趣的請轉 造個輪子---RPC動手實現

RPC 模型

在原來使用 TCP 直連的基礎上實現基於 Zookeeper 的服務的註冊與發現,改造後的依賴關係是這樣的。

child-rpc

怎麼用

話不多說,我們來看下如何釋出和引用服務。 服務端我們將服務的 IP 和埠號基礎資訊註冊到 Zookeeper 上。

/**
 * @author wuhaifei 2019-08-02
 */
public class ZookeeperServerMainTest
{ public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig(); serverConfig.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer) .setHost("172.16.30.114") .setPort(5201) .setRef(HelloServiceImpl.class.getName()) .setRegister(true
) .setInterfaceId(HelloService.class.getName()); RegistryConfig registryConfig = new RegistryConfig().setAddress("127.0.0.1:2181") .setSubscribe(true) .setRegister(true) .setProtocol(RpcConstants.ZOOKEEPER); ServerProxy serverProxy = new
ServerProxy(new NettyServerAbstract()) .setServerConfig(serverConfig) .setRegistryConfig(registryConfig); try { serverProxy.export(); while (true){ } } catch (Exception e) { e.printStackTrace(); } } } 複製程式碼

通過 Zookeeper 引用註冊在其上的服務。

/**
 * @author wuhaifei 2019-08-02
 */
public class ZookeeperClientMainTest {
    public static void main(String[] args) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setProtocol(RpcConstants.ZOOKEEPER)
                .setTimeoutMillis(100000)
                .setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer);

        RegistryConfig registryConfig = new RegistryConfig()
                .setAddress("127.0.0.1:2181")
                .setProtocol(RpcConstants.ZOOKEEPER)
                .setRegister(true)
                .setSubscribe(true);
        ClientProxy<HelloService> clientProxy = new ClientProxy(clientConfig,new NettyClientAbstract(),HelloService.class)
                .setRegistryConfig(registryConfig);
        for (int i = 0; i < 10; i++) {
            HelloService helloService = clientProxy.refer();
            System.out.println(helloService.sayHi());
        }
    }
}
複製程式碼

執行結果就不一一貼出了,感興趣的小夥伴可以檢視樓主傳到 github 上的原始碼這是一個rpc的輪子

服務的釋出與訂閱

樓主在原來程式碼的基礎上添加了 Zookeeper 的註冊的邏輯,原來的程式碼相關介紹請轉 造個輪子---RPC動手實現

服務的釋出

/**
* 釋出服務
*/
public void export() {
    try {
        Object serviceBean = Class.forName((String) serverConfig.getRef()).newInstance();
        RpcInvokerHandler.serviceMap.put(serverConfig.getInterfaceId(),serviceBean);
        this.childServer.start(this.getServerConfig());

        if (serverConfig.isRegister()) {
            // 將服務註冊到zookeeper
            register();
        }
    } catch (Exception e) {
        // 取消服務註冊
        unregister();
        if (e instanceof ChildRpcRuntimeException) {
            throw (ChildRpcRuntimeException) e;
        } else {
            throw new ChildRpcRuntimeException("Build provider proxy error!",e);
        }
    }
    exported = true;
}

/**
 * 註冊服務
 */
protected void register() {
    if (serverConfig.isRegister()) {
        Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
        registry.init();
        registry.start();
        try {
            registry.register(this.serverConfig);
        } catch (ChildRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            String appName = serverConfig.getInterfaceId();
            LOGGER.info(appName,"Catch exception when register to registry: "
                    + registryConfig.getId(),e);
        }
    }
}

複製程式碼

服務的訂閱

/**
* 服務的引用.
*/
public T refer() {
    try {
        if (config.isSubscribe()) {
            subscribe();
        }
        childClient.init(this.clientConfig);
        return invoke();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * 訂閱zk的服務列表.
 */
private void subscribe() {
    Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
    registry.init();
    registry.start();

    this.clientConfig = (ClientConfig) config;
    List<String> providerList = registry.subscribe(this.clientConfig);

    if (null == providerList) {
        throw new ChildRpcRuntimeException("無可用服務供訂閱!");
    }

    // 使用隨機演演算法,隨機選擇一個provider
    int index = ThreadLocalRandom.current().nextInt(providerList.size());
    String providerInfo = providerList.get(index);
    String[] providerArr = providerInfo.split(":");
    clientConfig = (ClientConfig) this.config;
    clientConfig.setHost(providerArr[0]);
    clientConfig.setPort(Integer.parseInt(providerArr[1]));
}
複製程式碼

上面程式碼比較簡單,就是在原來直連的基礎上新增 zk 的操作,在釋出服務的時候將 provider 的 IP 和埠號基礎資訊註冊到 zk 上,在引用服務的時候使用隨機演演算法從 zk 上選取可用的 provider 資訊,然後進行 invoke 呼叫。

小結

RPC(Remote procedure call)底層邏輯相對來說比較簡單,樓主在實現的過程中參考了其他 RPC 框架的部分程式碼,受益匪淺~