1. 程式人生 > 程式設計 >基於Netty實現簡易RPC框架

基於Netty實現簡易RPC框架

原文地址:pjmike的部落格

前言

現在網上有很多關於使用Netty來構建RPC框架的例子,為什麼我這裡還要寫一篇文章進行論述呢,我很清楚我可能沒有寫得他們那麼好。之所以還要寫,有兩點原因:

  • 一是因為學過Netty之後,還需要去不斷實踐才能更好的把握Netty的用法,顯然,基於Netty實現RPC框架是一個很好的做法;
  • 二是因為目前市面上有很多RPC框架,比如Dubbo,這些框架通訊底層都是Netty,所以說通過這個例子,也可以更好的去體驗RPC的設計。

下面我將從以下幾點闡述如何基於Netty實現簡易的RPC框架:

  • RPC是什麼?
  • 實現RPC框架需要關注哪些方面 ?
  • 使用Netty如何實現?

RPC是什麼?

RPC,遠端過程呼叫,可以做到像本地呼叫一樣呼叫遠端服務,是一種程式間的通訊方式,概念想必大家都很清楚,在看到58沈劍寫的RPC文章 之後,意識到其實我們可以換一種思考方式去理解RPC,也就是從本地調用出發,進而去推導RPC呼叫

rpc_58

1. 本地函式呼叫

本地函式是我們經常碰到的,比如下面示例:

public String sayHello(String name) {
    return "hello," + name;
}
複製程式碼

我們只需要傳入一個引數,呼叫sayHello方法就可以得到一個輸出,也就是輸入引數——>方法體——>輸出,入參、出參以及方法體都在同一個程式空間中,這就是本地函式呼叫

2. Socket通訊

那有沒有辦法實現不同程式之間通訊呢?呼叫方在程式A,需要呼叫方法A,但是方法A在程式B中

rpc_2

最容易想到的方式就是使用Socket通訊,使用Socket可以完成跨程式呼叫,我們需要約定一個程式通訊協議,來進行傳參,呼叫函式,出參。寫過Socket應該都知道,Socket是比較原始的方式,我們需要更多的去關注一些細節問題,比如引數和函式需要轉換成位元組流進行網路傳輸,也就是序列化操作,然後出參時需要反序列化;使用socket進行底層通訊,程式碼程式設計也比較容易出錯。

如果一個呼叫方需要關注這麼多問題,那無疑是個災難,所以有沒有什麼簡單方法,讓我們的呼叫方不需要關注細節問題,讓呼叫方像呼叫本地函式一樣,只要傳入引數,呼叫方法,然後坐等返回結果就可以了呢?

3. RPC框架

RPC框架就是用來解決上面的問題的,它能夠讓呼叫方像呼叫本地函式一樣呼叫遠端服務,底層通訊細節對呼叫方是透明的,將各種複雜性都給遮蔽掉,給予呼叫方極致體驗。

rpc_3

RPC呼叫需要關注哪些方面

前面就已經說到RPC框架,讓呼叫方像呼叫本地函式一樣呼叫遠端服務,那麼如何做到這一點呢?

在使用的時候,呼叫方是直接呼叫本地函式,傳入相應引數,其他細節它不用管,至於通訊細節交給RPC框架來實現。實際上RPC框架採用代理類的方式,具體來說是動態代理的方式,在執行時動態建立新的類,也就是代理類,在該類中實現通訊的細節問題,比如引數序列化

當然不光是序列化,我們還需要約定一個雙方通訊的協議格式,規定好協議格式,比如請求引數的資料型別,請求的引數,請求的方法名等,這樣根據格式進行序列化後進行網路傳輸,然後服務端收到請求物件後按照指定格式進行解碼,這樣服務端才知道具體該呼叫哪個方法,傳入什麼樣的引數。

剛才又提到網路傳輸,RPC框架重要的一環也就是網路傳輸,服務是部署在不同主機上的,如何高效的進行網路傳輸,儘量不丟包,保證資料完整無誤的快速傳遞出去?實際上,就是利用我們今天的主角——Netty,Netty是一個高效能的網路通訊框架,它足以勝任我們的任務。

前面說了這麼多,再次總結下一個RPC框架需要重點關注哪幾個點:

  • 代理 (動態代理)
  • 通訊協議
  • 序列化
  • 網路傳輸

當然一個優秀的RPC框架需要關注的不止上面幾點,只不過本篇文章旨在做一個簡易的RPC框架,理解了上面關鍵的幾點就夠了

rpc_4

基於Netty實現RPC框架

終於到了本文的重頭戲了,我們將根據實現RPC需要關注的幾個要點(代理、序列化、協議、編解碼),使用Netty進行逐一實現

1. Protocol(協議)

首先我們需要確定通訊雙方的協議格式,請求物件和響應物件

請求物件:

@Data
@ToString
public class RpcRequest {
    /**
     * 請求物件的ID
     */
    private String requestId;
    /**
     * 類名
     */
    private String className;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 引數型別
     */
    private Class<?>[] parameterTypes;
    /**
     * 入參
     */
    private Object[] parameters;
}

複製程式碼
  • 請求物件的ID是客戶端用來驗證伺服器請求和響應是否匹配

響應物件:

@Data
public class RpcResponse {
    /**
     * 響應ID
     */
    private String requestId;
    /**
     * 錯誤資訊
     */
    private String error;
    /**
     * 返回的結果
     */
    private Object result;
}
複製程式碼

2. 序列化

市面上序列化協議很多,比如jdk自帶的,Google的protobuf,kyro、Hessian等,只要不選擇jdk自帶的序列化方法,(因為其效能太差,序列化後產生的碼流太大),其他方式其實都可以,這裡為了方便起見,選用JSON作為序列化協議,使用fastjson作為JSON框架

為了後續擴充套件方便,先定義序列化介面:

public interface Serializer {
    /**
     * java物件轉換為二進位制
     *
     * @param object
     * @return
     */
    byte[] serialize(Object object) throws IOException;

    /**
     * 二進位制轉換成java物件
     *
     * @param clazz
     * @param bytes
     * @param <T>
     * @return
     */
    <T> T deserialize(Class<T> clazz,byte[] bytes) throws IOException;
}
複製程式碼

因為我們採用JSON的方式,所以定義JSONSerializer的實現類:

public class JSONSerializer implements Serializer{

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz,byte[] bytes) {
        return JSON.parseObject(bytes,clazz);
    }
}
複製程式碼

如果後續要使用其他序列化方式,可以自行實現序列化介面

3. 編解碼器

約定好協議格式和序列化方式之後,我們還需要編解碼器,編碼器將請求物件轉換為適合於傳輸的格式(一般來說是位元組流),而對應的解碼器是將網路位元組流轉換回應用程式的訊息格式。

編碼器實現:

public class RpcEncoder extends MessageToByteEncoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcEncoder(Class<?> clazz,Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,Object msg,ByteBuf byteBuf) throws Exception {
        if (clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
        }
    }
}
複製程式碼

解碼器實現:

public class RpcDecoder extends ByteToMessageDecoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcDecoder(Class<?> clazz,Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf,List<Object> list) throws Exception {
        //因為之前編碼的時候寫入一個Int型,4個位元組來表示長度
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        //標記當前讀的位置
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        //將byteBuf中的資料讀入data位元組陣列
        byteBuf.readBytes(data);
        Object obj = serializer.deserialize(clazz,data);
        list.add(obj);
    }
}
複製程式碼

4. Netty 客戶端

下面來看看Netty客戶端是如何實現的,也就是如何使用Netty開啟客戶端。

實際上,熟悉Netty的朋友應該都知道,我們需要注意以下幾點:

  • 編寫啟動方法,指定傳輸使用Channel
  • 指定ChannelHandler,對網路傳輸中的資料進行讀寫處理
  • 新增編解碼器
  • 新增失敗重試機制
  • 新增傳送請求訊息的方法

下面來看具體的實現程式碼:

@Slf4j
public class NettyClient {
    private EventLoopGroup eventLoopGroup;
    private Channel channel;
    private ClientHandler clientHandler;
    private String host;
    private Integer port;
    private static final int MAX_RETRY = 5;
    public NettyClient(String host,Integer port) {
        this.host = host;
        this.port = port;
    }
    public void connect() {
        clientHandler = new ClientHandler();
        eventLoopGroup = new NioEventLoopGroup();
        //啟動類
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                //指定傳輸使用的Channel
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE,true)
                .option(ChannelOption.TCP_NODELAY,true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //新增編碼器
                        pipeline.addLast(new RpcEncoder(RpcRequest.class,new JSONSerializer()));
                        //新增解碼器
                        pipeline.addLast(new RpcDecoder(RpcResponse.class,new JSONSerializer()));
                        //請求處理類
                        pipeline.addLast(clientHandler);
                    }
                });
        connect(bootstrap,host,port,MAX_RETRY);
    }

    /**
     * 失敗重連機制,參考Netty入門實戰掘金小冊
     *
     * @param bootstrap
     * @param host
     * @param port
     * @param retry
     */
    private void connect(Bootstrap bootstrap,String host,int port,int retry) {
        ChannelFuture channelFuture = bootstrap.connect(host,port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("連線服務端成功");
            } else if (retry == 0) {
                log.error("重試次數已用完,放棄連線");
            } else {
                //第幾次重連:
                int order = (MAX_RETRY - retry) + 1;
                //本次重連的間隔
                int delay = 1 << order;
                log.error("{} : 連線失敗,第 {} 重連....",new Date(),order);
                bootstrap.config().group().schedule(() -> connect(bootstrap,retry - 1),delay,TimeUnit.SECONDS);
            }
        });
        channel = channelFuture.channel();
    }

    /**
     * 傳送訊息
     *
     * @param request
     * @return
     */
    public RpcResponse send(final RpcRequest request) {
        try {
            channel.writeAndFlush(request).await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return clientHandler.getRpcResponse(request.getRequestId());
    }
    @PreDestroy
    public void close() {
        eventLoopGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
    }
}

複製程式碼

我們對於資料的處理重點在於ClientHandler類上,它繼承了ChannelDuplexHandler類,可以對出站和入站的資料進行處理

public class ClientHandler extends ChannelDuplexHandler {
    /**
     * 使用Map維護請求物件ID與響應結果Future的對映關係
     */
    private final Map<String,DefaultFuture> futureMap = new ConcurrentHashMap<>();
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        if (msg instanceof RpcResponse) {
            //獲取響應物件
            RpcResponse response = (RpcResponse) msg;
            DefaultFuture defaultFuture =
            futureMap.get(response.getRequestId());
            //將結果寫入DefaultFuture
            defaultFuture.setResponse(response);
        }
        super.channelRead(ctx,msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx,ChannelPromise promise) throws Exception {
        if (msg instanceof RpcRequest) {
            RpcRequest request = (RpcRequest) msg;
            //傳送請求物件之前,先把請求ID儲存下來,並構建一個與響應Future的對映關係
            futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
        }
        super.write(ctx,msg,promise);
    }

    /**
     * 獲取響應結果
     *
     * @param requsetId
     * @return
     */
    public RpcResponse getRpcResponse(String requsetId) {
        try {
            DefaultFuture future = futureMap.get(requsetId);
            return future.getRpcResponse(10);
        } finally {
            //獲取成功以後,從map中移除
            futureMap.remove(requsetId);
        }
    }
}

複製程式碼

參考文章: xilidou.com/2018/09/26/…

從上面實現可以看出,我們定義了一個Map,維護請求ID與響應結果的對映關係,目的是為了客戶端用來驗證服務端響應是否與請求相匹配,因為Netty的channel可能被多個執行緒使用,當結果返回時,你不知道是從哪個執行緒返回的,所以需要一個對映關係。

而我們的結果是封裝在DefaultFuture中的,因為Netty是非同步框架,所有的返回都是基於Future和Callback機制的,我們這裡自定義Future來實現客戶端"非同步呼叫"

public class DefaultFuture {
    private RpcResponse rpcResponse;
    private volatile boolean isSucceed = false;
    private final Object object = new Object();

    public RpcResponse getRpcResponse(int timeout) {
        synchronized (object) {
            while (!isSucceed) {
                try {
                    object.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return rpcResponse;
        }
    }

    public void setResponse(RpcResponse response) {
        if (isSucceed) {
            return;
        }
        synchronized (object) {
            this.rpcResponse = response;
            this.isSucceed = true;
            object.notify();
        }
    }
}

複製程式碼
  • 實際上用了wait和notify機制,同時使用一個boolean變數做輔助

5. Netty服務端

Netty服務端的實現跟客戶端的實現差不多,只不過要注意的是,當對請求進行解碼過後,需要通過代理的方式呼叫本地函式。下面是Server端程式碼:

public class NettyServer implements InitializingBean {
    private EventLoopGroup boss = null;
    private EventLoopGroup worker = null;
    @Autowired
    private ServerHandler serverHandler;
    @Override
    public void afterPropertiesSet() throws Exception {
        //此處使用了zookeeper做註冊中心,本文不涉及,可忽略
        ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181");
        start(registry);
    }

    public void start(ServiceRegistry registry) throws Exception {
        //負責處理客戶端連線的執行緒池
        boss = new NioEventLoopGroup();
        //負責處理讀寫操作的執行緒池
        worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //新增解碼器
                        pipeline.addLast(new RpcEncoder(RpcResponse.class,new JSONSerializer()));
                        //新增編碼器
                        pipeline.addLast(new RpcDecoder(RpcRequest.class,new JSONSerializer()));
                        //新增請求處理器
                        pipeline.addLast(serverHandler);

                    }
                });
        bind(serverBootstrap,8888);
    }

    /**
     * 如果埠繫結失敗,埠數+1,重新繫結
     *
     * @param serverBootstrap
     * @param port
     */
    public void bind(final ServerBootstrap serverBootstrap,int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("埠[ {} ] 繫結成功",port);
            } else {
                log.error("埠[ {} ] 繫結失敗",port);
                bind(serverBootstrap,port + 1);
            }
        });
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        worker.shutdownGracefully().sync();
        log.info("關閉Netty");
    }
}
複製程式碼

下面是處理讀寫操作的Handler類:

@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,RpcRequest msg) {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try {
            Object handler = handler(msg);
            log.info("獲取返回結果: {} ",handler);
            rpcResponse.setResult(handler);
        } catch (Throwable throwable) {
            rpcResponse.setError(throwable.toString());
            throwable.printStackTrace();
        }
        ctx.writeAndFlush(rpcResponse);
    }

    /**
     * 服務端使用代理處理請求
     *
     * @param request
     * @return
     */
    private Object handler(RpcRequest request) throws ClassNotFoundException,InvocationTargetException {
        //使用Class.forName進行載入Class檔案
        Class<?> clazz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clazz);
        log.info("serviceBean: {}",serviceBean);
        Class<?> serviceClass = serviceBean.getClass();
        log.info("serverClass:{}",serviceClass);
        String methodName = request.getMethodName();

        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        //使用CGLIB Reflect
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
        log.info("開始呼叫CGLIB動態代理執行服務端方法...");
        return fastMethod.invoke(serviceBean,parameters);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
複製程式碼

6. 客戶端代理

客戶端使用Java動態代理,在代理類中實現通訊細節,眾所眾知,Java動態代理需要實現InvocationHandler介面

@Slf4j
public class RpcClientDynamicProxy<T> implements InvocationHandler {
    private Class<T> clazz;
    public RpcClientDynamicProxy(Class<T> clazz) throws Exception {
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy,Method method,Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        String requestId = UUID.randomUUID().toString();

        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();

        Class<?>[] parameterTypes = method.getParameterTypes();

        request.setRequestId(requestId);
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(args);
        log.info("請求內容: {}",request);
        
        //開啟Netty 客戶端,直連
        NettyClient nettyClient = new NettyClient("127.0.0.1",8888);
        log.info("開始連線服務端:{}",new Date());
        nettyClient.connect();
        RpcResponse send = nettyClient.send(request);
        log.info("請求呼叫返回結果:{}",send.getResult());
        return send.getResult();
    }
}

複製程式碼
  • 在invoke方法中封裝請求物件,構建NettyClient物件,並開啟客戶端,傳送請求訊息

代理工廠類如下:

public class ProxyFactory {
    public static <T> T create(Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass},new RpcClientDynamicProxy<T>(interfaceClass));
    }
}
複製程式碼
  • 通過Proxy.newProxyInstance建立介面的代理類

7. RPC遠端呼叫測試

API:

public interface HelloService {
    String hello(String name);
}
複製程式碼
  • 準備一個測試API介面

客戶端:

@SpringBootApplication
@Slf4j
public class ClientApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ClientApplication.class,args);
        HelloService helloService = ProxyFactory.create(HelloService.class);
        log.info("響應結果“: {}",helloService.hello("pjmike"));
    }
}
複製程式碼
  • 客戶端呼叫介面的方法

服務端:

//服務端實現
@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "hello," + name;
    }
}
複製程式碼

執行結果:

rpc_5

小結

以上我們基於Netty實現了一個非非非常簡陋的RPC框架,比起成熟的RPC框架來說相差甚遠,甚至說基本的註冊中心都沒有實現,但是通過本次實踐,可以說我對於RPC的理解更深了,瞭解了一個RPC框架到底需要關注哪些方面,未來當我們使用成熟的RPC框架時,比如Dubbo,能夠做到心中有數,能明白其底層不過也是使用Netty作為基礎通訊框架。往後,如果更深入翻看開源RPC框架原始碼時,也相對比較容易

專案地址: github.com/pjmike/spri…

參考資料 & 鳴謝