Netty In Action中文版 - 第四章:Transports(傳輸)
本章內容
- Transports(傳輸)
- NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)
- Use-case(用例)
- APIs(接口)
Java開發網絡程序數據傳輸的過程和方式是被抽象了的,我們不須要關註底層接口。僅僅須要使用Java API或其它網絡框架如Netty就能達到數據傳輸的目的。
發送數據和接收數據都是字節碼。Nothing more,nothing less。
假設你以前使用Java提供的網絡接口工作過,你可能已經遇到過想從堵塞傳輸切換到非堵塞傳輸的情況,這樣的切換是比較困難的,由於堵塞IO和非堵塞IO使用的API有非常大的差異。Netty提供了上層的傳輸實現接口使得這樣的情況變得簡單。
我們能夠讓所寫的代碼盡可能通用,而不會依賴一些實現相關的APIs。當我們想切換傳輸方式的時候不須要花非常大的精力和時間來重構代碼。
本章將介紹統一的API以及怎樣使用它們,會拿Netty的API和Java的API做比較來告訴你為什麽Netty能夠更easy的使用。
本章也提供了一些優質的用例代碼,以便最佳使用Netty。使用Netty不須要其它的網絡框架或網絡編程經驗。若有則僅僅是對理解netty有幫助。但不是必要的。以下讓我們來看看真是世界裏的傳輸工作。
4.1 案例研究:切換傳輸方式
為了讓你想象怎樣運輸,我會從一個簡單的應用程序開始,這個應用程序什麽都不做,僅僅是接受client連接並發送“Hi!”字符串消息到client,發送完了就斷開連接。我不會具體解說這個過程的實現,它僅僅是一個樣例。
4.1.1 使用Java的I/O和NIO
我們將不用Netty實現這個樣例,以下代碼是使用堵塞IO實現的樣例:
package netty.in.action; import java.io.IOException; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.Charset; /** * Blocking networking without Netty * @author c.k * */ public class PlainOioServer { public void server(int port) throws Exception { //bind server to port final ServerSocket socket = new ServerSocket(port); try { while(true){ //accept connection final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); //create new thread to handle connection new Thread(new Runnable() { @Override public void run() { OutputStream out; try{ out = clientSocket.getOutputStream(); //write message to connected client out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); out.flush(); //close connection once message written and flushed clientSocket.close(); }catch(IOException e){ try { clientSocket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } }).start();//start thread to begin handling } }catch(Exception e){ e.printStackTrace(); socket.close(); } } }
以下代碼是使用Java NIO實現的樣例:
package netty.in.action; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * Asynchronous networking without Netty * @author c.k * */ public class PlainNioServer { public void server(int port) throws Exception { System.out.println("Listening for connections on port " + port); //open Selector that handles channels Selector selector = Selector.open(); //open ServerSocketChannel ServerSocketChannel serverChannel = ServerSocketChannel.open(); //get ServerSocket ServerSocket serverSocket = serverChannel.socket(); //bind server to port serverSocket.bind(new InetSocketAddress(port)); //set to non-blocking serverChannel.configureBlocking(false); //register ServerSocket to selector and specify that it is interested in new accepted clients serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); while (true) { //Wait for new events that are ready for process. This will block until something happens int n = selector.select(); if (n > 0) { //Obtain all SelectionKey instances that received events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); try { //Check if event was because new client ready to get accepted if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); //Accept client and register it to selector client.register(selector, SelectionKey.OP_WRITE, msg.duplicate()); } //Check if event was because socket is ready to write data if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buff = (ByteBuffer) key.attachment(); //write data to connected client while (buff.hasRemaining()) { if (client.write(buff) == 0) { break; } } client.close();//close client } } catch (Exception e) { key.cancel(); key.channel().close(); } } } } } }如你所見。即使它們實現的功能是一樣,可是代碼全然不同。以下我們將用Netty來實現同樣的功能。
4.1.2 Netty中使用I/O和NIO
以下代碼是使用Netty作為網絡框架編寫的一個堵塞IO樣例:
package netty.in.action; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; import io.netty.util.CharsetUtil; public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8)); //事件循環組 EventLoopGroup group = new NioEventLoopGroup(); try { //用來引導server配置 ServerBootstrap b = new ServerBootstrap(); //使用OIO堵塞模式 b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) //指定ChannelInitializer初始化handlers .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { //加入一個“入站”handler到ChannelPipeline ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //連接後,寫消息到client,寫完後便關閉連接 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE); } }); } }); //綁定server接受連接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { //釋放全部資源 group.shutdownGracefully(); } } }上面代碼實現功能一樣,但結構清晰明了。這僅僅是Netty的優勢之中的一個。
4.1.3 Netty中實現異步支持
以下代碼是使用Netty實現異步。能夠看出使用Netty由OIO切換到NIO是很的方便。
package netty.in.action; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8)); // 事件循環組 EventLoopGroup group = new NioEventLoopGroup(); try { // 用來引導server配置 ServerBootstrap b = new ServerBootstrap(); // 使用NIO異步模式 b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) // 指定ChannelInitializer初始化handlers .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 加入一個“入站”handler到ChannelPipeline ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 連接後,寫消息到client。寫完後便關閉連接 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE); } }); } }); // 綁定server接受連接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { // 釋放全部資源 group.shutdownGracefully(); } } }由於Netty使用同樣的API來實現每一個傳輸,它並不關心你使用什麽來實現。Netty通過操作Channel接口和ChannelPipeline、ChannelHandler來實現傳輸。
4.2 Transport API
傳輸API的核心是Channel接口。它用於全部出站的操作。Channel接口的類層次結構例如以下
如上圖所看到的,每一個Channel都會分配一個ChannelPipeline和ChannelConfig。
ChannelConfig負責設置並存儲配置。並同意在執行期間更新它們。
傳輸一般有特定的配置設置。僅僅作用於傳輸,沒有其它的實現。ChannelPipeline容納了使用的ChannelHandler實例,這些ChannelHandler將處理通道傳遞的“入站”和“出站”數據。ChannelHandler的實現同意你改變數據狀態和數據傳輸,本書有章節具體解說ChannelHandler,ChannelHandler是Netty的重點概念。
- 數據傳輸時,將數據從一種格式轉換到還有一種格式
- 異常通知
- Channel變為有效或無效時獲得通知
- Channel被註冊或從EventLoop中註銷時獲得通知
- 通知用戶特定事件
它類似於一個鏈條,有使用過Servlet的讀者可能會更easy理解。
ChannelPipeline實現了攔截過濾器模式,這意味著我們連接不同的ChannelHandler來攔截並處理經過ChannelPipeline的數據或事件。能夠把ChannelPipeline想象成UNIX管道。它同意不同的命令鏈(ChannelHandler相當於命令)。你還能夠在執行時依據須要加入ChannelHandler實例到ChannelPipeline或從ChannelPipeline中刪除,這能幫助我們構建高度靈活的Netty程序。此外。訪問指定的ChannelPipeline和ChannelConfig,你能在Channel自身上進行操作。Channel提供了非常多方法。例如以下列表:
- eventLoop(),返回分配給Channel的EventLoop
- pipeline(),返回分配給Channel的ChannelPipeline
- isActive()。返回Channel是否激活,已激活說明與遠程連接對等
- localAddress()。返回已綁定的本地SocketAddress
- remoteAddress()。返回已綁定的遠程SocketAddress
- write(),寫數據到遠程client。數據通過ChannelPipeline傳輸過去
Channel channel = ... //Create ByteBuf that holds data to write ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); //Write data ChannelFuture cf = channel.write(buf); //Add ChannelFutureListener to get notified after write completes cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { //Write operation completes without error if (future.isSuccess()) { System.out.println(.Write successful.); } else { //Write operation completed but because of error System.err.println(.Write error.); future.cause().printStacktrace(); } } });Channel是線程安全(thread-safe)的,它能夠被多個不同的線程安全的操作,在多線程環境下,全部的方法都是安全的。正由於Channel是安全的,我們存儲對Channel的引用,並在學習的時候使用它寫入數據到遠程已連接的client,使用多線程也是如此。以下的代碼是一個簡單的多線程樣例:
final Channel channel = ... //Create ByteBuf that holds data to write final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8); //Create Runnable which writes data to channel Runnable writer = new Runnable() { @Override public void run() { channel.write(buf.duplicate()); } }; //Obtain reference to the Executor which uses threads to execute tasks Executor executor = Executors.newChachedThreadPool(); // write in one thread //Hand over write task to executor for execution in thread executor.execute(writer); // write in another thread //Hand over another write task to executor for execution in thread executor.execute(writer);此外,這樣的方法保證了寫入的消息以同樣的順序通過寫入它們的方法。
想了解全部方法的使用能夠參考Netty API文檔。
4.3 Netty包括的傳輸實現
Netty自帶了一些傳輸協議的實現,盡管沒有支持全部的傳輸協議,可是其自帶的已足夠我們來使用。Netty應用程序的傳輸協議依賴於底層協議,本節我們將學習Netty中的傳輸協議。
Netty中的傳輸方式有例如以下幾種:- NIO,io.netty.channel.socket.nio,基於java.nio.channels的工具包,使用選擇器作為基礎的方法。
- OIO,io.netty.channel.socket.oio,基於java.net的工具包,使用堵塞流。
- Local。io.netty.channel.local,用來在虛擬機之間本地通信。
- Embedded。io.netty.channel.embedded。嵌入傳輸,它同意在沒有真正網絡的運輸中使用ChannelHandler,能夠很實用的來測試ChannelHandler的實現。
4.3.1 NIO - Nonblocking I/O
NIO傳輸是眼下最經常使用的方式,它通過使用選擇器提供了全然異步的方式操作全部的I/O,NIO從Java 1.4才被提供。NIO中。我們能夠註冊一個通道或獲得某個通道的改變的狀態。通道狀態有以下幾種改變:
- 一個新的Channel被接受並已準備好
- Channel連接完畢
- Channel中有數據並已準備好讀取
- Channel發送數據出去
選擇器所支持的操作在SelectionKey中定義,詳細例如以下:
- OP_ACCEPT。有新連接時得到通知
- OP_CONNECT,連接完畢後得到通知
- OP_READ。準備好讀取數據時得到通知
- OP_WRITE,寫入數據到通道時得到通知
如前面所說,Netty隱藏內部的實現細節。將抽象出來的API暴露出來供使用,以下是處理流程圖:
NIO在處理過程也會有一定的延遲,若連接數不大的話,延遲一般在毫秒級。可是其吞吐量依舊比OIO模式的要高。Netty中的NIO傳輸是“zero-file-copy”,也就是零文件復制,這樣的機制能夠讓程序速度更快,更高效的從文件系統中傳輸內容,零復制就是我們的應用程序不會將發送的數據先拷貝到JVM堆棧在進行處理,而是直接從內核空間操作。接下來我們將討論OIO傳輸。它是堵塞的。
4.3.2 OIO - Old blocking I/O
OIO就是java中提供的Socket接口。java最開始僅僅提供了堵塞的Socket,堵塞會導致程序性能低。以下是OIO的處理流程圖。若想具體了解,能夠參閱其它相關資料。
4.3.3 Local - In VM transport
Netty包括了本地傳輸,這個傳輸實現使用同樣的API用於虛擬機之間的通信,傳輸是全然異步的。每一個Channel使用唯一的SocketAddress,client通過使用SocketAddress進行連接,在server會被註冊為長期執行,一旦通道關閉。它會自己主動註銷,client無法再使用它。
連接到本地傳輸server的行為與其它的傳輸實現差點兒是同樣的。須要註意的一個重點是僅僅能在本地的server和client上使用它們。Local未綁定不論什麽Socket,值提供JVM進程之間的通信。
4.3.4 Embedded transport
Netty還包含嵌入傳輸,與之前講述的其它傳輸實現比較。它是不是一個真的傳輸呢?若不是一個真的傳輸,我們用它能夠做什麽呢?Embedded transport同意更easy的使用不同的ChannelHandler之間的交互,這也更easy嵌入到其它的ChannelHandler實例並像一個輔助類一樣使用它們。它一般用來測試特定的ChannelHandler實現。也能夠在ChannelHandler中又一次使用一些ChannelHandler來進行擴展,為了實現這種目的。它自帶了一個詳細的Channel實現。即:EmbeddedChannel。
4.4 每種傳輸方式在什麽時候使用?
不多加贅述。看以下列表:
- OIO。在低連接數、須要低延遲時、堵塞時使用
- NIO。在高連接數時使用
- Local。在同一個JVM內通信時使用
- Embedded,測試ChannelHandler時使用
Netty In Action中文版 - 第四章:Transports(傳輸)