從零開始實現簡單 RPC 框架 9:網路通訊之心跳與重連機制
一、心跳
什麼是心跳
在 TPC 中,客戶端和服務端建立連線之後,需要定期傳送資料包,來通知對方自己還線上,以確保 TPC 連線的有效性。如果一個連線長時間沒有心跳,需要及時斷開,否則服務端會維護很多無用連線,浪費服務端的資源。
IdleStateHandler
Netty 已經為我們提供了心跳的 Handler:IdleStateHandler
。當連線的空閒時間(讀或者寫)太長時,IdleStateHandler
將會觸發一個 IdleStateEvent
事件,傳遞的下一個 Handler。我們可以通過在 Pipeline Handler 中重寫 userEventTrigged
方法來處理該事件,注意我們自己的 Handler 需要在 IdleStateHandler
下面我們來看看 IdleStateHandler 的原始碼。
1. 建構函式
最完整的建構函式如下:
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
}
引數解析:
observeOutput
:是否考慮出站時較慢的情況。如果 true:當出站時間太長,超過空閒時間,那麼將不觸發此次事件。如果 false,超過空閒時間就會觸發事件。預設 false。readerIdleTime
:讀空閒的時間,0 表示禁用讀空閒事件。writerIdleTime
:寫空閒的時間,0 表示禁用寫空閒事件。allIdleTime
:讀或寫空閒的時間,0 表示禁用事件。unit
:前面三個時間的單位。
2. 事件處理
IdleStateHandler
繼承 ChannelDuplexHandler
,重寫了出站和入站的事件,我們來看看程式碼。
當 channel 新增、註冊、活躍的時候,會初始化 initialize(ctx)
,刪除、不活躍的時候銷燬 destroy()
,讀寫的時候設定 lastReadTime
和 lastWriteTime
欄位。
public class IdleStateHandler extends ChannelDuplexHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { initialize(ctx); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { destroy(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive()) { initialize(ctx); } super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { initialize(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { destroy(); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 判斷是否開啟 讀空閒 或者 讀寫空閒 監控 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // 設定 reading 標誌位 reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } ctx.fireChannelRead(msg); } // 讀完成之後 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 判斷是否開啟 讀空閒 或者 讀寫空閒 監控,檢查 reading 標誌位 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { // 設定 lastReadTime,後面判斷讀超時有用 lastReadTime = ticksInNanos(); reading = false; } ctx.fireChannelReadComplete(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 判斷是否開啟 寫空閒 或者 讀寫空閒 監控 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // writeListener 的方法在下面,主要是設定 lastWriteTime ctx.write(msg, promise.unvoid()).addListener(writeListener); } else { ctx.write(msg, promise); } } private final ChannelFutureListener writeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lastWriteTime = ticksInNanos(); firstWriterIdleEvent = firstAllIdleEvent = true; } }; }
3. 初始化
當 channel 新增、註冊、活躍的時候,會初始化 initialize(ctx)
,下面我們就來看看初始化的程式碼:
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
其實初始化很簡單,就是根據建構函式給的 讀寫空閒時間 去決定初始化哪些定時任務,分別是:ReaderIdleTimeoutTask
(讀空閒超時任務)、WriterIdleTimeoutTask
(寫空閒超時任務)、AllIdleTimeoutTask
(讀寫空閒超時任務)。
4. 定時任務
我們來看看 ReaderIdleTimeoutTask
,剩下兩個的原理跟 ReaderIdleTimeoutTask
差不多,感興趣的同學自行閱讀原始碼吧。
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
// 檢視是否超時
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// 超時了,重新啟動一個新的定時器,然後觸發事件
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 構造事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 觸發事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// 沒有超時,設定新的定時器,不過這次的時間是更短的時間
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
從上面的程式碼可以看出:
① 如果讀空閒超時了,則重新起一個定時器,然後觸發事件
② 如果讀空閒未超時,則新起一個時間更短(readerIdleTimeNanos - ticksInNanos() - lastReadTime
)的定時器
5. 觸發事件
上面的觸發事件方法是:channelIdle
,經過重重程式碼撥開,其實最終就是呼叫到了下面的程式碼:
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
// 觸發事件,說白了,就是直接呼叫 userEventTriggered 方法而已
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
}
}
其實觸發事件,就是把事件傳給下一個 Handler (next
),就是呼叫 userEventTriggered
方法而已。所以我們處理心跳的 Handler 一定要寫到 IdleStateHandler
。
ccx-rpc 心跳實現
1. 客戶端
IdleStateHandler
放到啟動類的 PipleLine
註冊上,業務處理器 NettyClientHandler
在其後面。
public class NettyClient {
// ... 忽略其他程式碼
private NettyClient() {
bootstrap = new Bootstrap()
// ... 省略其他程式碼
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 設定 IdleStateHandler 心跳檢測每 5 秒進行一次寫檢測
// write()方法超過 5 秒沒呼叫,就呼叫 userEventTrigger
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
// 編碼器
p.addLast(new RpcMessageEncoder());
// 解碼器
p.addLast(new RpcMessageDecoder());
// 業務處理器
p.addLast(new NettyClientHandler());
}
});
}
}
接下來我們來看看 NettyClientHandler
是如何處理心跳事件的:
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
// ... 忽略其他程式碼
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 根據上面的配置,超過 5 秒沒有寫請求,會觸發 WRITER_IDLE 事件
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = ctx.channel();
// 觸發寫空閒事件後,就應該發心跳了。
// 組裝訊息
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setSerializeType(SerializeType.PROTOSTUFF.getValue());
rpcMessage.setCompressTye(CompressType.DUMMY.getValue());
rpcMessage.setMessageType(MessageType.HEARTBEAT.getValue());
// 發心跳訊息
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
2. 服務端
同樣,服務端的 IdleStateHandler
放到啟動類的 PipleLine
註冊上,業務處理器 NettyServerHandler
在其後面。
public class NettyServerBootstrap {
public void start() {
ServerBootstrap bootstrap = new ServerBootstrap()
// ... 忽略其他程式碼
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 30 秒之內沒有收到客戶端請求的話就關閉連線
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
// 編解碼器
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
// RPC 訊息處理器
p.addLast(serviceHandlerGroup, new NettyServerHandler());
}
});
// ... 忽略其他程式碼
}
}
服務端收到超過 30 秒沒有讀請求的事件後,呼叫 ctx.close
將連線關閉。
同時,如果收到了客戶端發來的心跳訊息,直接忽略即可。如果每個心跳都要去響應,會加重伺服器的負擔的。
NettyServerHandler
的程式碼如下
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) {
// 不處理心跳訊息
if (requestMsg.getMessageType() != MessageType.REQUEST.getValue()) {
return;
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 處理空閒狀態的
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
log.info("idle check happen, so close the connection");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
二、重連機制
很多時候服務端和客戶端連線斷開,僅僅是因為網路問題或者處理程式慢,並不是程式掛了。那麼客戶端想再發起請求,就發不出去了。此時需要一個功能:當發現連線斷了之後,如果想往連線寫資料,就自動重新連線上,這個就是重連機制。
客戶端想請求服務端的介面,先從註冊中心中,獲得服務端的地址,然後跟服務端連線,然後寫資料。
簡單程式碼如下:
protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
// ... 忽略其他程式碼
// 服務端地址
InetSocketAddress socketAddress = new InetSocketAddress(selected.getHost(), selected.getPort());
// 獲取連線(Channel)
Channel channel = nettyClient.getChannel(socketAddress);
// 構建訊息
RpcMessage rpcMessage = buildRpcMessage(request);
// 寫訊息(發請求)
channel.writeAndFlush(rpcMessage);
}
這個 nettyClient.getChannel(socketAddress)
是重連機制的祕密:
/**
* 獲取和指定地址連線的 channel,如果獲取不到,則連線
*
* @param address 指定要連線的地址
* @return channel
*/
public Channel getChannel(SocketAddress address) {
// 根據地址從快取中獲取 Channel
Channel channel = CHANNEL_MAP.get(address);
// 如果獲取不到,或者 channel 已經斷開,則重連,然後放到 CHANNEL_MAP 快取起來
if (channel == null || !channel.isActive()) {
// 連線
channel = connect(address);
CHANNEL_MAP.put(address, channel);
}
return channel;
}
程式碼一目瞭然,就是使用了 CHANNEL_MAP
作為快取,發現找不到或者已斷開,就重新連線,然後放到 CHANNEL_MAP
中,以便下次獲取。
總結
心跳是用於服務端和客戶端保持有效連線的一種手段,客戶端每隔一小段時間發一個心跳包,服務端收到之後不用響應,但是會記下客戶端最後一次讀的時間。伺服器起定時器,定時檢測客戶端上次讀請求的時間超過配置的值,超過就會觸發事件,斷開連線。
重連機制是連線斷開之後,要使用的時候自動重連的機制。
心跳和重連機制,結合起來讓服務端和客戶端的連線使用更加合理,該斷開的斷開節省服務端資源,該重連的重連提高可用性。
ccx-rpc 程式碼已經開源
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc