基於Netty實現簡易RPC框架
原文地址:pjmike的部落格
前言
現在網上有很多關於使用Netty來構建RPC框架的例子,為什麼我這裡還要寫一篇文章進行論述呢,我很清楚我可能沒有寫得他們那麼好。之所以還要寫,有兩點原因:
- 一是因為學過Netty之後,還需要去不斷實踐才能更好的把握Netty的用法,顯然,基於Netty實現RPC框架是一個很好的做法;
- 二是因為目前市面上有很多RPC框架,比如Dubbo,這些框架通訊底層都是Netty,所以說通過這個例子,也可以更好的去體驗RPC的設計。
下面我將從以下幾點闡述如何基於Netty實現簡易的RPC框架:
- RPC是什麼?
- 實現RPC框架需要關注哪些方面 ?
- 使用Netty如何實現?
RPC是什麼?
RPC,遠端過程呼叫,可以做到像本地呼叫一樣呼叫遠端服務,是一種程式間的通訊方式,概念想必大家都很清楚,在看到58沈劍寫的RPC文章 之後,意識到其實我們可以換一種思考方式去理解RPC,也就是從本地調用出發,進而去推導RPC呼叫
1. 本地函式呼叫
本地函式是我們經常碰到的,比如下面示例:
public String sayHello(String name) {
return "hello," + name;
}
複製程式碼
我們只需要傳入一個引數,呼叫sayHello方法就可以得到一個輸出,也就是輸入引數——>方法體——>輸出,入參、出參以及方法體都在同一個程式空間中,這就是本地函式呼叫
2. Socket通訊
那有沒有辦法實現不同程式之間通訊呢?呼叫方在程式A,需要呼叫方法A,但是方法A在程式B中
最容易想到的方式就是使用Socket通訊,使用Socket可以完成跨程式呼叫,我們需要約定一個程式通訊協議,來進行傳參,呼叫函式,出參。寫過Socket應該都知道,Socket是比較原始的方式,我們需要更多的去關注一些細節問題,比如引數和函式需要轉換成位元組流進行網路傳輸,也就是序列化操作,然後出參時需要反序列化;使用socket進行底層通訊,程式碼程式設計也比較容易出錯。
如果一個呼叫方需要關注這麼多問題,那無疑是個災難,所以有沒有什麼簡單方法,讓我們的呼叫方不需要關注細節問題,讓呼叫方像呼叫本地函式一樣,只要傳入引數,呼叫方法,然後坐等返回結果就可以了呢?
3. RPC框架
RPC框架就是用來解決上面的問題的,它能夠讓呼叫方像呼叫本地函式一樣呼叫遠端服務,底層通訊細節對呼叫方是透明的,將各種複雜性都給遮蔽掉,給予呼叫方極致體驗。
RPC呼叫需要關注哪些方面
前面就已經說到RPC框架,讓呼叫方像呼叫本地函式一樣呼叫遠端服務,那麼如何做到這一點呢?
在使用的時候,呼叫方是直接呼叫本地函式,傳入相應引數,其他細節它不用管,至於通訊細節交給RPC框架來實現。實際上RPC框架採用代理類的方式,具體來說是動態代理的方式,在執行時動態建立新的類,也就是代理類,在該類中實現通訊的細節問題,比如引數序列化。
當然不光是序列化,我們還需要約定一個雙方通訊的協議格式,規定好協議格式,比如請求引數的資料型別,請求的引數,請求的方法名等,這樣根據格式進行序列化後進行網路傳輸,然後服務端收到請求物件後按照指定格式進行解碼,這樣服務端才知道具體該呼叫哪個方法,傳入什麼樣的引數。
剛才又提到網路傳輸,RPC框架重要的一環也就是網路傳輸,服務是部署在不同主機上的,如何高效的進行網路傳輸,儘量不丟包,保證資料完整無誤的快速傳遞出去?實際上,就是利用我們今天的主角——Netty,Netty是一個高效能的網路通訊框架,它足以勝任我們的任務。
前面說了這麼多,再次總結下一個RPC框架需要重點關注哪幾個點:
- 代理 (動態代理)
- 通訊協議
- 序列化
- 網路傳輸
當然一個優秀的RPC框架需要關注的不止上面幾點,只不過本篇文章旨在做一個簡易的RPC框架,理解了上面關鍵的幾點就夠了
基於Netty實現RPC框架
終於到了本文的重頭戲了,我們將根據實現RPC需要關注的幾個要點(代理、序列化、協議、編解碼),使用Netty進行逐一實現
1. Protocol(協議)
首先我們需要確定通訊雙方的協議格式,請求物件和響應物件
請求物件:
@Data
@ToString
public class RpcRequest {
/**
* 請求物件的ID
*/
private String requestId;
/**
* 類名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 引數型別
*/
private Class<?>[] parameterTypes;
/**
* 入參
*/
private Object[] parameters;
}
複製程式碼
- 請求物件的ID是客戶端用來驗證伺服器請求和響應是否匹配
響應物件:
@Data
public class RpcResponse {
/**
* 響應ID
*/
private String requestId;
/**
* 錯誤資訊
*/
private String error;
/**
* 返回的結果
*/
private Object result;
}
複製程式碼
2. 序列化
市面上序列化協議很多,比如jdk自帶的,Google的protobuf,kyro、Hessian等,只要不選擇jdk自帶的序列化方法,(因為其效能太差,序列化後產生的碼流太大),其他方式其實都可以,這裡為了方便起見,選用JSON作為序列化協議,使用fastjson作為JSON框架
為了後續擴充套件方便,先定義序列化介面:
public interface Serializer {
/**
* java物件轉換為二進位制
*
* @param object
* @return
*/
byte[] serialize(Object object) throws IOException;
/**
* 二進位制轉換成java物件
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
<T> T deserialize(Class<T> clazz,byte[] bytes) throws IOException;
}
複製程式碼
因為我們採用JSON的方式,所以定義JSONSerializer的實現類:
public class JSONSerializer implements Serializer{
@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}
@Override
public <T> T deserialize(Class<T> clazz,byte[] bytes) {
return JSON.parseObject(bytes,clazz);
}
}
複製程式碼
如果後續要使用其他序列化方式,可以自行實現序列化介面
3. 編解碼器
約定好協議格式和序列化方式之後,我們還需要編解碼器,編碼器將請求物件轉換為適合於傳輸的格式(一般來說是位元組流),而對應的解碼器是將網路位元組流轉換回應用程式的訊息格式。
編碼器實現:
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> clazz;
private Serializer serializer;
public RpcEncoder(Class<?> clazz,Serializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,Object msg,ByteBuf byteBuf) throws Exception {
if (clazz != null && clazz.isInstance(msg)) {
byte[] bytes = serializer.serialize(msg);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}
}
}
複製程式碼
解碼器實現:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clazz;
private Serializer serializer;
public RpcDecoder(Class<?> clazz,Serializer serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf byteBuf,List<Object> list) throws Exception {
//因為之前編碼的時候寫入一個Int型,4個位元組來表示長度
if (byteBuf.readableBytes() < 4) {
return;
}
//標記當前讀的位置
byteBuf.markReaderIndex();
int dataLength = byteBuf.readInt();
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
//將byteBuf中的資料讀入data位元組陣列
byteBuf.readBytes(data);
Object obj = serializer.deserialize(clazz,data);
list.add(obj);
}
}
複製程式碼
4. Netty 客戶端
下面來看看Netty客戶端是如何實現的,也就是如何使用Netty開啟客戶端。
實際上,熟悉Netty的朋友應該都知道,我們需要注意以下幾點:
- 編寫啟動方法,指定傳輸使用Channel
- 指定ChannelHandler,對網路傳輸中的資料進行讀寫處理
- 新增編解碼器
- 新增失敗重試機制
- 新增傳送請求訊息的方法
下面來看具體的實現程式碼:
@Slf4j
public class NettyClient {
private EventLoopGroup eventLoopGroup;
private Channel channel;
private ClientHandler clientHandler;
private String host;
private Integer port;
private static final int MAX_RETRY = 5;
public NettyClient(String host,Integer port) {
this.host = host;
this.port = port;
}
public void connect() {
clientHandler = new ClientHandler();
eventLoopGroup = new NioEventLoopGroup();
//啟動類
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
//指定傳輸使用的Channel
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.TCP_NODELAY,true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//新增編碼器
pipeline.addLast(new RpcEncoder(RpcRequest.class,new JSONSerializer()));
//新增解碼器
pipeline.addLast(new RpcDecoder(RpcResponse.class,new JSONSerializer()));
//請求處理類
pipeline.addLast(clientHandler);
}
});
connect(bootstrap,host,port,MAX_RETRY);
}
/**
* 失敗重連機制,參考Netty入門實戰掘金小冊
*
* @param bootstrap
* @param host
* @param port
* @param retry
*/
private void connect(Bootstrap bootstrap,String host,int port,int retry) {
ChannelFuture channelFuture = bootstrap.connect(host,port).addListener(future -> {
if (future.isSuccess()) {
log.info("連線服務端成功");
} else if (retry == 0) {
log.error("重試次數已用完,放棄連線");
} else {
//第幾次重連:
int order = (MAX_RETRY - retry) + 1;
//本次重連的間隔
int delay = 1 << order;
log.error("{} : 連線失敗,第 {} 重連....",new Date(),order);
bootstrap.config().group().schedule(() -> connect(bootstrap,retry - 1),delay,TimeUnit.SECONDS);
}
});
channel = channelFuture.channel();
}
/**
* 傳送訊息
*
* @param request
* @return
*/
public RpcResponse send(final RpcRequest request) {
try {
channel.writeAndFlush(request).await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return clientHandler.getRpcResponse(request.getRequestId());
}
@PreDestroy
public void close() {
eventLoopGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
}
}
複製程式碼
我們對於資料的處理重點在於ClientHandler類上,它繼承了ChannelDuplexHandler類,可以對出站和入站的資料進行處理
public class ClientHandler extends ChannelDuplexHandler {
/**
* 使用Map維護請求物件ID與響應結果Future的對映關係
*/
private final Map<String,DefaultFuture> futureMap = new ConcurrentHashMap<>();
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
if (msg instanceof RpcResponse) {
//獲取響應物件
RpcResponse response = (RpcResponse) msg;
DefaultFuture defaultFuture =
futureMap.get(response.getRequestId());
//將結果寫入DefaultFuture
defaultFuture.setResponse(response);
}
super.channelRead(ctx,msg);
}
@Override
public void write(ChannelHandlerContext ctx,ChannelPromise promise) throws Exception {
if (msg instanceof RpcRequest) {
RpcRequest request = (RpcRequest) msg;
//傳送請求物件之前,先把請求ID儲存下來,並構建一個與響應Future的對映關係
futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
}
super.write(ctx,msg,promise);
}
/**
* 獲取響應結果
*
* @param requsetId
* @return
*/
public RpcResponse getRpcResponse(String requsetId) {
try {
DefaultFuture future = futureMap.get(requsetId);
return future.getRpcResponse(10);
} finally {
//獲取成功以後,從map中移除
futureMap.remove(requsetId);
}
}
}
複製程式碼
參考文章: xilidou.com/2018/09/26/…
從上面實現可以看出,我們定義了一個Map,維護請求ID與響應結果的對映關係,目的是為了客戶端用來驗證服務端響應是否與請求相匹配,因為Netty的channel可能被多個執行緒使用,當結果返回時,你不知道是從哪個執行緒返回的,所以需要一個對映關係。
而我們的結果是封裝在DefaultFuture中的,因為Netty是非同步框架,所有的返回都是基於Future和Callback機制的,我們這裡自定義Future來實現客戶端"非同步呼叫"
public class DefaultFuture {
private RpcResponse rpcResponse;
private volatile boolean isSucceed = false;
private final Object object = new Object();
public RpcResponse getRpcResponse(int timeout) {
synchronized (object) {
while (!isSucceed) {
try {
object.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return rpcResponse;
}
}
public void setResponse(RpcResponse response) {
if (isSucceed) {
return;
}
synchronized (object) {
this.rpcResponse = response;
this.isSucceed = true;
object.notify();
}
}
}
複製程式碼
- 實際上用了wait和notify機制,同時使用一個boolean變數做輔助
5. Netty服務端
Netty服務端的實現跟客戶端的實現差不多,只不過要注意的是,當對請求進行解碼過後,需要通過代理的方式呼叫本地函式。下面是Server端程式碼:
public class NettyServer implements InitializingBean {
private EventLoopGroup boss = null;
private EventLoopGroup worker = null;
@Autowired
private ServerHandler serverHandler;
@Override
public void afterPropertiesSet() throws Exception {
//此處使用了zookeeper做註冊中心,本文不涉及,可忽略
ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181");
start(registry);
}
public void start(ServiceRegistry registry) throws Exception {
//負責處理客戶端連線的執行緒池
boss = new NioEventLoopGroup();
//負責處理讀寫操作的執行緒池
worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//新增解碼器
pipeline.addLast(new RpcEncoder(RpcResponse.class,new JSONSerializer()));
//新增編碼器
pipeline.addLast(new RpcDecoder(RpcRequest.class,new JSONSerializer()));
//新增請求處理器
pipeline.addLast(serverHandler);
}
});
bind(serverBootstrap,8888);
}
/**
* 如果埠繫結失敗,埠數+1,重新繫結
*
* @param serverBootstrap
* @param port
*/
public void bind(final ServerBootstrap serverBootstrap,int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
log.info("埠[ {} ] 繫結成功",port);
} else {
log.error("埠[ {} ] 繫結失敗",port);
bind(serverBootstrap,port + 1);
}
});
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
log.info("關閉Netty");
}
}
複製程式碼
下面是處理讀寫操作的Handler類:
@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
protected void channelRead0(ChannelHandlerContext ctx,RpcRequest msg) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(msg.getRequestId());
try {
Object handler = handler(msg);
log.info("獲取返回結果: {} ",handler);
rpcResponse.setResult(handler);
} catch (Throwable throwable) {
rpcResponse.setError(throwable.toString());
throwable.printStackTrace();
}
ctx.writeAndFlush(rpcResponse);
}
/**
* 服務端使用代理處理請求
*
* @param request
* @return
*/
private Object handler(RpcRequest request) throws ClassNotFoundException,InvocationTargetException {
//使用Class.forName進行載入Class檔案
Class<?> clazz = Class.forName(request.getClassName());
Object serviceBean = applicationContext.getBean(clazz);
log.info("serviceBean: {}",serviceBean);
Class<?> serviceClass = serviceBean.getClass();
log.info("serverClass:{}",serviceClass);
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
//使用CGLIB Reflect
FastClass fastClass = FastClass.create(serviceClass);
FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
log.info("開始呼叫CGLIB動態代理執行服務端方法...");
return fastMethod.invoke(serviceBean,parameters);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
複製程式碼
6. 客戶端代理
客戶端使用Java動態代理,在代理類中實現通訊細節,眾所眾知,Java動態代理需要實現InvocationHandler介面
@Slf4j
public class RpcClientDynamicProxy<T> implements InvocationHandler {
private Class<T> clazz;
public RpcClientDynamicProxy(Class<T> clazz) throws Exception {
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy,Method method,Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
String requestId = UUID.randomUUID().toString();
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
log.info("請求內容: {}",request);
//開啟Netty 客戶端,直連
NettyClient nettyClient = new NettyClient("127.0.0.1",8888);
log.info("開始連線服務端:{}",new Date());
nettyClient.connect();
RpcResponse send = nettyClient.send(request);
log.info("請求呼叫返回結果:{}",send.getResult());
return send.getResult();
}
}
複製程式碼
- 在invoke方法中封裝請求物件,構建NettyClient物件,並開啟客戶端,傳送請求訊息
代理工廠類如下:
public class ProxyFactory {
public static <T> T create(Class<T> interfaceClass) throws Exception {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass},new RpcClientDynamicProxy<T>(interfaceClass));
}
}
複製程式碼
- 通過Proxy.newProxyInstance建立介面的代理類
7. RPC遠端呼叫測試
API:
public interface HelloService {
String hello(String name);
}
複製程式碼
- 準備一個測試API介面
客戶端:
@SpringBootApplication
@Slf4j
public class ClientApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(ClientApplication.class,args);
HelloService helloService = ProxyFactory.create(HelloService.class);
log.info("響應結果“: {}",helloService.hello("pjmike"));
}
}
複製程式碼
- 客戶端呼叫介面的方法
服務端:
//服務端實現
@Service
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "hello," + name;
}
}
複製程式碼
執行結果:
小結
以上我們基於Netty實現了一個非非非常簡陋的RPC框架,比起成熟的RPC框架來說相差甚遠,甚至說基本的註冊中心都沒有實現,但是通過本次實踐,可以說我對於RPC的理解更深了,瞭解了一個RPC框架到底需要關注哪些方面,未來當我們使用成熟的RPC框架時,比如Dubbo,能夠做到心中有數,能明白其底層不過也是使用Netty作為基礎通訊框架。往後,如果更深入翻看開源RPC框架原始碼時,也相對比較容易
專案地址: github.com/pjmike/spri…