摘要:但是它不是自己創建線程,而是從調用構造方法時指定的線程池中獲取線程。這就意味著,即使發送兩個獨立的消息,操作系統會把他們視為一個字節串。釋放過程很簡單,調用它的方法,所有相關的和線程池將會自動關閉。
簡單找了下發現網上沒有關于Netty3比較完整的源碼解析的文章,于是我就去讀官方文檔,為了加強記憶,翻譯成了中文,有適當的簡化。
原文檔地址:Netty3文檔
Chapter 1 開始 1、開始之前運行demo的前提有兩個:最新版本的Netty3和JDK1.5以上
2、寫一個Discard Server最簡單的協議就是Discard協議——忽略所有接收到的數據并且不作任何響應。我們從Netty處理I/O事件的handler實現開始:
public class DiscardServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
DiscardServerHandler 繼承SimpleChannelHandler——ChannelHandler的一個實現;
messageReceived方法接收MessageEvent類型的參數,它包含接收的客戶端數據;
exceptionCaught方法在出現I/O錯誤或者處理事件時拋出錯誤時被調用,通常包含記錄錯誤信息和關閉通道的動作;
接下來寫一個main方法來開啟使用DiscardServerHandler的服務:
public class DiscardServer { public static void main(String[] args) throws Exception { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new DiscardServerHandler()); } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8080)); } }
ChannelFactory是創建和管理Channel及其關聯資源的工廠,它負責處理所有I/O請求并且執行I/O生成ChannelEvent。但是它不是自己創建I/O線程,而是從調用構造方法時指定的線程池中獲取線程。服務端應用使用NioServerSocketChannelFactory;
ServerBootstrap是一個設置服務端的幫助類;
當服務端接收到一個新的連接,指定的ChannelPipelineFactory就會創建一個新的ChannelPipeline,這個新的Pipeline包含一個DiscardServerHandler對象;
你可以給Channel實現設置具體的參數,選項帶"child."前綴代表應用在接收到的Channel上而不是服務端本身的ServerSocketChannel;
剩下的就是綁定端口啟動服務,可以綁定多個不同的端口。
3、處理接收到的數據我們可以通過"telnet localhost 8080"命令去測試服務,但因為是Discard服務,我們都不知道服務是否正常工作。所以我們修改下服務,讓它打印出接收到的數據。
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); while(buf.readable()) { System.out.println((char) buf.readByte()); System.out.flush(); } }
ChannelBuffer是Netty基本的存儲字節的數據結構,跟NIO的ByteBuffer類似,但是更容易使用更靈活。比如Netty允許你在盡量少的內存復制次數的情況下把多個ChannelBuffer組合成一個。
4、寫一個Echo服務一個服務通常對請求是有響應的。接下來我們嘗試寫一個實現Echo協議——將接收的數據原路返回給客戶端的服務:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Channel ch = e.getChannel(); ch.write(e.getMessage()); }
MessageEvent繼承了ChannnelEvent,一個ChannnelEvent持有它相關的Channel的引用。我們可以獲取這個Channel然后調用寫方法寫入數據返回給客戶端。
5、寫一個時間服務這次我們實現一個時間協議——在不需要任何請求數據的情況下返回一個32位整型數字并且在發送之后關閉連接。因為我們忽略請求數據,只需要在連接建立的發送消息,所以這次不能使用messageReceived方法而是重寫channelConnected方法:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { Channel ch = e.getChannel(); ChannelBuffer time = ChannelBuffers.buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ChannelFuture f = ch.write(time); f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { Channel ch = future.getChannel(); ch.close(); } }); }
channelConnected方法在連接建立的時間被調用,然后我們寫入一個32位整型數字代表以秒為單位的當前時間;
我們使用ChannelBuffers工具類分配了一個容量為4字節的ChannelBuffer來存放這個32位整型數字;
然后我們把ChannelBuffer寫入Channel...等一下,flip方法哪里去了?在NIO中我們不是要在寫入通道前調用ByteBuffer的flip方法的嗎?ChannelBuffer沒有這個方法,因為它有兩個指針,一個用于讀操作一個用于寫操作。當數據寫入ChannelBuffer時寫索引增加而讀索引不變。讀索引和寫索引相互獨立。對比之下,Netty的ChannelBuffer比NIO的buffer更容易使用。
另外需要注意的一點是ChannelBuffer的write方法返回的是一個ChannelFuture對象。它表示一個還未發生的I/O操作,因為Netty中所有操作都是異步的。所以我們必須在ChannelFuture收到操作完成的通知之后才能關閉Channel。哦,對了,close方法也是返回ChannelFuture...
那么問題來了,我們如何得到操作完成的通知呢?只需要簡單得向返回的ChannelFuture對象中添加一個ChannelFutureListener,這里我們創建了一個ChannelFutureListener的匿名內部類,它在操作完成的時候會關閉Channel。
6、寫一個時間客戶端我們還需要一個遵守時間協議,即能把整型數字翻譯成日期的客戶端。Netty服務端和客戶端唯一的區別就是要求不同的Bootstrap和ChannelFactory:
public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new TimeClientHandler()); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.connect(new InetSocketAddress(host, port)); }
NioClientSocketChannelFactory,用來創建一個客戶端Channel;
ClientBootstrap是ServerBootStrap在客戶端的對應部分;
需要注意的是設置參數時不需要"child."前綴,客戶端SocketChannel沒有父Channel;
對應服務端的bind方法,這里我們需要調用connect方法。
另外我們需要一個ChannelHandler實現,負責把接收到服務端返回的32位整型數字翻譯成日期并打印出來,然后斷開連接:
public class TimeClientHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); e.getChannel().close(); } }
看上去很簡單是吧?但是實際運行過程中這個handler有時會拋出一個IndexOutOfBoundsException。下一節我們會討論為什么會這樣。
7、處理基于流的傳輸 7.1、一個關于Socket Buffer的小警告在像TCP/IP那樣基于流的傳輸中,接收數據保存在一個socket接收緩存中。但是這個緩存不是一個以包為單位的隊列,而是一個以字節為單位的隊列。這就意味著,即使發送兩個獨立的消息,操作系統會把他們視為一個字節串。因此,不能保證你讀到的和另一端寫入的一樣。所以,不管是客戶端還是服務端,對于接收到的數據都需要整理成符合應用程序邏輯的結構。
7.2、第一種解決方式回到前面的時間客戶端的問題,32位整型數字很小,但是它也是可以拆分的,特別是當流量上升的時候,被拆分的可能性也隨之上升。
一個簡單的處理方式就是內部創建一個累計的緩存,直到接收滿4個字節才進行處理。
private final ChannelBuffer buf = dynamicBuffer(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer m = (ChannelBuffer) e.getMessage(); buf.writeBytes(m); if (buf.readableBytes() >= 4) { long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } }
ChannelBuffers.dynamicBuffer()返回一個自動擴容的ChannelBuffer;
所有接收的數據都累積到這個動態緩存中;
handler需要檢查緩存是否滿4個字節,是的話才能繼續業務邏輯;否則,Netty會在數據繼續到達之后持續調用messageReceive。
7.3、第二種解決方案第一種方案有很多問題,比如一個復雜的協議,由多個可變長度的域組成,這種情況下第一種方案的handler就無法支持了。
你會發現你可以添加多個ChannelHandler到ChannelPipeline中,利用這個特性,你可以把一個臃腫的ChannelHandler拆分到多個模塊化的ChannelHandler中,這樣可以降低應用程序的復雜度。比如,你可以把TimeClientHandler拆分成兩個handler:
TimeDecoder,負責分段問題;
最初那個簡版的TimeClientHandler.
Netty提供了可擴展的類幫助你實現TimeDecoder:
public class TimeDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return buffer.readBytes(4); } }
FrameDecoder是ChannelHandler的一種實現,專門用來處理分段問題;
FrameDecoder在每次接收到新的數據時調用decode方法,攜帶一個內部維持的累積緩存;
如果返回null,說明目前數據接收的還不夠,當數據量足夠時FrameDecoder會再次調用方法;
如果返回非null對象,代表解碼成功,FrameDecoder會丟棄累積緩存中剩余的數據。你無需提供批量解碼,FrameDecoder會繼續調用decode方法直到返回null。
拆分之后,我們需要修改TimeClient的ChannelPipelineFactory實現:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeDecoder(), new TimeClientHandler()); } });
Netty還提供了進一步簡化解碼的ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder{ @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) { return buffer.readBytes(4); } }
此外,Netty提供了一批開箱即用的解碼器,讓你可以簡單得實現大多數協議:
org.jboss.netty.example.factorial 用于二進制協議;
org.jboss.netty.example.telnet 用于基于行的文本協議.
8、用POJO替代ChannelBuffer上面的demo我們都是用ChannelBuffer作為協議化消息的基本數據結構,這一節我們用POJO替代ChannelBuffer。將從ChannelBuffer提取信息的代碼跟handler分離開,會使handler變得更加可維護的和可重用的。從上面的demo里不容易看出這個優勢,但是實際應用中分離很有必要。
首先,我們定義一個類型UnixTime:
public class UnixTime { private final int value; public UnixTime(int value) { this.value = value; } public int getValue() { return value; } @Override public String toString() { return new Date(value * 1000L).toString(); } }
現在我們可以修改TimeDecoder讓它返回一個UnixTime而不是ChannelBuffer:
@Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return new UnixTime(buffer.readInt()); }
編碼器改了,那么相應的TimeClientHandler就不會繼續使用ChannelBuffer:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { UnixTime m = (UnixTime) e.getMessage(); System.out.println(m); e.getChannel().close(); }
同樣的技術也可以應用到服務端的TimeServerHandler上:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000)); ChannelFuture f = e.getChannel().write(time); f.addListener(ChannelFutureListener.CLOSE); }
能這樣運用的前提是有一個編碼器,可以把UnixTime對象翻譯成ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler { public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { UnixTime time = (UnixTime) e.getMessage(); ChannelBuffer buf = buffer(4); buf.writeInt(time.getValue()); Channels.write(ctx, e.getFuture(), buf); } }
一個編碼器重寫writeRequested方法攔截一個寫請求。這里需要注意的一點是,盡管這里的writeRequested方法參數里也有一個MessageEvent對象,客戶端TimeClientHandler的messageReceived的參數里也有一個,但是它們的解讀是完全不同的。一個ChannelEvent可以是upstream也可以是downstream事件,這取決于事件的流向。messageReceived方法里的MessageEvent是一個upstream事件,而writeRequested方法里的是downstream事件。
當把POJO類轉化為ChannelBuffer后,你需要把ChannelBuffer轉發到之前在ChannelPipeline內的ChannelDownstreamHandler,也就是TimeServerHandler。Channels提供了多個幫助方法創建和發送ChanenlEvent。
同樣,TimeEncoder也需要加入到服務端的ChannelPipeline中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeServerHandler(), new TimeEncoder()); } });9、關閉你的應用程序
為了關閉I/O線程讓應用程序優雅得退出,我們需要釋放ChannelFactory分配的資源。
一個典型網絡應用程序的關閉過程分為三步:
關閉所有服務端socket連接;
關閉所有非服務端socket連接(包括客戶端socket和服務端接收到的socket);
釋放ChannelFactory使用的所有資源。
應用到TimeClient上:
ChannelFuture future = bootstrap.connect(...); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
CilentBootStrap的connect方法返回一個ChannelFuture,當連接嘗試成功或者失敗時會通知到ChannelFuture。它還持有連接嘗試關聯的Channel的引用;
ChannelFuture.awaitUninterruptibly()等待ChannelFuture確定連接是否嘗試成功;
如果連接失敗,我們打印出失敗的原因。ChannelFuture.getCause()會在連接即沒有成功也沒有取消的情況下返回失敗的原因;
連接嘗試的情況處理之后,我們還需要等待連接關閉。每個Channel有它自己的closeFuture,用來通知你連接關閉然后你可以針對關閉做一些動作。即使連接嘗試失敗了,closeFuture仍然會被通知,因為Channel會在連接失敗后自動關閉;
所有連接關閉之后,剩下的就是釋放ChannelFactory使用的資源了。釋放過程很簡單,調用它的releaseExternalResources方法,所有相關的NIO Selector和線程池將會自動關閉。
關閉一個客戶端很簡單,那服務端呢?你需要從端口解綁并關閉所有接收到的連接。前提是你需要一個保持跟蹤活躍連接的數據結構,Netty提供了ChannelGroup。
ChannelGroup是Java集合API一個特殊的的擴展,它代表一組打開的Channel。如果一個Channel被添加到ChannelGroup,然后這個Channel被關閉了,它會從ChannelGroup中自動移除。你可以對同一ChannelGroup中的Channel做批量操作,比如在關閉服務的時候關閉所有Channel。
要跟蹤打開的socket,你需要修改TimeServerHandler,把新打開的Channel添加到全局的ChannelGroup變量中。ChannelGroup是線程安全的。
@Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) { TimeServer.allChannels.add(e.getChannel()); }
現在我們自動維持了一個包含所有活躍Channel的列表,關閉服務端就像關閉客戶端一樣容易了。
public class TimeServer { static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) throws Exception { ... ChannelFactory factory = ...; ... ServerBootstrap bootstrap = ...; ... Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); waitForShutdownCommand(); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); factory.releaseExternalResources(); } }
DefaultChannelGroup構造方法接收組名為參數,組名是它的唯一標識;
ServerBootstrap的bind方法返回一個服務端的綁定指定本地地址的Channel,調用Channel的close方法將會使它與本地地址解綁;
所有Channel類型都可以被添加到ChannelGroup中,不管是客戶端、服務端或是服務端接收的。因為你可以在服務器關閉時同時關閉綁定的Channel和接收到的Channel;
waitForShutdownCommand()是一個等待關閉信號的虛構方法。
我們可以對ChannelGroup中的Channel進行統一操作,這里我們調用close方法,相當于解綁服務端Channel并且異步關閉所有接收到的Channel。close方法返回一個功能和ChannelFuture相近的ChannelGroupFuture,在所有連接都成功關閉通知我們。
10、總結這一節我們快速瀏覽了Netty,示范了如何用Netty寫一個能正常工作的網絡應用。
下一節將介紹Netty的更多細節。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74215.html
摘要:豐富的緩存數據結構使用它自己的緩存來表示字節序列而不是的。針對有一個定義良好的事件模型。有一些協議是多層的建立在其他低級協議基礎上。此外,甚至不是完全線程安全的。協議由標準化為。協議緩存整合是一個高效二進制協議的快速實現。 Chapter 2、結構概覽 這一節我們將確認Netty提供的核心功能是什么,以及它們怎么構成一個完整的網絡應用開發堆棧。 1、豐富的緩存數據結構 Netty使用它...
摘要:的選擇器允許單個線程監視多個輸入通道。一旦執行的線程已經超過讀取代碼中的某個數據片段,該線程就不會在數據中向后移動通常不會。 1、引言 很多初涉網絡編程的程序員,在研究Java NIO(即異步IO)和經典IO(也就是常說的阻塞式IO)的API時,很快就會發現一個問題:我什么時候應該使用經典IO,什么時候應該使用NIO? 在本文中,將嘗試用簡明扼要的文字,闡明Java NIO和經典IO之...
摘要:英文全名為,也叫遠程過程調用,其實就是一個計算機通信協議,它是一種通過網絡從遠程計算機程序上請求服務而不需要了解底層網絡技術的協議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...
摘要:算法序和年的論文提出了一種定時輪的方式來管理和維護大量的調度算法內核中的定時器采用的就是這個方案。使用實例每一次的時間間隔每一次就會到達下一個槽位輪中的數源碼解讀之時間輪算法實現定時輪算法細說延時任務的處理定時器的實現 HashedWheelTimer算法 序 George Varghese 和 Tony Lauck 1996 年的論文:Hashed and Hierarchical ...
閱讀 2702·2021-11-08 13:16
閱讀 2380·2021-10-18 13:30
閱讀 2252·2021-09-27 13:35
閱讀 2005·2019-08-30 15:55
閱讀 2455·2019-08-30 13:22
閱讀 596·2019-08-30 11:24
閱讀 2089·2019-08-29 12:33
閱讀 1823·2019-08-26 12:10