摘要:目前為止,我們已經(jīng)完成了一半的工作,剩下的就是在方法中啟動服務(wù)器。第一個通常被稱為,負(fù)責(zé)接收已到達的。這兩個指針恰好標(biāo)記著數(shù)據(jù)的起始終止位置。
前言
本篇翻譯自netty官方Get Start教程,一方面能把好的文章分享給各位,另一方面能鞏固所學(xué)的知識。若有錯誤和遺漏,歡迎各位指出。
https://netty.io/wiki/user-gu...
面臨的問題我們一般使用專用軟件或者是開源庫和其他系統(tǒng)通信。舉個例子,我們通常使用 http 客戶端從 web 服務(wù)器獲取信息,或者通過 web service 執(zhí)行一個 remote procedure call (遠程調(diào)用)。然而,一個通用的協(xié)議時常不具備很好的擴展性,例如我們不會使用一個通用 http 服務(wù)器去做如下類型的數(shù)據(jù)交換——大文件,電子郵件,近實時的金融數(shù)據(jù)或者是游戲數(shù)據(jù)。因此,一個高度優(yōu)化的致力于解決某些問題的通訊協(xié)議是很有必要的,例如你希望實現(xiàn)一臺優(yōu)化過的 http 服務(wù)器,致力于聊天應(yīng)用,流媒體傳輸,大文件傳輸?shù)取D闵踔量梢詾橐延行枨罅可矶ㄗ鲆粋€全新的通信協(xié)議。另一個不可避免的情況是,你必須處理一個古老的專用協(xié)議,使用他去跟遺留系統(tǒng)通信,問題是我們該如何快速實現(xiàn)協(xié)議,同時不犧牲應(yīng)用的穩(wěn)定性和性能。
解決方法The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance · high-scalability protocol servers and clients.
In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.
"Quick and easy" does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
Some users might already have found other network application framework that claims to have the same advantage, and you might want to ask what makes Netty so different from them. The answer is the philosophy it is built on. Netty is designed to give you the most comfortable experience both in terms of the API and the implementation from the day one. It is not something tangible but you will realize that this philosophy will make your life much easier as you read this guide and play with Netty.
Get Started本章使用簡單的例子帶你瀏覽 netty 的核心構(gòu)造,你快速上手。在本章過后,你就可以寫出一個基于 netty 的客戶端和服務(wù)器。如果你希望有更好的學(xué)習(xí)體驗,你可以先瀏覽 Chapter 2, Architectural Overview 后再回來本章學(xué)習(xí) (先看這里也是OK的)。
開始之前能跑通本章例子的最低要求:最新版本的 netty(4.x) 和 JDK 1.6 或以上的版本。
在閱讀時,當(dāng)你對本章中出現(xiàn)的 class 感到疑惑,請查閱他們的 api 文檔。而本章幾乎所有的 class 都會鏈接到他們的 api 文檔。如果你發(fā)現(xiàn)本章中有什么錯誤的信息、代碼語法錯誤、或者有什么好的想法,也請聯(lián)系 netty 社區(qū)通知我們。
世界上最簡單的協(xié)議并不是 hello world,而是Discard。這種協(xié)議會拋棄掉所有接收到的數(shù)據(jù),不會給客戶端任何響應(yīng),所以實現(xiàn)Discard協(xié)議唯一要做的是忽略所有接收到的數(shù)據(jù)。接下來讓我們著手寫一個 handler,用來處理I/O events(I/O事件)。
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
1.DiscardServerHandler 繼承ChannelInboundHandlerAdapter,并間接實現(xiàn)了ChannelInboundHandler。ChannelInboundHandler接口提供了多種 event handler method (事件處理方法),你可能要逐個實現(xiàn)接口中的方法,但直接繼承ChannelInboundHandlerAdapter會是更好的選擇。
2.這里我們重寫了channelRead(),當(dāng)有新數(shù)據(jù)到達時該方法就會被調(diào)用,并附帶接收到的數(shù)據(jù)作為方法參數(shù)。在本例中,接收到的數(shù)據(jù)類型是ByteBuf。
3.要實現(xiàn) Discard 協(xié)議,這里 handler 會忽略接收到的數(shù)據(jù)。ByteBuf作為 reference-counted (引用計數(shù)) 對象,通過調(diào)用方法release()釋放資源,請記住這個 release 動作在 handler 中完成 (原文:是handler的職責(zé))。通常,我們會像下面那樣實現(xiàn)channelRead()
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
4.當(dāng) netty 發(fā)生 I/O 錯誤,或者 handler 在處理 event (事件) 拋出異常時,exceptionCaught()就會被調(diào)用。大多數(shù)情況下我們應(yīng)該記錄下被捕獲的異常,并關(guān)閉與之關(guān)聯(lián)的channel(通道),但同時你也可以做一些額外的異常處理,例如在關(guān)閉連接之前,你可能會發(fā)送一條帶有錯誤代碼的響應(yīng)消息。
目前為止,我們已經(jīng)完成了一半的工作,剩下的就是在main()方法中啟動Discard服務(wù)器。
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
NioEventLoopGroup是一個處理I/O操作的事件循環(huán)器 (其實是個線程池)。netty為不同類型的傳輸協(xié)議提供了多種NioEventLoopGroup的實現(xiàn)。在本例中我們要實現(xiàn)一個服務(wù)端應(yīng)用,并使用了兩個NioEventLoopGroup。第一個通常被稱為boss,負(fù)責(zé)接收已到達的 connection。第二個被稱作 worker,當(dāng) boss 接收到 connection 并把它注冊到 worker 后,worker 就可以處理 connection 上的數(shù)據(jù)通信。要創(chuàng)建多少個線程,這些線程如何匹配到Channel上會隨著EventLoopGroup實現(xiàn)的不同而改變,或者你可以通過構(gòu)造器去配置他們。
ServerBootstrap是用來搭建 server 的協(xié)助類。你也可以直接使用Channel搭建 server,然而這樣做步驟冗長,不是一個好的實踐,大多數(shù)情況下建議使用ServerBootstrap。
這里我們指定NioServerSocketChannel類,用來初始化一個新的Channel去接收到達的connection。
這里的 handler 會被用來處理新接收的Channel。ChannelInitializer是一個特殊的 handler,幫助開發(fā)者配置Channel,而多數(shù)情況下你會配置Channel下的ChannelPipeline,往 pipeline 添加一些 handler (例如DiscardServerHandler) 從而實現(xiàn)你的應(yīng)用邏輯。當(dāng)你的應(yīng)用變得復(fù)雜,你可能會向 pipeline 添加更多的 handler,并把這里的匿名類抽取出來作為一個多帶帶的類。
你可以給Channel配置特有的參數(shù)。這里我們寫的是 TCP/IP 服務(wù)器,所以可以配置一些 socket 選項,例如 tcpNoDeply 和 keepAlive。請參考ChannelOption和ChannelConfig文檔來獲取更多可用的 Channel 配置選項,并對此有個大概的了解。
注意到option()和childOption()了嗎?option()用來配置NioServerSocketChannel(負(fù)責(zé)接收到來的connection),而childOption()是用來配置被ServerChannel(這里是NioServerSocketChannel) 所接收的Channel
剩下的事情就是綁定端口并啟動服務(wù)器,這里我們綁定到機器的8080端口。你可以多次調(diào)用bind()(基于不同的地址)。
剛剛,你使用 netty 完成了第一個服務(wù)端程序,可喜可賀!
處理接收到的數(shù)據(jù)既然我們完成了第一個服務(wù)端程序,接下來要就要對它進行測試。最簡單的方法是使用命令行 telnet,例如在命令行輸入telnet localhost 8080,然后再輸入點別的東西。
然而我們并不知道服務(wù)端是否真的在工作,因為他是 Discard Server,我們得不到任何響應(yīng)。為了證明他真的在工作,我們讓服務(wù)端打印接收到的數(shù)據(jù)。
我們知道當(dāng)接收到數(shù)據(jù)時,channelRead()會被調(diào)用。所以讓我們加點代碼到 DiscardServerHandler 的channelRead()中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
這步低效的循環(huán)可以替換成System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
你可以用in.release()替換這里的代碼。
如果你再運行 telnet 命令,服務(wù)端會打印出接收到的數(shù)據(jù)。
Discard Server 完整的源碼放在io.netty.example.discard這個包中。
目前為止,我們寫的服務(wù)程序消費了數(shù)據(jù)但沒有給出任何響應(yīng),而作為一臺服務(wù)器理應(yīng)要對每一個請求作出響應(yīng)。接下來讓我們實現(xiàn) ECHO 協(xié)議,學(xué)習(xí)如何響應(yīng)消息并把接收到的數(shù)據(jù)發(fā)回客戶端。
Echo Server 跟 Discard Server 唯一不同的地方在于,他把接收到的數(shù)據(jù)返回給客戶端,而不是把他們打印到控制臺。所以這里我們只需要修改channelRead()就行了:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
ChannelHandlerContext能觸發(fā)多種 I/O 事件和操作,這里我們調(diào)用write()方法逐字寫回接收到的數(shù)據(jù)。請注意我們并沒有釋放接收到的消息Object msg,因為在寫數(shù)據(jù)時ctx.write(msg),netty 已經(jīng)幫你釋放它了。
ctx.write()關(guān)沒有把消息寫到網(wǎng)絡(luò)上,他在內(nèi)部被緩存起來,你需要調(diào)用ctx.flush()把他刷新到網(wǎng)絡(luò)上。ctx.writeAndFlush(msg)是個更簡潔的方法。
如果再次使用命令行 telnet,你會看到服務(wù)端返回了你輸入過的東西。完整的 Echo Server 源碼放在io.netty.example.echo包下面。
寫一個 Time Server (時間服務(wù)器)我們這一小節(jié)要實現(xiàn) TIME 協(xié)議。跟前面的例子不同,Timer Server 在連接建立時 (收到請求前) 就返回一個32位 (4字節(jié)) 整數(shù),并在發(fā)送成功后關(guān)閉連接。在本例中,將會學(xué)習(xí)到如何構(gòu)造和發(fā)送一個消息,在發(fā)送完成時關(guān)閉連接。
因為要在剛建立連接時發(fā)送消息而不管后來接收到的數(shù)據(jù),這次我們不能使用channelRead(),取而代之的是channelActive方法,以下是具體實現(xiàn):
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
當(dāng)連接被建立后channelActive()方法會被調(diào)用,我們在方法體中發(fā)送一個32位的代表當(dāng)前時間的整數(shù)。
要發(fā)送一個新的消息,需要分配一個新的buffer(緩沖區(qū)) 去包含這個消息。我們要寫一個32位的整數(shù),因此緩沖區(qū)ByteBuf的容量至少是4個字節(jié)。通過ChannelHandlerContext.alloc()獲取ByteBufAllocator(字節(jié)緩沖區(qū)分配器),用他來分配一個新的buffer
像往常一樣把消息寫到網(wǎng)絡(luò)上。
等一下Σ( ° △ °|||),flip()方法哪去了?還記不記得在NIO中曾經(jīng)使用過的java.nio.ByteBuffer.flip()(簡單總結(jié)就是把ByteBuffer從寫模式變成讀模式)?ByteBuf并沒有這個方法,因為他包含了兩個指針——讀指針和寫指針 (讀寫標(biāo)記,不要理解成C里的指針)。當(dāng)你往ByteBuf寫數(shù)據(jù)時,寫指針會移動而讀指針不變。這兩個指針恰好標(biāo)記著數(shù)據(jù)的起始、終止位置。
與之相反,原生 NIO 并沒有提供一個簡潔的方式去標(biāo)記數(shù)據(jù)的起始和終止位置,你必須要調(diào)用flip方法。有 時候你很可能忘記調(diào)用flip方法,導(dǎo)致發(fā)送不出數(shù)據(jù)或發(fā)送了錯誤數(shù)據(jù)。這樣的錯誤并不會發(fā)生在 netty,因為 netty 有不同的指針去應(yīng)對不同的操作 (讀寫操作),這使得編程更加簡單,因為你不再需要 flipping out (瘋狂輸出原生 NIO)
其他需要注意的是ChannelHandlerContext.write()/writeAndFlush()方法返回了ChannelFuture。ChannelFuture表示一個還沒發(fā)生的 I/O 操作。這意味著你請求的一些 I/O 操作可能還沒被處理,因為 netty 中所有的操作都是異步的。舉個例子,下面的代碼可能在消息發(fā)送之前就關(guān)閉了連接:
Channel ch = ...; ch.writeAndFlush(message); ch.close();
所以,你要在 (write()返回的)ChannelFuture完成之后再調(diào)用close()。當(dāng)write操作完成后,ChannelFuture會通知到他的listeners(監(jiān)聽器)。需加注意,close()方法可能不會立即關(guān)閉鏈接,同樣close()也會返回一個ChannelFuture
那么我們?nèi)绾沃缹懖僮魍瓿闪耍亢芎唵危灰?b>ChannelFuture注冊監(jiān)聽器 (ChannelFutureListener) 就行。這一步,我們創(chuàng)建了ChannelFutureListener的匿名類,在寫操作完成時關(guān)閉鏈接。
你也可以使用已經(jīng)定義好的監(jiān)聽器,例如這樣:
f.addListener(ChannelFutureListener.CLOSE);
為了測試 Time server 是否如期工作,你可以使用 unix 的命令行:
$ rdate -o寫一個 Time Client (時間客戶端)-p
跟 DISCARD 和 ECHO 服務(wù)器不同,我們要寫一個客戶端程序應(yīng)對 TIME 協(xié)議,因為你無法把一個32位整數(shù)翻譯成日期。本節(jié)中,我們確保服務(wù)端正常工作,并學(xué)習(xí)如何使用 netty 寫一個客戶端程序。
netty 客戶端和服務(wù)器最大的不同在于,客戶端使用了不同的Bootstrap和Channel實現(xiàn)類。請看下面的例子:
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
Bootstrap跟ServerBootstrap相似,但他是作用在客戶端或無連接模式的 Channel (通道)。
如果你只定義了一個EventLoopGroup,他會同時作為 boss group 和 worker group,雖然客戶端并沒有 boss worker 這個概念。
這里使用NioSocketChannel而不是NioServerSocketChannel。NioSocketChannel會被用來創(chuàng)建客戶端Channel
跟ServerBootstrap不同,這里我們沒有使用childOption(),因為客戶端的SocketChannel沒有父Channel
我們使用connect()代替bind()方法。
正如你所見,客戶端代碼跟服務(wù)端代碼沒有很大的區(qū)別。那么接下來就是實現(xiàn)ChannelHandler,他會從服務(wù)端接收一個32位整數(shù),翻譯成可讀的日期格式并打印出來,最后關(guān)閉連接:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
處理TCP/IP時, netty 把讀取的數(shù)據(jù)放到ByteBuf
看起來非常簡單,跟服務(wù)端沒有多大區(qū)別。然而有時候 handler 會發(fā)生錯誤,例如拋出異常IndexOutOfBoundsException,在下一章節(jié)我們會作具體討論。
基于流的傳輸協(xié)議 關(guān)于Socket緩沖區(qū)的小警告像TCP/IP這種基于流的傳輸協(xié)議,接收的數(shù)據(jù)會存儲到socket緩沖區(qū)。不幸的是,這類緩沖區(qū)不是數(shù)據(jù)包隊列,而是字節(jié)流隊列。這意味著,即使你想發(fā)送兩個消息并打包成兩個數(shù)據(jù)包,操作系統(tǒng)只會把他們當(dāng)作一連串字節(jié)。因此,這不能保證你讀到的數(shù)據(jù)恰好是遠程發(fā)送端寫出的數(shù)據(jù)。舉個例子,假設(shè)操作系統(tǒng)TCP/IP棧收到三個數(shù)據(jù)包:
因為這種流式協(xié)議的特性,應(yīng)用程序很有可能像下圖的方式那樣讀取數(shù)據(jù)碎片:
所以,作為接收端 (不管是服務(wù)端還是客戶端),應(yīng)把接收到的數(shù)據(jù) (字節(jié)流) 整理成一個或多個易于理解的數(shù)據(jù)貞。對于上述的例子,整理如下:
讓我們回到 TIME Client 這個例子。一32位整數(shù)的數(shù)據(jù)量非常小,在本例中不應(yīng)用被分割。然而,問題在于他確實有可能被分割,可能性隨著通信數(shù)據(jù)量的增大而增大。
一個簡單的方法是創(chuàng)建一個內(nèi)部的cumulative buffer(累積緩沖區(qū)),等待數(shù)據(jù)直到接收到4個字節(jié)為止。下面是修改過的TimeClientHandler:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
ChannelHandler有兩個與生命周期有關(guān)的監(jiān)聽方法:handlerAdded()和handlerRemove()。你可以在里面執(zhí)行任意的初始化或析構(gòu)任務(wù),只要他們不會阻塞程序很長時間。
首先,所有接收的數(shù)據(jù)被累積到緩沖區(qū)。
其次,handler 檢查緩沖區(qū)buf是否接收到足夠的數(shù)據(jù) (4個字節(jié)),若是,則進行實際業(yè)務(wù)處理。否則當(dāng)有更多數(shù)據(jù)到達時,netty 會再次調(diào)用channelRead(),直到緩沖區(qū)累積到4個字節(jié)。
解決方案二雖然方案一解決了問題,但修改過的 handler 看上去不是那么簡潔。想像一下協(xié)議變得更為復(fù)雜,例如包含多個可變長字段,你的ChannelInboundHandler很快會變得不可維護。
你可能會注意到,可以向ChannelPipeline添加多個ChannelHandler。所以,你可以把一個龐大復(fù)雜的ChannelHandler分割成多個小模塊,從而減小應(yīng)用的復(fù)雜性。舉個例子,你可以把TimeClientHandler分割成兩個handler:
處理數(shù)據(jù)碎片的TimeDecoder
最初始的簡單版本的TimeClientHandler
幸運的是,netty 提供了一個可擴展的父類,幫助你書寫TimeDecoder
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
ByteToMessageDecoder實現(xiàn)了ChannelInboundHandler,使你更容易去處理數(shù)據(jù)碎片。
當(dāng)有新數(shù)據(jù)到達時,ByteToMessageDecoder調(diào)用decode()方法并維護了一個內(nèi)部的cumulative buffer(累積緩沖區(qū))
累積緩沖區(qū)數(shù)據(jù)不足時,decode()方法不會添加任何東西到 out 列表。當(dāng)有更多數(shù)據(jù)到達時,ByteToMessageDecoder會再次調(diào)用decode()方法。
如果decode()方法向 out 列表添加了一個對象,這表示decoder(解碼器) 成功解析了一個消息。ByteToMessageDecoder會拋棄掉cumulative buffer(累積緩沖區(qū))中已讀數(shù)據(jù)。請記住,你不需要去解析多個消息,因為ByteToMessageDecoder會持續(xù)調(diào)用decode(),直到他沒有往 out 列表添加對象。
既然希望往ChannelPipeline添加其他 handler (上面的TimeDecoder),我們要修改TimeClient中的ChannelInitializer:
b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你充滿冒險精神,你可以嘗試使用ReplayingDecoder,他會使代碼更加簡潔:
public class TimeDecoder extends ReplayingDecoder{ @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List
此外,netty 提供了很多開箱即用的decoder,他們已經(jīng)實現(xiàn)了大多數(shù)的網(wǎng)絡(luò)協(xié)議,避免你自己去實現(xiàn)一個龐大的難以維護的handler。請參考下面的包獲取更多詳細(xì)例子:
io.netty.example.factorial 二進制協(xié)議
io.netty.example.telnet 文本協(xié)議
使用 POJO 代替 ByteBuf上面所有例子都使用了ByteBuf作為協(xié)議中基本的數(shù)據(jù)結(jié)構(gòu)。在本小節(jié),我們將要升級 TIME 協(xié)議中的客戶端和服務(wù)端,使用 POJO 代替ByteBuf
使用 POJO 的優(yōu)勢是顯而易見的:你的 handler 變得易于維護和可重用,通過把 (從ByteBuf中抽取信息的) 代碼分離出來。在 TIME 協(xié)議的例子里,我們僅僅讀取一個32位的整數(shù),直接使用ByteBuf并不會有太大問題。然而,當(dāng)實現(xiàn)一個真實的網(wǎng)絡(luò)協(xié)議時,你會發(fā)現(xiàn)做代碼分離很有必要。
首先,讓我們定義一個新的類型UnixTime:
package io.netty.example.time; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
現(xiàn)在修改TimeDecoder,讓他向out列表添加一個UnixTime而不是ByteBuf
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
既然修改了TimeDecoder,TimeClientHandler也不能再使用ByteBuf了:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
是不是更加簡單、優(yōu)雅?相同的竅門同樣能使用在服務(wù)端。這次讓我們先修改TimeServerHandler:
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
現(xiàn)在只剩下encoder(編碼器),他需要實現(xiàn)ChannelOutboundHandler,把UnixTime翻譯回ByteBuf。這里比書寫decoder更加簡單,因為我們不再需要處理數(shù)據(jù)包碎片并把他們組裝起來了。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
這一行代碼有幾個重要的點:
首先,我們把參數(shù)中ChannelPromise傳遞到write(),以便 netty 把他標(biāo)記為成功或失敗 (當(dāng)數(shù)據(jù)真正寫到網(wǎng)絡(luò)時)。
其次,我們沒有調(diào)用ctx.flush(),因為ChannelOutboundHandlerAdapter中有一個多帶帶的方法void flush(ChannelHandlerContext ctx)專門用來處理flush操作。
你可以使用MessageToByteEncoder更加地簡化代碼:
public class TimeEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
最后一步就是把TimeEncoder添加到服務(wù)端ChannelPipeline,留作練習(xí)。
關(guān)閉你的應(yīng)用關(guān)閉netty應(yīng)用很簡單——通過shutdownGracefully()去關(guān)閉所有創(chuàng)建的EventLoopGroup。他返回一個Future去通知你什么時候EventLoopGroup和他從屬的 Channel 已經(jīng)完全關(guān)閉了。
總結(jié)本章,我們快速瀏覽了 netty,使用他書寫了一個可用的網(wǎng)絡(luò)應(yīng)用。接下來的章節(jié)中會介紹更多關(guān)于 netty 的詳細(xì)資料,我們也希望你去重溫io.netty.example package包中的例子。netty 社區(qū)的大門會向你敞開,你可以向社區(qū)提出問題和意見,您的的反饋會幫助netty項目變得更加完善。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77054.html
摘要:的選擇器允許單個線程監(jiān)視多個輸入通道。一旦執(zhí)行的線程已經(jīng)超過讀取代碼中的某個數(shù)據(jù)片段,該線程就不會在數(shù)據(jù)中向后移動通常不會。 1、引言 很多初涉網(wǎng)絡(luò)編程的程序員,在研究Java NIO(即異步IO)和經(jīng)典IO(也就是常說的阻塞式IO)的API時,很快就會發(fā)現(xiàn)一個問題:我什么時候應(yīng)該使用經(jīng)典IO,什么時候應(yīng)該使用NIO? 在本文中,將嘗試用簡明扼要的文字,闡明Java NIO和經(jīng)典IO之...
摘要:響應(yīng)式編程是基于異步和事件驅(qū)動的非阻塞程序,只是垂直通過在內(nèi)啟動少量線程擴展,而不是水平通過集群擴展。三特性常用的生產(chǎn)的特性如下響應(yīng)式編程模型適用性內(nèi)嵌容器組件還有對日志消息測試及擴展等支持。 摘要: 原創(chuàng)出處 https://www.bysocket.com 「公眾號:泥瓦匠BYSocket 」歡迎關(guān)注和轉(zhuǎn)載,保留摘要,謝謝! 02:WebFlux 快速入門實踐 文章工程: JDK...
摘要:個高級多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個基于的問答網(wǎng)站的實現(xiàn),有詳細(xì)的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:個高級多線程面試題及回答后端掘金在任何面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。目前在生產(chǎn)環(huán)基于的技術(shù)問答網(wǎng)站系統(tǒng)實現(xiàn)后端掘金這一篇博客將詳細(xì)介紹一個基于的問答網(wǎng)站的實現(xiàn),有詳細(xì)的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當(dāng)中多線程和并發(fā)方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應(yīng)該準(zhǔn)備很多...
摘要:完成客戶端服務(wù)器通信,需要基于協(xié)議之上,自定義一套簡單的通信協(xié)議,其中數(shù)據(jù)交換方式需要使用自定義幀。輸入數(shù)據(jù)處理器以下為輸入數(shù)據(jù)的第一個處理器,可以保證無論幀經(jīng)歷怎樣的粘包拆包,均可以準(zhǔn)確提取每一個自定義幀的數(shù)據(jù)部分。 「博客搬家」 原地址: 簡書 原發(fā)表時間: 2017-03-26 本文采用 Netty 這一最流行的 Java NIO 框架,作為 Java 服務(wù)器通信部分的基礎(chǔ)...
閱讀 3695·2021-11-19 09:56
閱讀 1476·2021-09-22 15:11
閱讀 1136·2019-08-30 15:55
閱讀 3382·2019-08-29 14:02
閱讀 2922·2019-08-29 11:07
閱讀 442·2019-08-28 17:52
閱讀 3180·2019-08-26 13:59
閱讀 445·2019-08-26 13:53