1. 程式人生 > >從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造

從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造

2.0版本RPC-Server改動不大,主要變化在於RPC-Client使用了服務地址快取,並引入監控機制,第一時間獲取zk叢集中服務地址資訊變化並重新整理本地快取。另外,RPC-Client還使用了RpcClientProperties開放對負載均衡策略和序列化策略的選擇。

系列文章:

專欄:從零開始寫分散式RPC框架
專案GitHub地址:https://github.com/linshenkx/rpc-netty-spring-boot-starter

手寫通用型別負載均衡路由引擎(含隨機、輪詢、雜湊等及其帶權形式)
實現 序列化引擎(支援 JDK預設、Hessian、Json、Protostuff、Xml、Avro、ProtocolBuffer、Thrift等序列化方式)


從零寫分散式RPC框架 系列 2.0 (1)架構升級
從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造
從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入

文章目錄

RPC-Server

1 結構圖

結構圖注意,RpcService註解移動到了RPC-Common模組下,另外新加了ServiceInfo代表將存到註冊中心的服務資訊(也在RPC-Common模組下),其他的除了RpcServer基本沒有變化

2 RpcService註解

主要是多了weight和workerThreads,分別代表權重和最大工作執行緒數。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description:
 * RPC服務註解(標註在rpc服務實現類上)
 * 使用@Service註解使被@RpcService標註的類都能被Spring管理
 */
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Service public @interface RpcService { Class<?> value(); int weight() default 1; int workerThreads() default 10; }

3 ServiceInfo

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 18-11-13
 * @Description: 服務資訊,用於儲存到註冊中心
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceInfo implements WeightGetAble {

    private String host;
    private int port;
    /**
     * 權重資訊
     */
    private int weight;
    /**
     * 最大工作執行緒數
     */
    private int workerThreads;

    public ServiceInfo (ServiceInfo serviceInfo){
        this.host = serviceInfo.host;
        this.port = serviceInfo.port;
        this.weight = serviceInfo.weight;
        this.workerThreads = serviceInfo.workerThreads;
    }

    @Override
    public int getWeightFactors() {
        return getWeight();
    }
}

4 RpcServer

RpcServer主要是多了對 serviceSemaphoreMap 和 serviceRpcServiceMap的管理。其中serviceSemaphoreMap 將作為引數傳入RpcServerHandler提供限流資訊,而serviceRpcServiceMap將註冊到ZK叢集。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: TODO
 */
@Log4j2
@AutoConfigureAfter({ZKServiceRegistry.class})
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServer implements ApplicationContextAware, InitializingBean {

    /**
     * 存放 服務名稱 與 服務例項 之間的對映關係
     */
    private Map<String,Object> handlerMap=new HashMap<>();

    /**
     * 存放 服務名稱 與 訊號量 之間的對映關係
     * 用於限制每個服務的工作執行緒數
     */
    private Map<String, Semaphore> serviceSemaphoreMap=new HashMap<>();

    /**
     * 存放 服務名稱 與 服務資訊 之間的對映關係
     */
    private Map<String, RpcService> serviceRpcServiceMap=new HashMap<>();

    @Autowired
    private RpcServerProperties rpcProperties;

    @Autowired
    private ZKServiceRegistry rpcServiceRegistry;

    /**
     * 在類初始化時執行,將所有被@RpcService標記的類納入管理
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        //獲取帶有@RpcService註解的類
        Map<String,Object> rpcServiceMap=applicationContext.getBeansWithAnnotation(RpcService.class);
        //以@RpcService註解的value的類的類名為鍵將該標記類存入handlerMap和serviceSemaphoreMap
        if(!CollectionUtils.isEmpty(rpcServiceMap)){
            for(Object object:rpcServiceMap.values()){
                RpcService rpcService=object.getClass().getAnnotation(RpcService.class);
                String serviceName=rpcService.value().getName();
                handlerMap.put(serviceName,object);
                serviceSemaphoreMap.put(serviceName,new Semaphore(rpcService.workerThreads()));
                serviceRpcServiceMap.put(serviceName,rpcService);
            }
        }

    }


    /**
     * 在所有屬性值設定完成後執行,負責啟動RPC服務
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        //管理相關childGroup
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        //處理相關RPC請求
        EventLoopGroup childGroup=new NioEventLoopGroup();

        try {
            //啟動RPC服務
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(bossGroup,childGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline=channel.pipeline();
                    //解碼RPC請求
                    pipeline.addLast(new RemotingTransporterDecoder());
                    //編碼RPC請求
                    pipeline.addFirst(new RemotingTransporterEncoder());
                    //處理RPC請求
                    pipeline.addLast(new RpcServerHandler(handlerMap,serviceSemaphoreMap));
                }
            });
            //同步啟動,RPC伺服器啟動完畢後才執行後續程式碼
            ChannelFuture future=bootstrap.bind(rpcProperties.getPort()).sync();
            log.info("server started,listening on {}",rpcProperties.getPort());

            //啟動後註冊服務
            registry();

            //釋放資源
            future.channel().closeFuture().sync();
        }catch (Exception e){
            log.entry("server exception",e);
        }finally {
            //關閉RPC服務
            childGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }

    private void registry() throws UnknownHostException {
        //註冊RPC服務地址
        String hostAddress=InetAddress.getLocalHost().getHostAddress();
        int port=rpcProperties.getPort();

        for(String interfaceName:handlerMap.keySet()){
            ServiceInfo serviceInfo=
                    new ServiceInfo(hostAddress,port,serviceRpcServiceMap.get(interfaceName).weight(),serviceRpcServiceMap.get(interfaceName).workerThreads());
            String serviceInfoString= JSON.toJSONString(serviceInfo);
            rpcServiceRegistry.register(interfaceName,serviceInfoString);
            log.info("register service:{}=>{}",interfaceName,serviceInfoString);
        }
    }
}

RPC-Client

1 結構圖

RPC-Client
新增RpcClientProperties提供配置屬性讀入(路由策略和序列化方式),ZKServiceDiscovery增加ConcurrentMap<String,List> servicePathsMap來管理服務地址列表。RpcClient相應作出調整。

2 RpcClientProperties

注意這裡屬性用的是列舉型別而不是字串,另外預設路由策略是隨機,預設序列化策略是json

@Data
@ConfigurationProperties(prefix = "rpc.client")
public class RpcClientProperties {
    private RouteStrategyEnum routeStrategy= RouteStrategyEnum.Random;
    private SerializeTypeEnum serializeType=SerializeTypeEnum.JSON;
}

3 ZKServiceDiscovery

這裡使用了IZkChildListener 來對目標路徑下子節點變化進行監控,如果發生變化(新增或刪減)則重新執行discover方法拉取最新服務地址列表。
zkChildListenerMap的作用是管理服務和對應的服務地址列表監聽器,避免重複註冊監聽器。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: zookeeper服務註冊中心
 */
@Component
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceDiscovery {

  @Autowired
  private ZKProperties zkProperties;

  /**
   * 服務名和服務地址列表的Map
   */
  private ConcurrentMap<String,List<String>> servicePathsMap=new ConcurrentHashMap<>();

  /**
   * 服務監聽器 Map,監聽子節點服務資訊
   */
  private ConcurrentMap<String, IZkChildListener> zkChildListenerMap=new ConcurrentHashMap<>();

  private ZkClient zkClient;

  @PostConstruct
  public void init() {
    // 建立 ZooKeeper 客戶端
    zkClient = new ZkClient(zkProperties.getAddress(), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
    log.info("connect to zookeeper");
  }

  /**
   *
   * 根據服務名獲取服務地址並保持監控
   * @param serviceName
   * @return
   */
  public void discover(String serviceName){
    log.info("discovering:"+serviceName);
    String servicePath=zkProperties.getRegistryPath()+"/"+serviceName;
    //找不到對應服務
    if(!zkClient.exists(servicePath)){
      throw new RuntimeException("can not find any service node on path: "+servicePath);
    }
    //獲取服務地址列表
    List<String> addressList=zkClient.getChildren(servicePath);
    if(CollectionUtils.isEmpty(addressList)){
      throw new RuntimeException("can not find any address node on path: "+servicePath);
    }
    //儲存地址列表
    List<String> paths=new ArrayList<>(addressList.size());
    for(String address:addressList){
      paths.add(zkClient.readData(servicePath+"/"+address));
    }
    servicePathsMap.put(serviceName,paths);
    //保持監控
    if(!zkChildListenerMap.containsKey(serviceName)){
      IZkChildListener iZkChildListener= (parentPath, currentChilds) -> {
        //當子節點列表變化時重新discover
        discover(serviceName);
        log.info("子節點列表發生變化 ");
      };
      zkClient.subscribeChildChanges(servicePath, iZkChildListener);
      zkChildListenerMap.put(serviceName,iZkChildListener);
    }
  }

  public List<String> getAddressList(String serviceName){
      List<String> addressList=servicePathsMap.get(serviceName);
      if(addressList==null||addressList.isEmpty()){
          discover(serviceName);
          return servicePathsMap.get(serviceName);
      }
      return addressList;
  }

}

4 RpcClient

主要是配合RemotingTransporter做了調整和升級,整體變化不大。
另外一個要注意的就是 ConcurrentMap<String,RouteStrategy> serviceRouteStrategyMap,用於在使用輪詢策略時,為不同的服務呼叫保管對應的輪詢器(輪詢器內部儲存index記錄,是有狀態的)。

@Log4j2
@Component
@AutoConfigureAfter(ZKServiceDiscovery.class)
@EnableConfigurationProperties(RpcClientProperties.class)
public class RpcClient {

    @Autowired
    private ZKServiceDiscovery zkServiceDiscovery;

    @Autowired
    private RpcClientProperties rpcClientProperties;

    /**
     * 維持服務的 輪詢 路由狀態
     * 不同服務狀態不同(服務列表也不同)
     * 非輪詢無需維持狀態
     */
    private ConcurrentMap<String,RouteStrategy> serviceRouteStrategyMap=new ConcurrentHashMap<>();

    /**
     * 存放請求編號與響應物件的對映關係
     */
    private ConcurrentMap<Long, RemotingTransporter> remotingTransporterMap=new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass){
        //建立動態代理物件
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
                    //建立RPC請求物件
                    RpcRequest rpcRequest=new RpcRequest();
                    rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
                    rpcRequest.setMethodName(method.getName());
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    rpcRequest.setParameters(args);
                    //獲取RPC服務資訊列表
                    String serviceName=interfaceClass.getName();
                    List<String> addressList=zkServiceDiscovery.getAddressList(serviceName);

                    List<ServiceInfo> serviceInfoList=new ArrayList<>(addressList.size());
                    for(String serviceInfoString:addressList){
                        serviceInfoList.add(JSON.parseObject(serviceInfoString,ServiceInfo.class));
                    }
                    //根據配置檔案獲取路由策略
                    log.info("使用負載均衡策略:"+rpcClientProperties.getRouteStrategy());
                    log.info("使用序列化策略:"+rpcClientProperties.getSerializeType());
                    RouteStrategy routeStrategy ;
                    //如果使用輪詢,則需要儲存狀態(按服務名儲存)
                    if(RouteStrategyEnum.Polling==rpcClientProperties.getRouteStrategy()){
                        routeStrategy=serviceRouteStrategyMap.getOrDefault(serviceName,RouteEngine.queryClusterStrategy(RouteStrategyEnum.Polling));
                        serviceRouteStrategyMap.put(serviceName,routeStrategy);
                    }else {
                        routeStrategy= RouteEngine.queryClusterStrategy(rpcClientProperties.getRouteStrategy());
                    }
                    //根據路由策略選取服務提供方
                    ServiceInfo serviceInfo = routeStrategy.select(serviceInfoList);

                    RemotingTransporter remotingTransporter=new RemotingTransporter();
                    //設定flag為請求,雙路,非ping,非其他,序列化方式為 配置檔案中SerializeTypeEnum對應的code
                    remotingTransporter.setFlag(new RemotingTransporter.Flag(true,true,false,false,rpcClientProperties.getSerializeType().getCode()));

                    remotingTransporter.setBodyContent(rpcRequest);

                    log.info("get serviceInfo:"+serviceInfo);
                    //從RPC服務地址中解析主機名與埠號
                    //傳送RPC請求
                    RpcResponse rpcResponse=send(remotingTransporter,serviceInfo.getHost(),serviceInfo.getPort());
                    //獲取響應結果
                    if(rpcResponse==null){
                        log.error("send request failure",new IllegalStateException("response is null"));
                        return null;
                    }
                    if(rpcResponse.getException()!=null){
                        log.error("response has exception",rpcResponse.getException());
                        return null;
                    }

                    return rpcResponse.getResult();
                }
        );
    }

    private RpcResponse send(RemotingTransporter remotingTransporter,String host,int port){
        log.info("sen