1. 程式人生 > 其它 >【Netty】入門Netty官方例子解析(三)處理一個基於流的傳輸 TCP粘包和拆包問題分析和解決

【Netty】入門Netty官方例子解析(三)處理一個基於流的傳輸 TCP粘包和拆包問題分析和解決

技術標籤:nettynettyTCP粘包和拆包問題分析和解決TCP粘包拆包

關於 Socket Buffer的一個小警告
基於流的傳輸比如 TCP/IP, 接收到資料是存在 socket 接收的 buffer 中。不幸的是,基於流的傳輸並不是一個數據包佇列,而是一個位元組佇列。意味著,即使你傳送了2個獨立的資料包,作業系統也不會作為2個訊息處理而僅僅是作為一連串的位元組而言。因此這是不能保證你遠端寫入的資料就會準確地讀取。舉個例子,讓我們假設作業系統的 TCP/TP 協議棧已經接收了3個數據包:
在這裡插入圖片描述
由於基於流傳輸的協議的這種普通的性質,在你的應用程式裡讀取資料的時候會有很高的可能性被分成下面的片段(這種現象稱為:TCP粘包)

在這裡插入圖片描述
因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的資料整理成一個或者多個更有意思並且能夠讓程式的業務邏輯更好理解的資料。在上面的例子中,接收到的資料應該被構造成下面的格式:
在這裡插入圖片描述

The First Solution 辦法一

回到 TIME 客戶端例子。同樣也有類似的問題。一個32位整型是非常小的資料,他並不見得會被經常拆分到到不同的資料段內。然而,問題是他確實可能會被拆分到不同的資料段內,並且拆分的可能性會隨著通訊量的增加而增加。

最簡單的方案是構造一個內部的可積累的緩衝,直到4個位元組全部接收到了內部緩衝。下面的程式碼修改了 TimeClientHandler 的實現類修復了這個問題

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    // 自定義快取區,只要channel不斷,就一直存在
    private ByteBuf buf;

    @Override
    public void handlerAdded(
ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("TimeClientHandler 收到了資料"); ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); // 沒達到4個長度,說明沒收到完整的一個時間戳,啥事也不幹 if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); // 關閉通道,同時銷燬buf ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

1.ChannelHandler 有2個生命週期的監聽方法:handlerAdded()handlerRemoved()。你可以完成任意初始化任務只要他不會被阻塞很長的時間。

2.首先,所有接收的資料都應該被累積在 buf 變數裡。

3.然後,處理器必須檢查 buf 變數是否有足夠的資料,在這個例子中是4個位元組,然後處理實際的業務邏輯。否則,Netty 會重複呼叫channelRead() 當有更多資料到達直到4個位元組的資料被積累。

我們可以來模擬下TimeServer每次傳送部分碼流的場景(是我自己加的):
TimeClient :

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            ChannelFuture f = b.connect("localhost", 8088).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeServer :

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        };
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        try {
            new TimeServer(8088).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

TimeServerHandler :

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        Integer intValue = (int) (System.currentTimeMillis() / 1000L + 2208988800L);
        byte[] bytes = int2Bytes(intValue);
        System.out.println("總共需要傳送" + bytes.length + "次");
        for (int i = 1; i <= bytes.length; i++) {
            System.out.println("傳送第" + i + "次");
            final ByteBuf time = ctx.alloc().buffer(1); 
            time.writeBytes(bytes, i - 1, 1);
            ctx.writeAndFlush(time); 

            try {
                // 通過sleep模擬傳送不完整資料
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }

    /**
     * 把int值轉為bytes陣列
     * 
     * @param num
     * @return
     */
    public static byte[] int2Bytes(int num) {
        byte[] bytes = new byte[4];
        // 通過移位運算,擷取低8位的方式,將int儲存到byte陣列
        bytes[0] = (byte) (num >>> 24);
        bytes[1] = (byte) (num >>> 16);
        bytes[2] = (byte) (num >>> 8);
        bytes[3] = (byte) num;
        return bytes;
    }
}

上述的TimeServerHandler 稍微有點瑕疵,沒有關閉ctx通道,原因是cts傳送訊息是非同步的,需要用到ChannelFuture ,這裡傳送了4次,每次都有ChannelFuture ,理論上要4個都完成後才能關閉。

final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)

我們看下執行結果,TIME Server:

總共需要傳送4次
傳送第1次
傳送第2次
傳送第3次
傳送第4次

一個完整的時間被拆分為4次傳送,每次間隔3s

TIME Client:
大概等待十來秒後收到了完整的資料。

TimeClientHandler 收到了資料
TimeClientHandler 收到了資料
TimeClientHandler 收到了資料
TimeClientHandler 收到了資料
Sun May 18 03:31:17 CST 2008

The Second Solution 方法二

儘管第一個解決方案已經解決了 TIME 客戶端的問題了,但是修改後的處理器看起來不那麼的簡潔,想象一下如果由多個欄位比如可變長度的欄位組成的更為複雜的協議時,你的 ChannelInboundHandler 的實現將很快地變得難以維護,方法一是固定長度的

正如你所知的,你可以增加多個 ChannelHandler 到ChannelPipeline ,因此你可以把一整個ChannelHandler 拆分成多個模組以減少應用的複雜程度,比如你可以把TimeClientHandler 拆分成2個處理器:

  • TimeDecoder 處理資料拆分的問題
  • TimeClientHandler 原始版本的實現
    幸運地是,Netty 提供了一個可擴充套件的類,幫你完成 TimeDecoder 的開發。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder { // (1)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("TimeDecoder 收到了資料");
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)

    }
}

1.ByteToMessageDecoder 是 ChannelInboundHandler 的一個實現類,他可以在處理資料拆分的問題上變得很簡單。

2.每當有新資料接收的時候,ByteToMessageDecoder 都會呼叫 decode() 方法來處理內部的那個累積緩衝。

3.Decode() 方法可以決定當累積緩衝裡沒有足夠資料時可以往 out 物件裡放任意資料。當有更多的資料被接收了 ByteToMessageDecoder 會再一次呼叫 decode() 方法。

4.如果在 decode() 方法裡增加了一個物件到 out 物件裡,這意味著解碼器解碼訊息成功。ByteToMessageDecoder 將會丟棄在累積緩衝裡已經被讀過的資料。請記得你不需要對多條訊息呼叫 decode(),ByteToMessageDecoder 會持續呼叫 decode() 直到不放任何資料到 out 裡。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient2 {
    public static void main(String[] args) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            ChannelFuture f = b.connect("localhost", 8088).sync(); // (5)

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

通過addLast(new TimeDecoder(), new TimeClientHandler())添加了2個hander到pipline中。
我們看下執行結果:

TimeDecoder 收到了資料
TimeDecoder 收到了資料
TimeDecoder 收到了資料
TimeDecoder 收到了資料
TimeClientHandler 收到了資料
Thu May 29 02:46:29 CST 2008

從上面的結果中,我們可以看到,當訊息長度小於4時,訊息不會發送至TimeClientHandler,也就是說TimeDecoder 起到了一個緩衝作用。