Netty學習筆記03-Netty核心模組元件與Google Protobuf
Netty 核心模組元件
Bootstrap、ServerBootstrap
-
Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程式,串聯各個元件,Netty 中 Bootstrap 類是客戶端程式的啟動引導類,ServerBootstrap 是服務端啟動引導類
-
常見的方法有
-
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
:該方法用於伺服器端,用來設定兩個 EventLoop
-
public B group(EventLoopGroup group)
-
public B channel(Class<? extends C> channelClass)
:該方法用來設定一個伺服器端的通道實現 -
public <T> B option(ChannelOption<T> option, T value)
:用來給 ServerChannel 新增配置 -
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)
:用來給接收到的通道新增配置 -
public ServerBootstrap childHandler(ChannelHandler childHandler)
:該方法用來設定業務處理類(自定義handler) -
public ChannelFuture bind(int inetPort)
:該方法用於伺服器端,用來設定佔用的埠號 -
public ChannelFuture connect(String inetHost, int inetPort)
:該方法用於客戶端,用來連線伺服器端
-
Future、ChannelFuture
- Netty 中所有的 IO 操作都是非同步的,不能立刻得知訊息是否被正確處理。但是可以過一會等它執行完成或者直接註冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures
- 常見的方法有
Channel channel()
:返回當前正在進行 IO 操作的通道ChannelFuture sync()
:等待非同步操作執行完畢
Channel
- Netty 網路通訊的元件,能夠用於執行網路 I/O 操作。
- 通過 Channel 可獲得當前網路連線的通道的狀態
- 通過 Channel 可獲得 網路連線的配置引數 (例如接收緩衝區大小)
- Channel 提供非同步的網路 I/O 操作(如建立連線,讀寫,繫結埠),非同步呼叫意味著任何 I/O 呼叫都將立即返回,並且不保證在呼叫結束時所請求的 I/O 操作已完成
- 呼叫立即返回一個 ChannelFuture 例項,通過註冊監聽器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時回撥通知呼叫方
- 支援關聯 I/O 操作與對應的處理程式
- 不同協議、不同的阻塞型別的連線都有不同的 Channel 型別與之對應,常用的 Channel 型別:
NioSocketChannel
:非同步的客戶端 TCP Socket 連線。NioServerSocketChannel
:非同步的伺服器端 TCP Socket 連線。NioDatagramChannel
:非同步的 UDP 連線。NioSctpChannel
:非同步的客戶端 Sctp 連線。NioSctpServerChannel
:非同步的 Sctp 伺服器端連線,這些通道涵蓋了 UDP 和 TCP 網路 IO 以及檔案 IO。
Selector
- Netty 基於 Selector 物件實現 I/O 多路複用,通過 Selector 一個執行緒可以監聽多個連線的 Channel 事件。
- 當向一個 Selector 中註冊 Channel 後,Selector 內部的機制就可以自動不斷地查詢(Select) 這些註冊的Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網路連線完成等),這樣程式就可以很簡單地使用一個執行緒高效地管理多個 Channel。
ChannelHandler 及其實現類
-
ChannelHandler 是一個介面,處理 I/O 事件或攔截 I/O 操作,並將其轉發到其 ChannelPipeline(業務處理鏈) 中的下一個處理程式。
-
ChannelHandler 本身並沒有提供很多方法,因為這個介面有許多的方法需要實現,方便使用期間,可以繼承它的子類
-
ChannelHandler 及其實現類一覽圖(後)
ChannelInboundHandler
用於處理入站 I/O 事件。ChannelOutboundHandler
用於處理出站 I/O 操作。- //介面卡
ChannelInboundHandlerAdapter
用於處理入站 I/O 事件。ChannelOutboundHandlerAdapter
用於處理出站 I/O 操作。ChannelDuplexHandler
用於處理入站和出站事件。
-
我們經常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter,然後通過重寫相應方法實現業務邏輯,我們接下來看看一般都需要重寫哪些方法
Pipeline 和 ChannelPipeline
ChannelPipeline 是一個重點:
-
ChannelPipeline 是一個 Handler 的集合,它負責處理和攔截 inbound 或者 outbound 的事件和操作,相當於一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 儲存 ChannelHandler 的 List,用於處理或攔截Channel 的入站事件和出站操作)
-
ChannelPipeline 實現了一種高階形式的攔截過濾器模式,使使用者可以完全控制事件的處理方式,以及 Channel中各個ChannelHandler 如何相互互動
-
在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關係如下
- 一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向連結串列,並且每個 ChannelHandlerContext 中又關聯著一個 ChannelHandler
- 入站事件和出站事件在一個雙向連結串列中,入站事件會從連結串列 head 往後傳遞到最後一個入站的 handler,出站事件會從連結串列 tail 往前傳遞到最前一個出站的 handler,兩種型別的 handler 互不干擾
-
常用方法
ChannelPipeline addFirst(ChannelHandler... handlers)
,把一個業務處理類(handler)新增到鏈中的第一個位置ChannelPipeline addLast(ChannelHandler... handlers)
,把一個業務處理類(handler)新增到鏈中的最後一個位置
ChannelHandlerContext
- 儲存 Channel 相關的所有上下文資訊,同時關聯一個 ChannelHandler 物件
- 即 ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler , 同 時ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的資訊,方便對 ChannelHandler 進行呼叫.
- 常用方法
ChannelFuture close()
,關閉通道ChannelOutboundInvoker flush()
,重新整理ChannelFuture writeAndFlush(Object msg)
, 將 數 據 寫 到 ChannelPipeline 中 ,當 前ChannelHandler
的下一個ChannelHandler
開始處理(出站)
ChannelOption
- Netty 在建立 Channel 例項後,一般都需要設定
ChannelOption
引數。 ChannelOption
引數如下:ChannelOption.SO_BACKLOG
:對應 TCP/IP 協議 listen 函式中的 backlog 引數,用來初始化伺服器可連線佇列大小。服務端處理客戶端連線請求是順序處理的,所以同一時間只能處理一個客戶端連線。多個客戶端來的時候,服務端將不能處理的客戶端連線請求放在佇列中等待處理,backlog
引數指定了佇列的大小。ChannelOption.SO_KEEPALIVE
:一直保持連線活動狀態
EventLoopGroup 和其實現類 NioEventLoopGroup
-
EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop同時工作,每個 EventLoop 維護著一個 Selector 例項。
-
EventLoopGroup 提供 next 介面,可以從組裡面按照一定規則獲取其中一個 EventLoop 來處理任務。在 Netty 服 務 器 端 編 程 中 , 我 們 一 般 都 需 要 提 供 兩 個 EventLoopGroup , 例 如 : BossEventLoopGroup 和 WorkerEventLoopGroup。
-
通常一個服務埠即一個 ServerSocketChannel 對應一個 Selector 和一個 EventLoop 執行緒。BossEventLoop 負責
接收客戶端的連線並將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理,如下圖所示
- BossEventLoopGroup 通常是一個單執行緒的 EventLoop,EventLoop 維護著一個註冊了ServerSocketChannel 的 Selector 例項BossEventLoop 不斷輪詢 Selector 將連線事件分離出來
- 通常是 OP_ACCEPT 事件,然後將接收到的 SocketChannel 交給 WorkerEventLoopGroup
- WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop來將這個 SocketChannel 註冊到其維護的 Selector 並對其後續的 IO 事件進行處理
-
常用方法
public NioEventLoopGroup()
,構造方法public Future<?> shutdownGracefully()
,斷開連線,關閉執行緒
Unpooled 類
-
Netty 提供一個專門用來操作緩衝區(即Netty的資料容器)的工具類
-
常用方法如下所示
- 通過給定的資料和字元編碼返回一個 ByteBuf 物件(類似於 NIO 中的 ByteBuffer 但有區別)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
-
舉例說明 Unpooled 獲取 Netty 的資料容器 ByteBuf 的基本使用 【案例演示】
-
案例 1
package com.atguigu.netty.buf; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; public class NettyByteBuf01 { public static void main(String[] args) { //建立一個ByteBuf //說明 //1. 建立 物件,該物件包含一個數組arr , 是一個byte[10] //2. 在netty 的buffer中,不需要使用flip 進行反轉 // 底層維護了 readerindex 和 writerIndex //3. 通過 readerindex 和 writerIndex 和 capacity, 將buffer分成三個區域 // 0---readerindex 已經讀取的區域 // readerindex---writerIndex , 可讀的區域 // writerIndex -- capacity, 可寫的區域 ByteBuf buffer = Unpooled.buffer(10); for(int i = 0; i < 10; i++) { buffer.writeByte(i); } System.out.println("capacity=" + buffer.capacity());//10 //輸出 // for(int i = 0; i<buffer.capacity(); i++) { // System.out.println(buffer.getByte(i)); // } for(int i = 0; i < buffer.capacity(); i++) { System.out.println(buffer.readByte()); } System.out.println("執行完畢"); } }
-
案例2
package com.atguigu.netty.buf; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.charset.Charset; public class NettyByteBuf02 { public static void main(String[] args) { //建立ByteBuf ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8")); //使用相關的方法 if(byteBuf.hasArray()) { // true byte[] content = byteBuf.array(); //將 content 轉成字串 System.out.println(new String(content, Charset.forName("utf-8"))); System.out.println("byteBuf=" + byteBuf); System.out.println(byteBuf.arrayOffset()); // 0 System.out.println(byteBuf.readerIndex()); // 0 System.out.println(byteBuf.writerIndex()); // 12 System.out.println(byteBuf.capacity()); // 36 //System.out.println(byteBuf.readByte()); // System.out.println(byteBuf.getByte(0)); // 104 int len = byteBuf.readableBytes(); //可讀的位元組數 12 System.out.println("len=" + len); //使用for取出各個位元組 for(int i = 0; i < len; i++) { System.out.println((char) byteBuf.getByte(i)); } //按照某個範圍讀取 System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8"))); System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8"))); } } }
-
Netty 應用例項-群聊系統
例項要求
- 編寫一個 Netty 群聊系統,實現伺服器端和客戶端之間的資料簡單通訊(非阻塞)
- 實現多人群聊
- 伺服器端:可以監測使用者上線,離線,並實現訊息轉發功能
- 客戶端:通過 channel 可以無阻塞傳送訊息給其它所有使用者,同時可以接受其它使用者傳送的訊息(有伺服器轉發得到)
- 目的:進一步理解 Netty 非阻塞網路程式設計機制
- 看老師程式碼演示
服務端
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //監聽埠
public GroupChatServer(int port) {
this.port = port;
}
//編寫run方法,處理客戶端的請求
public void run() throws Exception{
//建立兩個執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//獲取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解碼器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入編碼器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的業務處理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 伺服器啟動");
ChannelFuture channelFuture = b.bind(port).sync();
//監聽關閉
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
package com.atguigu.netty.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一個hashmap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定義一個channle 組,管理所有的channel
//GlobalEventExecutor.INSTANCE) 是全域性的事件執行器,是一個單例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示連線建立,一旦連線,第一個被執行
//將當前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//將該客戶加入聊天的資訊推送給其它線上的客戶端
/*
該方法會將 channelGroup 中所有的channel 遍歷,併發送 訊息,
我們不需要自己遍歷
*/
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
//斷開連線, 將xx客戶離開資訊推送給當前線上的客戶
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
//表示channel 處於活動狀態, 提示 xx上線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上線了~");
}
//表示channel 處於不活動狀態, 提示 xx離線了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 離線了~");
}
//讀取資料
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//獲取到當前channel
Channel channel = ctx.channel();
//這時我們遍歷channelGroup, 根據不同的情況,回送不同的訊息
channelGroup.forEach(ch -> {
if(channel != ch) { //不是當前的channel,轉發訊息
ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 傳送了訊息" + msg + "\n");
}else {//回顯自己傳送的訊息給自己
ch.writeAndFlush("[自己]傳送了訊息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉通道
ctx.close();
}
}
客戶端
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//屬性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相關handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定義的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress()+ "--------");
//客戶端需要輸入資訊,建立一個掃描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通過channel 傳送到伺服器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
package com.atguigu.netty.groupchat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
Netty 心跳檢測機制案例
例項要求:
-
編寫一個 Netty 心跳檢測機制案例, 當伺服器超過 3 秒沒有讀時,就提示讀空閒
-
當伺服器超過 5 秒沒有寫操作時,就提示寫空閒
-
實現當伺服器超過 7 秒沒有讀或者寫操作時,就提示讀寫空閒
-
程式碼如下:
package com.atguigu.netty.heartbeat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class MyServer { public static void main(String[] args) throws Exception{ //建立兩個執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一個netty 提供 IdleStateHandler /* 說明 1. IdleStateHandler 是netty 提供的處理空閒狀態的處理器 2. long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連線 3. long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連線 4. long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連線 5. 文件說明 triggers an {@link IdleStateEvent} when a {@link Channel} has not performed * read, write, or both operation for a while. * 6. 當 IdleStateEvent 觸發後 , 就會傳遞給管道 的下一個handler去處理 * 通過呼叫(觸發)下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閒,寫空閒,讀寫空閒) */ pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS)); //加入一個對空閒檢測進一步處理的handler(自定義) pipeline.addLast(new MyServerHandler()); } }); //啟動伺服器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.atguigu.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; public class MyServerHandler extends ChannelInboundHandlerAdapter { /** * * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { //將 evt 向下轉型 IdleStateEvent IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "讀空閒"; break; case WRITER_IDLE: eventType = "寫空閒"; break; case ALL_IDLE: eventType = "讀寫空閒"; break; } System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType); System.out.println("伺服器做相應處理.."); //如果發生空閒,我們關閉通道 // ctx.channel().close(); } } }
package com.atguigu.netty.heartbeat; public class Test { public static void main(String[] args) throws Exception { System.out.println(System.nanoTime()); //納秒 10億分之1 Thread.sleep(1000); System.out.println(System.nanoTime()); } }
Netty 通過 WebSocket 程式設計實現伺服器和客戶端長連線
例項要求:
-
Http 協議是無狀態的, 瀏覽器和伺服器間的請求響應一次,下一次會重新建立連線.
-
要求:實現基於 webSocket 的長連線的全雙工的互動
-
改變 Http 協議多次請求的約束,實現長連線了, 伺服器可以傳送訊息給瀏覽器
-
客戶端瀏覽器和伺服器端會相互感知,比如伺服器關閉了,瀏覽器會感知,同樣瀏覽器關閉了,伺服器會感知
-
執行介面
服務端
package com.atguigu.netty.websocket; import com.atguigu.netty.heartbeat.MyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class MyServer { public static void main(String[] args) throws Exception{ //建立兩個執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //因為基於http協議,使用http的編碼和解碼器 pipeline.addLast(new HttpServerCodec()); //是以塊方式寫,新增ChunkedWriteHandler處理器 pipeline.addLast(new ChunkedWriteHandler()); /* 說明 1. http資料在傳輸過程中是分段, HttpObjectAggregator ,就是可以將多個段聚合 2. 這就就是為什麼,當瀏覽器傳送大量資料時,就會發出多次http請求 */ pipeline.addLast(new HttpObjectAggregator(8192)); /* 說明 1. 對應websocket ,它的資料是以 幀(frame) 形式傳遞 2. 可以看到WebSocketFrame 下面有六個子類 3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri 4. WebSocketServerProtocolHandler 核心功能是將 http協議升級為 ws協議 , 保持長連線 5. 是通過一個 狀態碼 101 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello2")); //自定義的handler ,處理業務邏輯 pipeline.addLast(new MyTextWebSocketFrameHandler()); } }); //啟動伺服器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.atguigu.netty.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; //這裡 TextWebSocketFrame 型別,表示一個文字幀(frame) public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("伺服器收到訊息 " + msg.text()); //回覆訊息 ctx.channel().writeAndFlush(new TextWebSocketFrame("伺服器時間" + LocalDateTime.now() + " " + msg.text())); } //當web客戶端連線後, 觸發方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一 System.out.println("handlerAdded 被呼叫" + ctx.channel().id().asLongText()); System.out.println("handlerAdded 被呼叫" + ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被呼叫" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生 " + cause.getMessage()); ctx.close(); //關閉連線 } }
客戶端
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script> var socket; //判斷當前瀏覽器是否支援websocket if(window.WebSocket) { //go on socket = new WebSocket("ws://localhost:7000/hello2"); //相當於channelReado, ev 收到伺服器端回送的訊息 socket.onmessage = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + ev.data; } //相當於連線開啟(感知到連線開啟) socket.onopen = function (ev) { var rt = document.getElementById("responseText"); rt.value = "連線開啟了.." } //相當於連線關閉(感知到連線關閉) socket.onclose = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + "連線關閉了.." } } else { alert("當前瀏覽器不支援websocket") } //傳送訊息到伺服器 function send(message) { if(!window.socket) { //先判斷socket是否建立好 return; } if(socket.readyState == WebSocket.OPEN) { //通過socket 傳送訊息 socket.send(message) } else { alert("連線沒有開啟"); } } </script> <form onsubmit="return false"> <textarea name="message" style="height: 300px; width: 300px"></textarea> <input type="button" value="發生訊息" onclick="send(this.form.message.value)"> <textarea id="responseText" style="height: 300px; width: 300px"></textarea> <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''"> </form> </body> </html>
Google Protobuf
編碼和解碼的基本介紹
-
編寫網路應用程式時,因為資料在網路中傳輸的都是二進位制位元組碼資料,在傳送資料時就需要編碼,接收資料時就需要解碼 [示意圖]
-
codec(編解碼器) 的組成部分有兩個:decoder(解碼器)和 encoder(編碼器)。encoder 負責把業務資料轉換成位元組碼資料,decoder 負責把位元組碼資料轉換成業務資料
Netty 本身的編碼解碼的機制和問題分析
- Netty 自身提供了一些 codec(編解碼器)
- Netty 提供的編碼器
- StringEncoder,對字串資料進行編碼
- ObjectEncoder,對 Java 物件進行編碼
- Netty 提供的解碼器
- StringDecoder, 對字串資料進行解碼
- ObjectDecoder,對 Java 物件進行解碼
- Netty 本身自帶的 ObjectDecoder 和 ObjectEncoder 可以用來實現 POJO 物件或各種業務物件的編碼和解碼,底層使用的仍是 Java 序列化技術 , 而 Java 序列化技術本身效率就不高,存在如下問題
- 無法跨語言
- 序列化後的體積太大,是二進位制編碼的 5 倍多。
- 序列化效能太低
- 引出 新的解決方案 [Google 的 Protobuf]
Protobuf
-
Protobuf 基本介紹和使用示意圖
-
Protobuf 是 Google 釋出的開源專案,全稱 Google Protocol Buffers,是一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化,或者說序列化。它很適合做資料儲存或 RPC[遠端過程呼叫 remote procedure call ] 資料交換格式 。 目前很多公司 http+json -> tcp+protobuf
-
參考文件 : https://developers.google.com/protocol-buffers/docs/proto 語言指南
-
Protobuf 是以 message 的方式來管理資料的.
-
支援跨平臺、跨語言,即[客戶端和伺服器端可以是不同的語言編寫的] (支援目前絕大多數語言,例如 C++、C#、Java、python 等)
-
高效能,高可靠性
-
使用 protobuf 編譯器能自動生成程式碼,Protobuf 是將類的定義使用.proto 檔案進行描述。說明,在 idea 中編寫 .proto 檔案時,會自動提示是否下載 .ptotot 編寫外掛. 可以讓語法高亮。
-
然後通過 protoc.exe 編譯器根據.proto 自動生成.java 檔案
-
protobuf 使用示意圖
Protobuf 快速入門例項
編寫程式,使用 Protobuf 完成如下功能
-
客戶端可以傳送一個 Student PoJo 物件到伺服器 (通過 Protobuf 編碼)
-
服務端能接收 Student PoJo 物件,並顯示資訊(通過 Protobuf 解碼)
-
演示步驟
Student.proto
syntax = "proto3"; //版本 option java_outer_classname = "StudentPOJO";//生成的外部類名,同時也是檔名 //protobuf 使用message 管理資料 message Student { //會在 StudentPOJO 外部類生成一個內部類 Student, 他是真正傳送的POJO物件 int32 id = 1; // Student 類中有 一個屬性 名字為 id 型別為int32(protobuf型別) 1表示屬性序號,不是值 string name = 2; }
編譯
protoc.exe --java_out=. Student.proto
將生成的 StudentPOJO 放入到專案使用
Protobuf 快速入門例項 2
編寫程式,使用 Protobuf 完成如下功能
-
客戶端可以隨機發送 Student PoJo/ Worker PoJo 物件到伺服器 (通過 Protobuf 編碼)
-
服務端能接收 Student PoJo/ Worker PoJo 物件(需要判斷是哪種型別),並顯示資訊(通過 Protobuf 解碼)
-
演示步驟
Student.proto
syntax = "proto3"; option optimize_for = SPEED; // 加快解析 option java_package="com.atguigu.netty.codec2"; //指定生成到哪個包下 option java_outer_classname="MyDataInfo"; // 外部類名, 檔名 //protobuf 可以使用message 管理其他的message message MyMessage { //定義一個列舉型別 enum DataType { StudentType = 0; //在proto3 要求enum的編號從0開始 WorkerType = 1; } //用data_type 來標識傳的是哪一個列舉型別 DataType data_type = 1; //表示每次列舉型別最多隻能出現其中的一個, 節省空間 oneof dataBody { Student student = 2; Worker worker = 3; } } message Student { int32 id = 1;//Student類的屬性 string name = 2; // } message Worker { string name=1; int32 age=2; }