摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務(wù)器
目錄
Netty 源碼分析之 番外篇 Java NIO 的前生今世
Java NIO 的前生今世 之一 簡介
Java NIO 的前生今世 之二 NIO Channel 小結(jié)
Java NIO 的前生今世 之三 NIO Buffer 詳解
Java NIO 的前生今世 之四 NIO Selector 詳解
Netty 源碼分析之 零 磨刀不誤砍柴工 源碼分析環(huán)境搭建
Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭
Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端)
Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (服務(wù)器端)
簡述這一章是 Netty 源碼分析系列 的第一章, 我打算在這一章中, 展示一下 Netty 的客戶端和服務(wù)端的初始化和啟動的流程, 給讀者一個對 Netty 源碼有一個大致的框架上的認(rèn)識, 而不會深入每個功能模塊.
本章會從 Bootstrap/ServerBootstrap 類 入手, 分析 Netty 程序的初始化和啟動的流程.
Bootstrap 是 Netty 提供的一個便利的工廠類, 我們可以通過它來完成 Netty 的客戶端或服務(wù)器端的 Netty 初始化.
下面我以 Netty 源碼例子中的 Echo 服務(wù)器作為例子, 從客戶端和服務(wù)器端分別分析一下Netty 的程序是如何啟動的.
首先, 讓我們從客戶端方面的代碼開始
下面是源碼example/src/main/java/io/netty/example/echo/EchoClient.java 的客戶端部分的啟動代碼:
EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); }
從上面的客戶端代碼雖然簡單, 但是卻展示了 Netty 客戶端初始化時(shí)所需的所有內(nèi)容:
EventLoopGroup: 不論是服務(wù)器端還是客戶端, 都必須指定 EventLoopGroup. 在這個例子中, 指定了 NioEventLoopGroup, 表示一個 NIO 的EventLoopGroup.
ChannelType: 指定 Channel 的類型. 因?yàn)槭强蛻舳? 因此使用了 NioSocketChannel.
Handler: 設(shè)置數(shù)據(jù)的處理器.
下面我們深入代碼, 看一下客戶端通過 Bootstrap 啟動后, 都做了哪些工作.
NioSocketChannel 的初始化過程在 Netty 中, Channel 是一個 Socket 的抽象, 它為用戶提供了關(guān)于 Socket 狀態(tài)(是否是連接還是斷開) 以及對 Socket 的讀寫等操作. 每當(dāng) Netty 建立了一個連接后, 都會有一個對應(yīng)的 Channel 實(shí)例.
NioSocketChannel 的類層次結(jié)構(gòu)如下:
這一小節(jié)我們著重分析一下 Channel 的初始化過程.
ChannelFactory 和 Channel 類型的確定除了 TCP 協(xié)議以外, Netty 還支持很多其他的連接協(xié)議, 并且每種協(xié)議還有 NIO(異步 IO) 和 OIO(Old-IO, 即傳統(tǒng)的阻塞 IO) 版本的區(qū)別. 不同協(xié)議不同的阻塞類型的連接都有不同的 Channel 類型與之對應(yīng)下面是一些常用的 Channel 類型:
NioSocketChannel, 代表異步的客戶端 TCP Socket 連接.
NioServerSocketChannel, 異步的服務(wù)器端 TCP Socket 連接.
NioDatagramChannel, 異步的 UDP 連接
NioSctpChannel, 異步的客戶端 Sctp 連接.
NioSctpServerChannel, 異步的 Sctp 服務(wù)器端連接.
OioSocketChannel, 同步的客戶端 TCP Socket 連接.
OioServerSocketChannel, 同步的服務(wù)器端 TCP Socket 連接.
OioDatagramChannel, 同步的 UDP 連接
OioSctpChannel, 同步的 Sctp 服務(wù)器端連接.
OioSctpServerChannel, 同步的客戶端 TCP Socket 連接.
那么我們是如何設(shè)置所需要的 Channel 的類型的呢? 答案是 channel() 方法的調(diào)用.
回想一下我們在客戶端連接代碼的初始化 Bootstrap 中, 會調(diào)用 channel() 方法, 傳入 NioSocketChannel.class, 這個方法其實(shí)就是初始化了一個 BootstrapChannelFactory:
public B channel(Class extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new BootstrapChannelFactory(channelClass)); }
而 BootstrapChannelFactory 實(shí)現(xiàn)了 ChannelFactory 接口, 它提供了唯一的方法, 即 newChannel. ChannelFactory, 顧名思義, 就是產(chǎn)生 Channel 的工廠類.
進(jìn)入到 BootstrapChannelFactory.newChannel 中, 我們看到其實(shí)現(xiàn)代碼如下:
@Override public T newChannel() { // 刪除 try 塊 return clazz.newInstance(); }
根據(jù)上面代碼的提示, 我們就可以確定:
Bootstrap 中的 ChannelFactory 的實(shí)現(xiàn)是 BootstrapChannelFactory
生成的 Channel 的具體類型是 NioSocketChannel.
Channel 的實(shí)例化過程, 其實(shí)就是調(diào)用的 ChannelFactory#newChannel 方法, 而實(shí)例化的 Channel 的具體的類型又是和在初始化 Bootstrap 時(shí)傳入的 channel() 方法的參數(shù)相關(guān). 因此對于我們這個例子中的客戶端的 Bootstrap 而言, 生成的的 Channel 實(shí)例就是 NioSocketChannel.
前面我們已經(jīng)知道了如何確定一個 Channel 的類型, 并且了解到 Channel 是通過工廠方法 ChannelFactory.newChannel() 來實(shí)例化的, 那么 ChannelFactory.newChannel() 方法在哪里調(diào)用呢?
繼續(xù)跟蹤, 我們發(fā)現(xiàn)其調(diào)用鏈?zhǔn)?
Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister
在 AbstractBootstrap.initAndRegister 中就調(diào)用了 channelFactory().newChannel() 來獲取一個新的 NioSocketChannel 實(shí)例, 其源碼如下:
final ChannelFuture initAndRegister() { // 去掉非關(guān)鍵代碼 final Channel channel = channelFactory().newChannel(); init(channel); ChannelFuture regFuture = group().register(channel); }
在 newChannel 中, 通過類對象的 newInstance 來獲取一個新 Channel 實(shí)例, 因而會調(diào)用NioSocketChannel 的默認(rèn)構(gòu)造器.
NioSocketChannel 默認(rèn)構(gòu)造器代碼如下:
public NioSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
這里的代碼比較關(guān)鍵, 我們看到, 在這個構(gòu)造器中, 會調(diào)用 newSocket 來打開一個新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) { ... return provider.openSocketChannel(); }
接著會調(diào)用父類, 即 AbstractNioByteChannel 的構(gòu)造器:
AbstractNioByteChannel(Channel parent, SelectableChannel ch)
并傳入?yún)?shù) parent 為 null, ch 為剛才使用 newSocket 創(chuàng)建的 Java NIO SocketChannel, 因此生成的 NioSocketChannel 的 parent channel 是空的.
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
接著會繼續(xù)調(diào)用父類 AbstractNioChannel 的構(gòu)造器, 并傳入了參數(shù) readInterestOp = SelectionKey.OP_READ:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; // 省略 try 塊 // 配置 Java NIO SocketChannel 為非阻塞的. ch.configureBlocking(false); }
然后繼續(xù)調(diào)用父類 AbstractChannel 的構(gòu)造器:
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
到這里, 一個完整的 NioSocketChannel 就初始化完成了, 我們可以稍微總結(jié)一下構(gòu)造一個 NioSocketChannel 所需要做的工作:
調(diào)用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打開一個新的 Java NIO SocketChannel
AbstractChannel(Channel parent) 中初始化 AbstractChannel 的屬性:
parent 屬性置為 null
unsafe 通過newUnsafe() 實(shí)例化一個 unsafe 對象, 它的類型是 AbstractNioByteChannel.NioByteUnsafe 內(nèi)部類
pipeline 是 new DefaultChannelPipeline(this) 新創(chuàng)建的實(shí)例. 這里體現(xiàn)了:Each channel has its own pipeline and it is created automatically when a new channel is created.
AbstractNioChannel 中的屬性:
SelectableChannel ch 被設(shè)置為 Java SocketChannel, 即 NioSocketChannel#newSocket 返回的 Java NIO SocketChannel.
readInterestOp 被設(shè)置為 SelectionKey.OP_READ
SelectableChannel ch 被配置為非阻塞的 ch.configureBlocking(false)
NioSocketChannel 中的屬性:
SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
關(guān)于 unsafe 字段的初始化我們簡單地提到了, 在實(shí)例化 NioSocketChannel 的過程中, 會在父類 AbstractChannel 的構(gòu)造器中, 調(diào)用 newUnsafe() 來獲取一個 unsafe 實(shí)例. 那么 unsafe 是怎么初始化的呢? 它的作用是什么?
其實(shí) unsafe 特別關(guān)鍵, 它封裝了對 Java 底層 Socket 的操作, 因此實(shí)際上是溝通 Netty 上層和 Java 底層的重要的橋梁.
那么我們就來看一下 Unsafe 接口所提供的方法吧:
interface Unsafe { SocketAddress localAddress(); SocketAddress remoteAddress(); void register(EventLoop eventLoop, ChannelPromise promise); void bind(SocketAddress localAddress, ChannelPromise promise); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); void disconnect(ChannelPromise promise); void close(ChannelPromise promise); void closeForcibly(); void deregister(ChannelPromise promise); void beginRead(); void write(Object msg, ChannelPromise promise); void flush(); ChannelPromise voidPromise(); ChannelOutboundBuffer outboundBuffer(); }
一看便知, 這些方法其實(shí)都會對應(yīng)到相關(guān)的 Java 底層的 Socket 的操作.
回到 AbstractChannel 的構(gòu)造方法中, 在這里調(diào)用了 newUnsafe() 獲取一個新的 unsafe 對象, 而 newUnsafe 方法在 NioSocketChannel 中被重寫了:
@Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }
NioSocketChannel.newUnsafe 方法會返回一個 NioSocketChannelUnsafe 實(shí)例. 從這里我們就可以確定了, 在實(shí)例化的 NioSocketChannel 中的 unsafe 字段, 其實(shí)是一個 NioSocketChannelUnsafe 的實(shí)例.
關(guān)于 pipeline 的初始化上面我們分析了一個 Channel (在這個例子中是 NioSocketChannel) 的大體初始化過程, 但是我們漏掉了一個關(guān)鍵的部分, 即 ChannelPipeline 的初始化.
根據(jù) Each channel has its own pipeline and it is created automatically when a new channel is created., 我們知道, 在實(shí)例化一個 Channel 時(shí), 必然伴隨著實(shí)例化一個 ChannelPipeline. 而我們確實(shí)在 AbstractChannel 的構(gòu)造器看到了 pipeline 字段被初始化為 DefaultChannelPipeline 的實(shí)例. 那么我們就來看一下, DefaultChannelPipeline 構(gòu)造器做了哪些工作吧:
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
我們調(diào)用 DefaultChannelPipeline 的構(gòu)造器, 傳入了一個 channel, 而這個 channel 其實(shí)就是我們實(shí)例化的 NioSocketChannel, DefaultChannelPipeline 會將這個 NioSocketChannel 對象保存在channel 字段中. DefaultChannelPipeline 中, 還有兩個特殊的字段, 即 head 和 tail, 而這兩個字段是一個雙向鏈表的頭和尾. 其實(shí)在 DefaultChannelPipeline 中, 維護(hù)了一個以 AbstractChannelHandlerContext 為節(jié)點(diǎn)的雙向鏈表, 這個鏈表是 Netty 實(shí)現(xiàn) Pipeline 機(jī)制的關(guān)鍵. 關(guān)于 DefaultChannelPipeline 中的雙向鏈表以及它所起的作用, 我在這里暫時(shí)不表, 在 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline 中會有詳細(xì)的分析.
HeadContext 的繼承層次結(jié)構(gòu)如下所示:
TailContext 的繼承層次結(jié)構(gòu)如下所示:
我們可以看到, 鏈表中 head 是一個 ChannelOutboundHandler, 而 tail 則是一個 ChannelInboundHandler.
接著看一下 HeadContext 的構(gòu)造器:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); }
它調(diào)用了父類 AbstractChannelHandlerContext 的構(gòu)造器, 并傳入?yún)?shù) inbound = false, outbound = true.
TailContext 的構(gòu)造器與 HeadContext 的相反, 它調(diào)用了父類 AbstractChannelHandlerContext 的構(gòu)造器, 并傳入?yún)?shù) inbound = true, outbound = false.
即 header 是一個 outboundHandler, 而 tail 是一個inboundHandler, 關(guān)于這一點(diǎn), 大家要特別注意, 因?yàn)樵诜治龅?Netty Pipeline 時(shí), 我們會反復(fù)用到 inbound 和 outbound 這兩個屬性.
回到最開始的 EchoClient.java 代碼中, 我們在一開始就實(shí)例化了一個 NioEventLoopGroup 對象, 因此我們就從它的構(gòu)造器中追蹤一下 EventLoop 的初始化過程.
首先來看一下 NioEventLoopGroup 的類繼承層次:
NioEventLoop 有幾個重載的構(gòu)造器, 不過內(nèi)容都沒有什么大的區(qū)別, 最終都是調(diào)用的父類MultithreadEventLoopGroup構(gòu)造器:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }
其中有一點(diǎn)有意思的地方是, 如果我們傳入的線程數(shù) nThreads 是0, 那么 Netty 會為我們設(shè)置默認(rèn)的線程數(shù) DEFAULT_EVENT_LOOP_THREADS, 而這個默認(rèn)的線程數(shù)是怎么確定的呢?
其實(shí)很簡單, 在靜態(tài)代碼塊中, 會首先確定 DEFAULT_EVENT_LOOP_THREADS 的值:
static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); }
Netty 會首先從系統(tǒng)屬性中獲取 "io.netty.eventLoopThreads" 的值, 如果我們沒有設(shè)置它的話, 那么就返回默認(rèn)值: 處理器核心數(shù) * 2.
回到MultithreadEventLoopGroup構(gòu)造器中, 這個構(gòu)造器會繼續(xù)調(diào)用父類 MultithreadEventExecutorGroup 的構(gòu)造器:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { // 去掉了參數(shù)檢查, 異常處理 等代碼. children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { children[i] = newChild(threadFactory, args); } }
根據(jù)代碼, 我們就很清楚 MultithreadEventExecutorGroup 中的處理邏輯了:
創(chuàng)建一個大小為 nThreads 的 SingleThreadEventExecutor 數(shù)組
根據(jù) nThreads 的大小, 創(chuàng)建不同的 Chooser, 即如果 nThreads 是 2 的冪, 則使用 PowerOfTwoEventExecutorChooser, 反之使用 GenericEventExecutorChooser. 不論使用哪個 Chooser, 它們的功能都是一樣的, 即從 children 數(shù)組中選出一個合適的 EventExecutor 實(shí)例.
調(diào)用 newChhild 方法初始化 children 數(shù)組.
根據(jù)上面的代碼, 我們知道, MultithreadEventExecutorGroup 內(nèi)部維護(hù)了一個 EventExecutor 數(shù)組, Netty 的 EventLoopGroup 的實(shí)現(xiàn)機(jī)制其實(shí)就建立在 MultithreadEventExecutorGroup 之上. 每當(dāng) Netty 需要一個 EventLoop 時(shí), 會調(diào)用 next() 方法獲取一個可用的 EventLoop.
上面代碼的最后一部分是 newChild 方法, 這個是一個抽象方法, 它的任務(wù)是實(shí)例化 EventLoop 對象. 我們跟蹤一下它的代碼, 可以發(fā)現(xiàn), 這個方法在 NioEventLoopGroup 類中實(shí)現(xiàn)了, 其內(nèi)容很簡單:
@Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); }
其實(shí)就是實(shí)例化一個 NioEventLoop 對象, 然后返回它.
最后總結(jié)一下整個 EventLoopGroup 的初始化過程吧:
EventLoopGroup(其實(shí)是MultithreadEventExecutorGroup) 內(nèi)部維護(hù)一個類型為 EventExecutor children 數(shù)組, 其大小是 nThreads, 這樣就構(gòu)成了一個線程池
如果我們在實(shí)例化 NioEventLoopGroup 時(shí), 如果指定線程池大小, 則 nThreads 就是指定的值, 反之是處理器核心數(shù) * 2
MultithreadEventExecutorGroup 中會調(diào)用 newChild 抽象方法來初始化 children 數(shù)組
抽象方法 newChild 是在 NioEventLoopGroup 中實(shí)現(xiàn)的, 它返回一個 NioEventLoop 實(shí)例.
NioEventLoop 屬性:
SelectorProvider provider 屬性: NioEventLoopGroup 構(gòu)造器中通過 SelectorProvider.provider() 獲取一個 SelectorProvider
Selector selector 屬性: NioEventLoop 構(gòu)造器中通過調(diào)用通過 selector = provider.openSelector() 獲取一個 selector 對象.
channel 的注冊過程在前面的分析中, 我們提到, channel 會在 Bootstrap.initAndRegister 中進(jìn)行初始化, 但是這個方法還會將初始化好的 Channel 注冊到 EventGroup 中. 接下來我們就來分析一下 Channel 注冊的過程.
回顧一下 AbstractBootstrap.initAndRegister 方法:
final ChannelFuture initAndRegister() { // 去掉非關(guān)鍵代碼 final Channel channel = channelFactory().newChannel(); init(channel); ChannelFuture regFuture = group().register(channel); }
當(dāng)Channel 初始化后, 會緊接著調(diào)用 group().register() 方法來注冊 Channel, 我們繼續(xù)跟蹤的話, 會發(fā)現(xiàn)其調(diào)用鏈如下:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
通過跟蹤調(diào)用鏈, 最終我們發(fā)現(xiàn)是調(diào)用到了 unsafe 的 register 方法, 那么接下來我們就仔細(xì)看一下 AbstractUnsafe.register 方法中到底做了什么:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略條件判斷和錯誤處理 AbstractChannel.this.eventLoop = eventLoop; register0(promise); }
首先, 將 eventLoop 賦值給 Channel 的 eventLoop 屬性, 而我們知道這個 eventLoop 對象其實(shí)是 MultithreadEventLoopGroup.next() 方法獲取的, 根據(jù)我們前面 關(guān)于 EventLoop 初始化 小節(jié)中, 我們可以確定 next() 方法返回的 eventLoop 對象是 NioEventLoop 實(shí)例.
register 方法接著調(diào)用了 register0 方法:
private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } }
register0 又調(diào)用了 AbstractNioChannel.doRegister:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
javaChannel() 這個方法在前面我們已經(jīng)知道了, 它返回的是一個 Java NIO SocketChannel, 這里我們將這個 SocketChannel 注冊到與 eventLoop 關(guān)聯(lián)的 selector 上了.
我們總結(jié)一下 Channel 的注冊過程:
首先在 AbstractBootstrap.initAndRegister中, 通過 group().register(channel), 調(diào)用 MultithreadEventLoopGroup.register 方法
在MultithreadEventLoopGroup.register 中, 通過 next() 獲取一個可用的 SingleThreadEventLoop, 然后調(diào)用它的 register
在 SingleThreadEventLoop.register 中, 通過 channel.unsafe().register(this, promise) 來獲取 channel 的 unsafe() 底層操作對象, 然后調(diào)用它的 register.
在 AbstractUnsafe.register 方法中, 調(diào)用 register0 方法注冊 Channel
在 AbstractUnsafe.register0 中, 調(diào)用 AbstractNioChannel.doRegister 方法
AbstractNioChannel.doRegister 方法通過 javaChannel().register(eventLoop().selector, 0, this) 將 Channel 對應(yīng)的 Java NIO SockerChannel 注冊到一個 eventLoop 的 Selector 中, 并且將當(dāng)前 Channel 作為 attachment.
總的來說, Channel 注冊過程所做的工作就是將 Channel 與對應(yīng)的 EventLoop 關(guān)聯(lián), 因此這也體現(xiàn)了, 在 Netty 中, 每個 Channel 都會關(guān)聯(lián)一個特定的 EventLoop, 并且這個 Channel 中的所有 IO 操作都是在這個 EventLoop 中執(zhí)行的; 當(dāng)關(guān)聯(lián)好 Channel 和 EventLoop 后, 會繼續(xù)調(diào)用底層的 Java NIO SocketChannel 的 register 方法, 將底層的 Java NIO SocketChannel 注冊到指定的 selector 中. 通過這兩步, 就完成了 Netty Channel 的注冊過程.
handler 的添加過程Netty 的一個強(qiáng)大和靈活之處就是基于 Pipeline 的自定義 handler 機(jī)制. 基于此, 我們可以像添加插件一樣自由組合各種各樣的 handler 來完成業(yè)務(wù)邏輯. 例如我們需要處理 HTTP 數(shù)據(jù), 那么就可以在 pipeline 前添加一個 Http 的編解碼的 Handler, 然后接著添加我們自己的業(yè)務(wù)邏輯的 handler, 這樣網(wǎng)絡(luò)上的數(shù)據(jù)流就向通過一個管道一樣, 從不同的 handler 中流過并進(jìn)行編解碼, 最終在到達(dá)我們自定義的 handler 中.
既然說到這里, 有些讀者朋友肯定會好奇, 既然這個 pipeline 機(jī)制是這么的強(qiáng)大, 那么它是怎么實(shí)現(xiàn)的呢? 不過我這里不打算詳細(xì)展開 Netty 的 ChannelPipeline 的實(shí)現(xiàn)機(jī)制(具體的細(xì)節(jié)會在后續(xù)的章節(jié)中展示), 我在這一小節(jié)中, 從簡單的入手, 展示一下我們自定義的 handler 是如何以及何時(shí)添加到 ChannelPipeline 中的.
首先讓我們看一下如下的代碼片段:
... .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });
這個代碼片段就是實(shí)現(xiàn)了 handler 的添加功能. 我們看到, Bootstrap.handler 方法接收一個 ChannelHandler, 而我們傳遞的是一個 派生于 ChannelInitializer 的匿名類, 它正好也實(shí)現(xiàn)了 ChannelHandler 接口. 我們來看一下, ChannelInitializer 類內(nèi)到底有什么玄機(jī):
@Sharable public abstract class ChannelInitializerextends ChannelInboundHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class); protected abstract void initChannel(C ch) throws Exception; @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { initChannel((C) ctx.channel()); ctx.pipeline().remove(this); ctx.fireChannelRegistered(); } ... }
ChannelInitializer 是一個抽象類, 它有一個抽象的方法 initChannel, 我們正是實(shí)現(xiàn)了這個方法, 并在這個方法中添加的自定義的 handler 的. 那么 initChannel 是哪里被調(diào)用的呢? 答案是 ChannelInitializer.channelRegistered 方法中.
我們來關(guān)注一下 channelRegistered 方法. 從上面的源碼中, 我們可以看到, 在 channelRegistered 方法中, 會調(diào)用 initChannel 方法, 將自定義的 handler 添加到 ChannelPipeline 中, 然后調(diào)用 ctx.pipeline().remove(this) 將自己從 ChannelPipeline 中刪除. 上面的分析過程, 可以用如下圖片展示:
一開始, ChannelPipeline 中只有三個 handler, head, tail 和我們添加的 ChannelInitializer.
接著 initChannel 方法調(diào)用后, 添加了自定義的 handler:
最后將 ChannelInitializer 刪除:
分析到這里, 我們已經(jīng)簡單了解了自定義的 handler 是如何添加到 ChannelPipeline 中的, 不過限于主題與篇幅的原因, 我沒有在這里詳細(xì)展開 ChannelPipeline 的底層機(jī)制, 我打算在下一篇 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline 中對這個問題進(jìn)行深入的探討.
客戶端連接分析經(jīng)過上面的各種分析后, 我們大致了解了 Netty 初始化時(shí), 所做的工作, 那么接下來我們就直奔主題, 分析一下客戶端是如何發(fā)起 TCP 連接的.
首先, 客戶端通過調(diào)用 Bootstrap 的 connect 方法進(jìn)行連接.
在 connect 中, 會進(jìn)行一些參數(shù)檢查后, 最終調(diào)用的是 doConnect0 方法, 其實(shí)現(xiàn)如下:
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise); } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
在 doConnect0 中, 會在 event loop 線程中調(diào)用 Channel 的 connect 方法, 而這個 Channel 的具體類型是什么呢? 我們在 Channel 初始化這一小節(jié)中已經(jīng)分析過了, 這里 channel 的類型就是 NioSocketChannel.
進(jìn)行跟蹤到 channel.connect 中, 我們發(fā)現(xiàn)它調(diào)用的是 DefaultChannelPipeline#connect, 而, pipeline 的 connect 代碼如下:
@Override public ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); }
而 tail 字段, 我們已經(jīng)分析過了, 是一個 TailContext 的實(shí)例, 而 TailContext 又是 AbstractChannelHandlerContext 的子類, 并且沒有實(shí)現(xiàn) connect 方法, 因此這里調(diào)用的其實(shí)是 AbstractChannelHandlerContext.connect, 我們看一下這個方法的實(shí)現(xiàn):
@Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // 刪除的參數(shù)檢查的代碼 final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
上面的代碼中有一個關(guān)鍵的地方, 即 final AbstractChannelHandlerContext next = findContextOutbound(), 這里調(diào)用 findContextOutbound 方法, 從 DefaultChannelPipeline 內(nèi)的雙向鏈表的 tail 開始, 不斷向前尋找第一個 outbound 為 true 的 AbstractChannelHandlerContext, 然后調(diào)用它的 invokeConnect 方法, 其代碼如下:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { // 忽略 try 塊 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); }
還記得我們在 "關(guān)于 pipeline 的初始化" 這一小節(jié)分析的的內(nèi)容嗎? 我們提到, 在 DefaultChannelPipeline 的構(gòu)造器中, 會實(shí)例化兩個對象: head 和 tail, 并形成了雙向鏈表的頭和尾. head 是 HeadContext 的實(shí)例, 它實(shí)現(xiàn)了 ChannelOutboundHandler 接口, 并且它的 outbound 字段為 true. 因此在 findContextOutbound 中, 找到的 AbstractChannelHandlerContext 對象其實(shí)就是 head. 進(jìn)而在 invokeConnect 方法中, 我們向上轉(zhuǎn)換為 ChannelOutboundHandler 就是沒問題的了.
而又因?yàn)?HeadContext 重寫了 connect 方法, 因此實(shí)際上調(diào)用的是 HeadContext.connect. 我們接著跟蹤到 HeadContext.connect, 其代碼如下:
@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
這個 connect 方法很簡單, 僅僅調(diào)用了 unsafe 的 connect 方法. 而 unsafe 又是什么呢?
回顧一下 HeadContext 的構(gòu)造器, 我們發(fā)現(xiàn) unsafe 是 pipeline.channel().unsafe() 返回的, 而 Channel 的 unsafe 字段, 在這個例子中, 我們已經(jīng)知道了, 其實(shí)是 AbstractNioByteChannel.NioByteUnsafe 內(nèi)部類. 兜兜轉(zhuǎn)轉(zhuǎn)了一大圈, 我們找到了創(chuàng)建 Socket 連接的關(guān)鍵代碼.
進(jìn)行跟蹤 NioByteUnsafe -> AbstractNioUnsafe.connect:
@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { ... } }
AbstractNioUnsafe.connect 的實(shí)現(xiàn)如上代碼所示, 在這個 connect 方法中, 調(diào)用了 doConnect 方法, 注意, 這個方法并不是 AbstractNioUnsafe 的方法, 而是 AbstractNioChannel 的抽象方法. doConnect 方法是在 NioSocketChannel 中實(shí)現(xiàn)的, 因此進(jìn)入到 NioSocketChannel.doConnect 中:
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
我們終于看到的最關(guān)鍵的部分了, 慶祝一下!
上面的代碼不用多說, 首先是獲取 Java NIO SocketChannel, 即我們已經(jīng)分析過的, 從 NioSocketChannel.newSocket 返回的 SocketChannel 對象; 然后是調(diào)用 SocketChannel.connect 方法完成 Java NIO 層面上的 Socket 的連接.
最后, 上面的代碼流程可以用如下時(shí)序圖直觀地展示:
本文由 yongshun 發(fā)表于個人博客, 采用 署名-相同方式共享 3.0 中國大陸許可協(xié)議.
Email: yongshun1228@gmail.com
本文標(biāo)題為: Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端)
本文鏈接為: https://segmentfault.com/a/1190000007282789
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66229.html
摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務(wù)器 目錄 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 簡介 Java NIO 的前生今世 ...
摘要:背景在工作中雖然我經(jīng)常使用到庫但是很多時(shí)候?qū)Φ囊恍└拍钸€是處于知其然不知其所以然的狀態(tài)因此就萌生了學(xué)習(xí)源碼的想法剛開始看源碼的時(shí)候自然是比較痛苦的主要原因有兩個第一網(wǎng)上沒有找到讓我滿意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學(xué)習(xí)這么大代 背景 在工作中, 雖然我經(jīng)常使用到 Netty 庫, 但是很多時(shí)候?qū)?Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學(xué)...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
閱讀 2284·2019-08-30 15:56
閱讀 3118·2019-08-30 13:48
閱讀 1130·2019-08-30 10:52
閱讀 1499·2019-08-29 17:30
閱讀 427·2019-08-29 13:44
閱讀 3555·2019-08-29 12:53
閱讀 1122·2019-08-29 11:05
閱讀 2673·2019-08-26 13:24