1. 程式人生 > 實用技巧 >Netty學習筆記03-Netty核心模組元件與Google Protobuf

Netty學習筆記03-Netty核心模組元件與Google Protobuf

Netty 核心模組元件

Bootstrap、ServerBootstrap

  • Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程式,串聯各個元件,Netty 中 Bootstrap 類客戶端程式的啟動引導類,ServerBootstrap服務端啟動引導類

  • 常見的方法有

    • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):該方法用於伺服器端

      用來設定兩個 EventLoop

    • public B group(EventLoopGroup group)

      :該方法用於客戶端,用來設定一個 EventLoop

    • public B channel(Class<? extends C> channelClass):該方法用來設定一個伺服器端的通道實現

    • public <T> B option(ChannelOption<T> option, T value):用來給 ServerChannel 新增配置

    • public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value):用來給接收到的通道新增配置

    • public ServerBootstrap childHandler(ChannelHandler childHandler):該方法用來設定業務處理類(自定義handler)

    • public ChannelFuture bind(int inetPort) :該方法用於伺服器端,用來設定佔用的埠號

    • public ChannelFuture connect(String inetHost, int inetPort) :該方法用於客戶端,用來連線伺服器端

Future、ChannelFuture

  • Netty 中所有的 IO 操作都是非同步的,不能立刻得知訊息是否被正確處理。但是可以過一會等它執行完成或者直接註冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures
    ,他們可以註冊一個監聽,當操作執行成功或失敗時監聽會自動觸發註冊的監聽事件。
  • 常見的方法有
    • Channel channel():返回當前正在進行 IO 操作的通道
    • ChannelFuture sync():等待非同步操作執行完畢

Channel

  1. Netty 網路通訊的元件,能夠用於執行網路 I/O 操作。
  2. 通過 Channel 可獲得當前網路連線的通道的狀態
  3. 通過 Channel 可獲得 網路連線的配置引數 (例如接收緩衝區大小)
  4. Channel 提供非同步的網路 I/O 操作(如建立連線,讀寫,繫結埠),非同步呼叫意味著任何 I/O 呼叫都將立即返回,並且不保證在呼叫結束時所請求的 I/O 操作已完成
  5. 呼叫立即返回一個 ChannelFuture 例項,通過註冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回撥通知呼叫方
  6. 支援關聯 I/O 操作與對應的處理程式
  7. 不同協議、不同的阻塞型別的連線都有不同的 Channel 型別與之對應,常用的 Channel 型別:
    1. NioSocketChannel:非同步的客戶端 TCP Socket 連線。
    2. NioServerSocketChannel:非同步的伺服器端 TCP Socket 連線。
    3. NioDatagramChannel:非同步的 UDP 連線
    4. NioSctpChannel:非同步的客戶端 Sctp 連線。
    5. NioSctpServerChannel:非同步的 Sctp 伺服器端連線,這些通道涵蓋了 UDP 和 TCP 網路 IO 以及檔案 IO。

Selector

  1. Netty 基於 Selector 物件實現 I/O 多路複用,通過 Selector 一個執行緒可以監聽多個連線的 Channel 事件。
  2. 當向一個 Selector 中註冊 Channel 後,Selector 內部的機制就可以自動不斷地查詢(Select) 這些註冊的Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網路連線完成等),這樣程式就可以很簡單地使用一個執行緒高效地管理多個 Channel。

ChannelHandler 及其實現類

  • ChannelHandler 是一個介面,處理 I/O 事件或攔截 I/O 操作,並將其轉發到其 ChannelPipeline(業務處理鏈) 中的下一個處理程式。

  • ChannelHandler 本身並沒有提供很多方法,因為這個介面有許多的方法需要實現,方便使用期間,可以繼承它的子類

  • ChannelHandler 及其實現類一覽圖(後)

    • ChannelInboundHandler 用於處理入站 I/O 事件。
    • ChannelOutboundHandler 用於處理出站 I/O 操作。
    • //介面卡
    • ChannelInboundHandlerAdapter 用於處理入站 I/O 事件。
    • ChannelOutboundHandlerAdapter 用於處理出站 I/O 操作。
    • ChannelDuplexHandler 用於處理入站和出站事件。
  • 我們經常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter,然後通過重寫相應方法實現業務邏輯,我們接下來看看一般都需要重寫哪些方法

Pipeline 和 ChannelPipeline

ChannelPipeline 是一個重點:

  1. ChannelPipeline 是一個 Handler 的集合,它負責處理和攔截 inbound 或者 outbound 的事件和操作,相當於一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 儲存 ChannelHandler 的 List,用於處理或攔截Channel 的入站事件和出站操作)

  2. ChannelPipeline 實現了一種高階形式的攔截過濾器模式,使使用者可以完全控制事件的處理方式,以及 Channel中各個ChannelHandler 如何相互互動

  3. 在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關係如下

    • 一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向連結串列,並且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler
    • 入站事件和出站事件在一個雙向連結串列中,入站事件會從連結串列 head 往後傳遞到最後一個入站的 handler,出站事件會從連結串列 tail 往前傳遞到最前一個出站的 handler,兩種型別的 handler 互不干擾
  4. 常用方法

    • ChannelPipeline addFirst(ChannelHandler... handlers),把一個業務處理類(handler)新增到鏈中的第一個位置
    • ChannelPipeline addLast(ChannelHandler... handlers),把一個業務處理類(handler)新增到鏈中的最後一個位置

ChannelHandlerContext

  1. 儲存 Channel 相關的所有上下文資訊,同時關聯一個 ChannelHandler 物件
  2. 即 ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler , 同 時ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的資訊,方便對 ChannelHandler 進行呼叫.
  3. 常用方法
    • ChannelFuture close(),關閉通道
    • ChannelOutboundInvoker flush(),重新整理
    • ChannelFuture writeAndFlush(Object msg) , 將 數 據 寫 到 ChannelPipeline 中 ,當 前ChannelHandler 的下一個 ChannelHandler 開始處理(出站)

ChannelOption

  1. Netty 在建立 Channel 例項後,一般都需要設定 ChannelOption引數。
  2. ChannelOption 引數如下:
    • ChannelOption.SO_BACKLOG:對應 TCP/IP 協議 listen 函式中的 backlog 引數,用來初始化伺服器可連線佇列大小。服務端處理客戶端連線請求是順序處理的,所以同一時間只能處理一個客戶端連線。多個客戶端來的時候,服務端將不能處理的客戶端連線請求放在佇列中等待處理,backlog 引數指定了佇列的大小。
    • ChannelOption.SO_KEEPALIVE:一直保持連線活動狀態

EventLoopGroup 和其實現類 NioEventLoopGroup

  1. EventLoopGroup一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop同時工作,每個 EventLoop 維護著一個 Selector 例項。

  2. EventLoopGroup 提供 next 介面,可以從組裡面按照一定規則獲取其中一個 EventLoop 來處理任務。在 Netty 服 務 器 端 編 程 中 , 我 們 一 般 都 需 要 提 供 兩 個 EventLoopGroup , 例 如 : BossEventLoopGroupWorkerEventLoopGroup

  3. 通常一個服務埠即一個 ServerSocketChannel 對應一個 Selector 和一個 EventLoop 執行緒。BossEventLoop 負責

    接收客戶端的連線並將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理,如下圖所示

    • BossEventLoopGroup 通常是一個單執行緒的 EventLoop,EventLoop 維護著一個註冊了ServerSocketChannel 的 Selector 例項BossEventLoop 不斷輪詢 Selector 將連線事件分離出來
    • 通常是 OP_ACCEPT 事件,然後將接收到的 SocketChannel 交給 WorkerEventLoopGroup
    • WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop來將這個 SocketChannel 註冊到其維護的 Selector 並對其後續的 IO 事件進行處理
  4. 常用方法

    • public NioEventLoopGroup(),構造方法
    • public Future<?> shutdownGracefully(),斷開連線,關閉執行緒

Unpooled 類

  1. Netty 提供一個專門用來操作緩衝區(即Netty的資料容器)的工具類

  2. 常用方法如下所示

    • 通過給定的資料和字元編碼返回一個 ByteBuf 物件(類似於 NIO 中的 ByteBuffer 但有區別)
    • public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
  3. 舉例說明 Unpooled 獲取 Netty 的資料容器 ByteBuf 的基本使用 【案例演示】

    • 案例 1

      package com.atguigu.netty.buf;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      
      public class NettyByteBuf01 {
          public static void main(String[] args) {
              //建立一個ByteBuf
              //說明
              //1. 建立 物件,該物件包含一個數組arr , 是一個byte[10]
              //2. 在netty 的buffer中,不需要使用flip 進行反轉
              //   底層維護了 readerindex 和 writerIndex
              //3. 通過 readerindex 和  writerIndex 和  capacity, 將buffer分成三個區域
              // 0---readerindex 已經讀取的區域
              // readerindex---writerIndex , 可讀的區域
              // writerIndex -- capacity, 可寫的區域
              ByteBuf buffer = Unpooled.buffer(10);
      
              for(int i = 0; i < 10; i++) {
                  buffer.writeByte(i);
              }
      
              System.out.println("capacity=" + buffer.capacity());//10
              //輸出
      //        for(int i = 0; i<buffer.capacity(); i++) {
      //            System.out.println(buffer.getByte(i));
      //        }
              for(int i = 0; i < buffer.capacity(); i++) {
                  System.out.println(buffer.readByte());
              }
              System.out.println("執行完畢");
          }
      }
      
    • 案例2

      package com.atguigu.netty.buf;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      
      import java.nio.charset.Charset;
      
      public class NettyByteBuf02 {
          public static void main(String[] args) {
      
              //建立ByteBuf
              ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
      
              //使用相關的方法
              if(byteBuf.hasArray()) { // true
      
                  byte[] content = byteBuf.array();
      
                  //將 content 轉成字串
                  System.out.println(new String(content, Charset.forName("utf-8")));
      
                  System.out.println("byteBuf=" + byteBuf);
      
                  System.out.println(byteBuf.arrayOffset()); // 0
                  System.out.println(byteBuf.readerIndex()); // 0
                  System.out.println(byteBuf.writerIndex()); // 12
                  System.out.println(byteBuf.capacity()); // 36
      
                  //System.out.println(byteBuf.readByte()); //
                  System.out.println(byteBuf.getByte(0)); // 104
      
                  int len = byteBuf.readableBytes(); //可讀的位元組數  12
                  System.out.println("len=" + len);
      
                  //使用for取出各個位元組
                  for(int i = 0; i < len; i++) {
                      System.out.println((char) byteBuf.getByte(i));
                  }
      
                  //按照某個範圍讀取
                  System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
                  System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
      
      
              }
          }
      }
      

Netty 應用例項-群聊系統

例項要求

  1. 編寫一個 Netty 群聊系統,實現伺服器端和客戶端之間的資料簡單通訊(非阻塞)
  2. 實現多人群聊
  3. 伺服器端:可以監測使用者上線,離線,並實現訊息轉發功能
  4. 客戶端:通過 channel 可以無阻塞傳送訊息給其它所有使用者,同時可以接受其它使用者傳送的訊息(有伺服器轉發得到)
  5. 目的:進一步理解 Netty 非阻塞網路程式設計機制
  6. 看老師程式碼演示

服務端

package com.atguigu.netty.groupchat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupChatServer {

    private int port; //監聽埠


    public GroupChatServer(int port) {
        this.port = port;
    }

    //編寫run方法,處理客戶端的請求
    public void run() throws  Exception{

        //建立兩個執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //獲取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解碼器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入編碼器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的業務處理handler
                            pipeline.addLast(new GroupChatServerHandler());

                        }
                    });

            System.out.println("netty 伺服器啟動");
            ChannelFuture channelFuture = b.bind(port).sync();

            //監聽關閉
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {

        new GroupChatServer(7000).run();
    }
}
package com.atguigu.netty.groupchat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    //public static List<Channel> channels = new ArrayList<Channel>();

    //使用一個hashmap 管理
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();

    //定義一個channle 組,管理所有的channel
    //GlobalEventExecutor.INSTANCE) 是全域性的事件執行器,是一個單例
    private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //handlerAdded 表示連線建立,一旦連線,第一個被執行
    //將當前channel 加入到  channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //將該客戶加入聊天的資訊推送給其它線上的客戶端
        /*
        該方法會將 channelGroup 中所有的channel 遍歷,併發送 訊息,
        我們不需要自己遍歷
         */
        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);
    }

    //斷開連線, 將xx客戶離開資訊推送給當前線上的客戶
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
        System.out.println("channelGroup size" + channelGroup.size());

    }

    //表示channel 處於活動狀態, 提示 xx上線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 上線了~");
    }

    //表示channel 處於不活動狀態, 提示 xx離線了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 離線了~");
    }

    //讀取資料
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        //獲取到當前channel
        Channel channel = ctx.channel();
        //這時我們遍歷channelGroup, 根據不同的情況,回送不同的訊息

        channelGroup.forEach(ch -> {
            if(channel != ch) { //不是當前的channel,轉發訊息
                ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 傳送了訊息" + msg + "\n");
            }else {//回顯自己傳送的訊息給自己
                ch.writeAndFlush("[自己]傳送了訊息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉通道
        ctx.close();
    }
}

客戶端

package com.atguigu.netty.groupchat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class GroupChatClient {

    //屬性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try {


        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                        //得到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入相關handler
                        pipeline.addLast("decoder", new StringDecoder());
                        pipeline.addLast("encoder", new StringEncoder());
                        //加入自定義的handler
                        pipeline.addLast(new GroupChatClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress()+ "--------");
            //客戶端需要輸入資訊,建立一個掃描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通過channel 傳送到伺服器端
                channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
package com.atguigu.netty.groupchat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

Netty 心跳檢測機制案例

例項要求:

  1. 編寫一個 Netty 心跳檢測機制案例, 當伺服器超過 3 秒沒有讀時,就提示讀空閒

  2. 當伺服器超過 5 秒沒有寫操作時,就提示寫空閒

  3. 實現當伺服器超過 7 秒沒有讀或者寫操作時,就提示讀寫空閒

  4. 程式碼如下:

    package com.atguigu.netty.heartbeat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws Exception{
    
    
            //建立兩個執行緒組
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入一個netty 提供 IdleStateHandler
                        /*
                        說明
                        1. IdleStateHandler 是netty 提供的處理空閒狀態的處理器
                        2. long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連線
                        3. long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連線
                        4. long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連線
    
                        5. 文件說明
                        triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
     * read, write, or both operation for a while.
     *                  6. 當 IdleStateEvent 觸發後 , 就會傳遞給管道 的下一個handler去處理
     *                  通過呼叫(觸發)下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閒,寫空閒,讀寫空閒)
                         */
                        pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
                        //加入一個對空閒檢測進一步處理的handler(自定義)
                        pipeline.addLast(new MyServerHandler());
                    }
                });
    
                //啟動伺服器
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
    
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    package com.atguigu.netty.heartbeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         *
         * @param ctx 上下文
         * @param evt 事件
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
            if(evt instanceof IdleStateEvent) {
    
                //將  evt 向下轉型 IdleStateEvent
                IdleStateEvent event = (IdleStateEvent) evt;
                String eventType = null;
                switch (event.state()) {
                    case READER_IDLE:
                      eventType = "讀空閒";
                      break;
                    case WRITER_IDLE:
                        eventType = "寫空閒";
                        break;
                    case ALL_IDLE:
                        eventType = "讀寫空閒";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
                System.out.println("伺服器做相應處理..");
    
                //如果發生空閒,我們關閉通道
               // ctx.channel().close();
            }
        }
    }
    
    package com.atguigu.netty.heartbeat;
    
    public class Test {
        public static void main(String[] args) throws Exception {
    
            System.out.println(System.nanoTime()); //納秒  10億分之1
            Thread.sleep(1000);
            System.out.println(System.nanoTime());
    
        }
    }
    

Netty 通過 WebSocket 程式設計實現伺服器和客戶端長連線

例項要求:

  1. Http 協議是無狀態的, 瀏覽器和伺服器間的請求響應一次,下一次會重新建立連線.

  2. 要求:實現基於 webSocket 的長連線的全雙工的互動

  3. 改變 Http 協議多次請求的約束,實現長連線了, 伺服器可以傳送訊息給瀏覽器

  4. 客戶端瀏覽器和伺服器端會相互感知,比如伺服器關閉了,瀏覽器會感知,同樣瀏覽器關閉了,伺服器會感知

  5. 執行介面

    服務端

    package com.atguigu.netty.websocket;
    
    import com.atguigu.netty.heartbeat.MyServerHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws Exception{
    
    
            //建立兩個執行緒組
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
    
                        //因為基於http協議,使用http的編碼和解碼器
                        pipeline.addLast(new HttpServerCodec());
                        //是以塊方式寫,新增ChunkedWriteHandler處理器
                        pipeline.addLast(new ChunkedWriteHandler());
    
                        /*
                        說明
                        1. http資料在傳輸過程中是分段, HttpObjectAggregator ,就是可以將多個段聚合
                        2. 這就就是為什麼,當瀏覽器傳送大量資料時,就會發出多次http請求
                         */
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        /*
                        說明
                        1. 對應websocket ,它的資料是以 幀(frame) 形式傳遞
                        2. 可以看到WebSocketFrame 下面有六個子類
                        3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri
                        4. WebSocketServerProtocolHandler 核心功能是將 http協議升級為 ws協議 , 保持長連線
                        5. 是通過一個 狀態碼 101
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
    
                        //自定義的handler ,處理業務邏輯
                        pipeline.addLast(new MyTextWebSocketFrameHandler());
                    }
                });
    
                //啟動伺服器
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
    
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    package com.atguigu.netty.websocket;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    import java.time.LocalDateTime;
    
    //這裡 TextWebSocketFrame 型別,表示一個文字幀(frame)
    public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    
            System.out.println("伺服器收到訊息 " + msg.text());
    
            //回覆訊息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("伺服器時間" + LocalDateTime.now() + " " + msg.text()));
        }
    
        //當web客戶端連線後, 觸發方法
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
            System.out.println("handlerAdded 被呼叫" + ctx.channel().id().asLongText());
            System.out.println("handlerAdded 被呼叫" + ctx.channel().id().asShortText());
        }
    
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
            System.out.println("handlerRemoved 被呼叫" + ctx.channel().id().asLongText());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("異常發生 " + cause.getMessage());
            ctx.close(); //關閉連線
        }
    }
    

    客戶端

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <script>
        var socket;
        //判斷當前瀏覽器是否支援websocket
        if(window.WebSocket) {
            //go on
            socket = new WebSocket("ws://localhost:7000/hello2");
            //相當於channelReado, ev 收到伺服器端回送的訊息
            socket.onmessage = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = rt.value + "\n" + ev.data;
            }
    
            //相當於連線開啟(感知到連線開啟)
            socket.onopen = function (ev) {
                var rt = document.getElementById("responseText");
                rt.value = "連線開啟了.."
            }
    
            //相當於連線關閉(感知到連線關閉)
            socket.onclose = function (ev) {
    
                var rt = document.getElementById("responseText");
                rt.value = rt.value + "\n" + "連線關閉了.."
            }
        } else {
            alert("當前瀏覽器不支援websocket")
        }
    
        //傳送訊息到伺服器
        function send(message) {
            if(!window.socket) { //先判斷socket是否建立好
                return;
            }
            if(socket.readyState == WebSocket.OPEN) {
                //通過socket 傳送訊息
                socket.send(message)
            } else {
                alert("連線沒有開啟");
            }
        }
    </script>
        <form onsubmit="return false">
            <textarea name="message" style="height: 300px; width: 300px"></textarea>
            <input type="button" value="發生訊息" onclick="send(this.form.message.value)">
            <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
            <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''">
        </form>
    </body>
    </html>
    

Google Protobuf

編碼和解碼的基本介紹

  1. 編寫網路應用程式時,因為資料在網路中傳輸的都是二進位制位元組碼資料,在傳送資料時就需要編碼,接收資料時就需要解碼 [示意圖]

  2. codec(編解碼器) 的組成部分有兩個:decoder(解碼器)encoder(編碼器)。encoder 負責把業務資料轉換成位元組碼資料,decoder 負責把位元組碼資料轉換成業務資料

Netty 本身的編碼解碼的機制和問題分析

  1. Netty 自身提供了一些 codec(編解碼器)
  2. Netty 提供的編碼器
    • StringEncoder,對字串資料進行編碼
    • ObjectEncoder,對 Java 物件進行編碼
  3. Netty 提供的解碼器
    • StringDecoder, 對字串資料進行解碼
    • ObjectDecoder,對 Java 物件進行解碼
  4. Netty 本身自帶的 ObjectDecoder 和 ObjectEncoder 可以用來實現 POJO 物件或各種業務物件的編碼和解碼,底層使用的仍是 Java 序列化技術 , 而 Java 序列化技術本身效率就不高,存在如下問題
    • 無法跨語言
    • 序列化後的體積太大,是二進位制編碼的 5 倍多。
    • 序列化效能太低
  5. 引出 新的解決方案 [Google 的 Protobuf]

Protobuf

  1. Protobuf 基本介紹和使用示意圖

  2. Protobuf 是 Google 釋出的開源專案,全稱 Google Protocol Buffers,是一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化,或者說序列化。它很適合做資料儲存或 RPC[遠端過程呼叫 remote procedure call ] 資料交換格式 。 目前很多公司 http+json -> tcp+protobuf

  3. 參考文件 : https://developers.google.com/protocol-buffers/docs/proto 語言指南

  4. Protobuf 是以 message 的方式來管理資料的.

  5. 支援跨平臺、跨語言,即[客戶端和伺服器端可以是不同的語言編寫的] (支援目前絕大多數語言,例如 C++、C#、Java、python 等)

  6. 高效能,高可靠性

  7. 使用 protobuf 編譯器能自動生成程式碼,Protobuf 是將類的定義使用.proto 檔案進行描述。說明,在 idea 中編寫 .proto 檔案時,會自動提示是否下載 .ptotot 編寫外掛. 可以讓語法高亮。

  8. 然後通過 protoc.exe 編譯器根據.proto 自動生成.java 檔案

  9. protobuf 使用示意圖

Protobuf 快速入門例項

編寫程式,使用 Protobuf 完成如下功能

  1. 客戶端可以傳送一個 Student PoJo 物件到伺服器 (通過 Protobuf 編碼)

  2. 服務端能接收 Student PoJo 物件,並顯示資訊(通過 Protobuf 解碼)

  3. 演示步驟

    Student.proto

    syntax = "proto3"; //版本
    option java_outer_classname = "StudentPOJO";//生成的外部類名,同時也是檔名
    //protobuf 使用message 管理資料
    message Student { //會在 StudentPOJO 外部類生成一個內部類 Student, 他是真正傳送的POJO物件
        int32 id = 1; // Student 類中有 一個屬性 名字為 id 型別為int32(protobuf型別) 1表示屬性序號,不是值
        string name = 2;
    }
    

    編譯

    protoc.exe --java_out=. Student.proto

    將生成的 StudentPOJO 放入到專案使用

Protobuf 快速入門例項 2

編寫程式,使用 Protobuf 完成如下功能

  1. 客戶端可以隨機發送 Student PoJo/ Worker PoJo 物件到伺服器 (通過 Protobuf 編碼)

  2. 服務端能接收 Student PoJo/ Worker PoJo 物件(需要判斷是哪種型別),並顯示資訊(通過 Protobuf 解碼)

  3. 演示步驟

    Student.proto

    syntax = "proto3";
    option optimize_for = SPEED; // 加快解析
    option java_package="com.atguigu.netty.codec2";   //指定生成到哪個包下
    option java_outer_classname="MyDataInfo"; // 外部類名, 檔名
    
    //protobuf 可以使用message 管理其他的message
    message MyMessage {
    
        //定義一個列舉型別
        enum DataType {
            StudentType = 0; //在proto3 要求enum的編號從0開始
            WorkerType = 1;
        }
    
        //用data_type 來標識傳的是哪一個列舉型別
        DataType data_type = 1;
    
        //表示每次列舉型別最多隻能出現其中的一個, 節省空間
        oneof dataBody {
            Student student = 2;
            Worker worker = 3;
        }
    
    }
    
    
    message Student {
        int32 id = 1;//Student類的屬性
        string name = 2; //
    }
    message Worker {
        string name=1;
        int32 age=2;
    }