Netty中粘包和拆包的解決方案
粘包和拆包是TCP網路程式設計中不可避免的,無論是服務端還是客戶端,當我們讀取或者傳送訊息的時候,都需要考慮TCP底層的粘包/拆包機制。
TCP粘包和拆包
TCP是個“流”協議,所謂流,就是沒有界限的一串資料。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和拆包問題。
如圖所示,假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,故可能存在以下4種情況。
- 服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包;
- 服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包;
- 服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘內容,這被稱為TCP拆包
- 服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。
如果此時服務端TCP接收滑窗非常小,而資料包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。
TCP粘包和拆包產生的原因
資料從傳送方到接收方需要經過作業系統的緩衝區,而造成粘包和拆包的主要原因就在這個緩衝區上。粘包可以理解為緩衝區資料堆積,導致多個請求資料粘在一起,而拆包可以理解為傳送的資料大於緩衝區,進行拆分處理。
詳細來說,造成粘包和拆包的原因主要有以下三個:
- 應用程式write寫入的位元組大小大於套介面傳送緩衝區大小
- 進行MSS大小的TCP分段
- 乙太網幀的payload大於MTU進行IP分片。
粘包和拆包的解決方法
由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下。
- 訊息長度固定,累計讀取到長度和為定長LEN的報文後,就認為讀取到了一個完整的資訊
- 將回車換行符作為訊息結束符
- 將特殊的分隔符作為訊息的結束標誌,回車換行符就是一種特殊的結束分隔符
- 通過在訊息頭中定義長度欄位來標識訊息的總長度
Netty中的粘包和拆包解決方案
針對上一小節描述的粘包和拆包的解決方案,對於拆包問題比較簡單,使用者可以自己定義自己的編碼器進行處理,Netty並沒有提供相應的元件。對於粘包的問題,由於拆包比較複雜,程式碼比較處理比較繁瑣,Netty提供了4種解碼器來解決,分別如下:
- 固定長度的拆包器 FixedLengthFrameDecoder,每個應用層資料包的都拆分成都是固定長度的大小
- 行拆包器 LineBasedFrameDecoder,每個應用層資料包,都以換行符作為分隔符,進行分割拆分
- 分隔符拆包器 DelimiterBasedFrameDecoder,每個應用層資料包,都通過自定義的分隔符,進行分割拆分
- 基於資料包長度的拆包器 LengthFieldBasedFrameDecoder,將應用層資料包的長度,作為接收端應用層資料包的拆分依據。按照應用層資料包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含資料包的長度
以上解碼器在使用時只需要新增到Netty的責任鏈中即可,大多數情況下這4種解碼器都可以滿足了,當然除了以上4種解碼器,使用者也可以自定義自己的解碼器進行處理。具體可以參考以下程式碼示例:
// Server主程式
public class XNettyServer {
public static void main(String[] args) throws Exception {
// accept 處理連線的執行緒池
NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 處理資料的執行緒池
NioEventLoopGroup readGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(acceptGroup, readGroup)
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 增加解碼器
pipeline.addLast(new XDecoder());
// 打印出內容 handdler
pipeline.addLast(new XHandler());
}
});
System.out.println("啟動成功,埠 7777");
serverBootstrap.bind(7777).sync().channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
}
// 解碼器
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 220;
// 用來臨時保留沒有處理過的請求報文
ByteBuf tempMsg = Unpooled.buffer();
/**
* @param ctx
* @param in 請求的資料
* @param out 將粘在一起的報文拆分後的結果保留起來
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println(Thread.currentThread() + "收到了一次資料包,長度是:" + in.readableBytes());
// 合併報文
ByteBuf message = null;
int tmpMsgSize = tempMsg.readableBytes();
// 如果暫存有上一次餘下的請求報文,則合併
if (tmpMsgSize > 0) {
message = Unpooled.buffer();
message.writeBytes(tempMsg);
message.writeBytes(in);
System.out.println("合併:上一資料包餘下的長度為:" + tmpMsgSize + ",合併後長度為:" + message.readableBytes());
} else {
message = in;
}
int size = message.readableBytes();
int counter = size / PACKET_SIZE;
for (int i = 0; i < counter; i++) {
byte[] request = new byte[PACKET_SIZE];
// 每次從總的訊息中讀取220個位元組的資料
message.readBytes(request);
// 將拆分後的結果放入out列表中,交由後面的業務邏輯去處理
out.add(Unpooled.copiedBuffer(request));
}
// 多餘的報文存起來
// 第一個報文: i+ 暫存
// 第二個報文: 1 與第一次
size = message.readableBytes();
if (size != 0) {
System.out.println("多餘的資料長度:" + size);
// 剩下來的資料放到tempMsg暫存
tempMsg.clear();
tempMsg.writeBytes(message.readBytes(size));
}
}
}
// 處理器
public class XHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] content = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(content);
System.out.println(Thread.currentThread() + ": 最終列印" + new String(content));
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}