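The previous article analyzed the initialization process during Netty server startup. In this article we analyze Netty's Reactor thread model.
Before going into the source code, let's first look at where EventLoop is used:
- Registering the NioServerSocketChannel to listen for connections
- Registering NioSocketChannels for I/O events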
NioServerSocketChannel connection listening
In the initAndRegister() method of AbstractBootstrap, once the NioServerSocketChannel has been initialized, the line marked with a comment below registers it.
```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        // ... (error handling omitted)
    }
    // Register the channel with the selector of a boss-group event loop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
```
AbstractNioChannel.doRegister
Following the call chain, execution eventually reaches the doRegister() method of AbstractNioChannel.
```java
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Register the JDK ServerSocketChannel with the boss thread's selector:
            // interest set 0 for now, with this Netty channel as the attachment
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
```
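To make the two-step registration concrete, here is a minimal plain-JDK sketch (the class RegisterSketch and the String attachment are illustrative assumptions, not Netty code). It registers a non-blocking ServerSocketChannel with interest set 0 and an attachment, as doRegister() does, and only afterwards adds OP_ACCEPT; in Netty that second step happens later, when the channel becomes active and starts reading.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Illustrative sketch (not Netty code): register with interest set 0 first,
// then add OP_ACCEPT later, mirroring the doRegister()/begin-read split.
class RegisterSketch {

    /** Returns {interest ops right after register, interest ops after adding OP_ACCEPT}. */
    static int[] registerThenListen(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // register() requires non-blocking mode
            // Step 1: register with no interest ops, attaching our own object
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment mismatch");
            }
            // Step 2: later, start listening for incoming connections
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenListen("my-netty-channel");
        System.out.println("after register: " + ops[0] + ", after adding OP_ACCEPT: " + ops[1]);
    }
}
```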
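Starting the NioEventLoop
A NioEventLoop runs on a single dedicated thread, which is started lazily as follows.
In the doBind0 method of AbstractBootstrap, the NioEventLoop of the NioServerSocketChannel is obtained and used to execute the port-binding task. Submitting a task through execute() starts the event loop thread if it is not running yet.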
```java
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // execute() starts the event loop thread if it has not been started yet
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
```
SingleThreadEventExecutor.execute
From there, execution reaches SingleThreadEventExecutor.execute, which calls startThread() to start the thread.
```java
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread(); // start the thread (a no-op if it is already running)
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
```
startThread
```java
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread(); // perform the actual start
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
```
startThread() then calls doStartThread(), which submits a task through executor.execute; the NioEventLoop thread is started inside that task.
```java
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() { // run a task on the underlying executor
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // Call the (boss) NioEventLoop's run() method and start polling
                SingleThreadEventExecutor.this.run();
            }
            // ... (omitted)
        }
    });
}
```
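The start-once logic of execute(), startThread() and doStartThread() can be modelled with a CAS on a state field. The following is a hypothetical, simplified sketch: LazyEventLoop, demo() and the thread-per-task executor are assumptions of this example, and the failure path that resets the state as well as the shutdown handling are left out. It shows that however many tasks are submitted from outside, exactly one thread is started and every task runs on it.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of SingleThreadEventExecutor's lazy start (not Netty code).
class LazyEventLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final Executor executor;
    private volatile Thread thread;

    LazyEventLoop(Executor executor) {
        this.executor = executor;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);              // always enqueue first, like addTask(task)
        if (!inEventLoop()) {
            startThread();                // no-op once the loop is running
        }
    }

    private void startThread() {
        // Only the caller that wins the CAS starts the thread
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }

    private void doStartThread() {
        executor.execute(() -> {
            thread = Thread.currentThread();
            threadsStarted.incrementAndGet();
            run();
        });
    }

    private void run() {
        for (;;) {                        // a real NioEventLoop would also select() here
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /** Submits n tasks from the caller; returns {distinct threads that ran them, threads started}. */
    static int[] demo(int n) {
        Executor threadPerTask = r -> {   // one new daemon thread per execute() call
            Thread t = new Thread(r, "lazy-event-loop");
            t.setDaemon(true);            // the loop never exits, so don't block JVM shutdown
            t.start();
        };
        LazyEventLoop loop = new LazyEventLoop(threadPerTask);
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {seen.size(), loop.threadsStarted.get()};
    }

    public static void main(String[] args) {
        int[] r = demo(100);
        System.out.println("distinct threads: " + r[0] + ", threads started: " + r[1]);
    }
}
```

The executor here creates a new thread per execute() call, which is enough because doStartThread() calls it only once.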
The NioEventLoop polling loop
Once the NioEventLoop thread has started, it goes straight into the run method of NioEventLoop.
```java
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Lets rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
```
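NioEventLoop execution flow
The run method of NioEventLoop is an infinite loop that does three main things in each iteration, as shown in Figure 9-1:
- Poll for I/O events (select): poll the Selector for the I/O readiness of all Channels registered with it.
- Process I/O events: if any Channel has ready I/O events, handle them by calling processSelectedKeys.
- Process asynchronous tasks (runAllTasks): a very important duty of the Reactor thread is to run the non-I/O tasks in the task queue. Netty provides the ioRatio parameter to adjust the ratio between time spent on I/O and time spent on tasks.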
Polling for I/O readiness
Let's look at the I/O-related part of the code first:
- The current strategy is obtained via selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()).
- The strategy then controls what this polling iteration does.
```java
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            }
            // ... (omitted)
        }
    }
}
```
selectStrategy logic
```java
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
```
If hasTasks is true, meaning the NioEventLoop thread has pending asynchronous tasks, selectSupplier.get() is called; otherwise SELECT is returned directly.
The supplier passed in here (selectNowSupplier) is defined as follows:
```java
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
```
It calls selectNow(), a non-blocking method of the Selector that returns immediately:
- if some Channels are already ready, it returns the number of ready Channels;
- otherwise, it returns 0.
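A compact restatement of calculateStrategy() and the switch that consumes its result may help. This is a sketch under stated assumptions: StrategySketch is a hypothetical class, it uses java.util.function.IntSupplier instead of Netty's own IntSupplier (whose get() may throw), and it reuses the negative constant values of Netty 4.1's SelectStrategy to our understanding. What matters is only that they are negative, so they can never be confused with a ready-key count from selectNow().

```java
import java.util.function.IntSupplier;

// Simplified restatement (not Netty code) of calculateStrategy() plus the switch in run().
class StrategySketch {
    // Negative sentinels, so they never collide with a ready-key count (>= 0)
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        // Pending tasks: poll without blocking so the tasks are not delayed.
        // No tasks: let the loop block in select().
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "retry";
            case BUSY_WAIT:   // not supported with NIO: falls through to SELECT
            case SELECT:
                return "block in select()";
            default:          // >= 0: number of ready keys returned by selectNow()
                return strategy > 0 ? "process ready keys, then run tasks" : "run tasks only";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(calculateStrategy(() -> 2, true)));  // tasks pending, 2 keys ready
        System.out.println(describe(calculateStrategy(() -> 2, false))); // no tasks: selectNow() not called
    }
}
```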
Branching
After the strategy is obtained in the previous step, the code branches on its value:
- CONTINUE: retry.
- BUSY_WAIT: NIO does not support busy-waiting, so BUSY_WAIT falls through and is handled exactly like SELECT.
- SELECT: call select to obtain the ready Channels. This strategy is returned when the NioEventLoop has no asynchronous tasks, i.e. its task queue is empty.
```java
switch (strategy) {
case SelectStrategy.CONTINUE:
    continue;

case SelectStrategy.BUSY_WAIT:
    // fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
    if (curDeadlineNanos == -1L) {
        curDeadlineNanos = NONE; // nothing on the calendar
    }
    nextWakeupNanos.set(curDeadlineNanos);
    try {
        if (!hasTasks()) {
            strategy = select(curDeadlineNanos);
        }
    } finally {
        // This update is just to help block unnecessary selector wakeups
        // so use of lazySet is ok (no race condition)
        nextWakeupNanos.lazySet(AWAKE);
    }
    // fall through
default:
}
```
SelectStrategy.SELECT
When the NioEventLoop thread has no pending asynchronous tasks, it runs the SELECT branch:
```java
// Deadline of the next scheduled task; when there is no scheduled task this returns -1L
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // taskQueue is empty, so block in select()
        strategy = select(curDeadlineNanos);
    }
} finally {
    // This update is just to help block unnecessary selector wakeups
    // so use of lazySet is ok (no race condition)
    nextWakeupNanos.lazySet(AWAKE);
}
```
The select method is defined as follows. By default (no scheduled task) deadlineNanos is NONE, so the blocking select() method is called.
```java
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Compute the blocking timeout of select() in milliseconds
    // (adding 995000 ns rounds most partial milliseconds up)
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
```
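The timeout conversion is easier to see with numbers. The hypothetical helper below isolates the arithmetic, with delayNanos standing for the time left until the next scheduled task. Adding 995,000 ns before the integer division rounds the delay up to the next whole millisecond unless the sub-millisecond remainder is under 5 µs (1.5 ms becomes 2 ms, 10 µs becomes 1 ms, 4 µs becomes 0). Rounding up presumably avoids waking slightly before the task is due; a result of 0 means the task is (almost) due, so the loop polls with the non-blocking selectNow().

```java
// Worked example (not Netty code) of the conversion in select(long deadlineNanos):
// timeoutMillis = max(0, delayNanos + 995_000) / 1_000_000
class SelectTimeoutSketch {
    static long timeoutMillis(long delayNanos) {
        // deadlineToDelayNanos() never returns a negative delay, so clamp at 0
        long adjusted = Math.max(0L, delayNanos + 995_000L);
        return adjusted / 1_000_000L;
    }

    public static void main(String[] args) {
        long[] delays = {4_000L, 10_000L, 1_500_000L, 3_000_000L};
        for (long d : delays) {
            long t = timeoutMillis(d);
            // As in NioEventLoop: a timeout of 0 means selectNow() instead of select(timeout)
            System.out.println(d + " ns -> " + (t <= 0 ? "selectNow()" : "select(" + t + ")"));
        }
    }
}
```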
It returns the number of ready channels, and the subsequent logic decides what to do based on that number.
Processing in NioEventLoop.run
The processing logic is relatively easy to follow:
- If there are ready channels, process their I/O events.
- After that, run the tasks in the asynchronous task queue on the same thread.
- In addition, to work around the empty-spin problem of Java NIO, selectCnt counts idle iterations: an iteration in which there was no I/O to process and no task was run counts as an idle spin. If idle spins keep happening in a row (selectCnt reaches a threshold), Netty concludes that the NIO bug has been triggered and handles it in unexpectedSelectorWakeup.
Java NIO has a known bug on Linux: the epoll empty-polling problem. The select() method can return from what should be a blocking call even when zero channels are ready, so the thread keeps looping and CPU usage rises to 100%.
```java
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (omitted)
        // selectCnt counts select() calls that came back with nothing to do, i.e. how many times
        // in a row the event loop has spun idle; used to work around the NIO bug
        selectCnt++;
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        boolean ranTasks;
        if (ioRatio == 100) { // I/O gets 100% of the time ratio (the default ioRatio is 50)
            try {
                if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
                    processSelectedKeys(); // handle the ready SocketChannels
                }
            } finally {
                // Note: ioRatio = 100 does not mean tasks are skipped; instead the whole
                // task queue is drained on every iteration
                ranTasks = runAllTasks(); // make sure queued tasks always run
            }
        } else if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
            final long ioStartTime = System.nanoTime(); // start time of I/O processing
            try {
                processSelectedKeys(); // process the ready I/O events
            } finally {
                // time spent on I/O in this iteration
                final long ioTime = System.nanoTime() - ioStartTime;
                // From this iteration's I/O time and ioRatio, compute the upper bound on how
                // long asynchronous tasks may run
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            // strategy == 0 and ioRatio < 100: the time limit is 0, meaning run as few
            // asynchronous tasks as possible (equivalent to the strategy > 0 branch with
            // ioTime = 0, just written more simply)
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }

        if (ranTasks || strategy > 0) {
            // ranTasks == true or strategy > 0: the event loop did real work and did not
            // spin idle, so reset selectCnt
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup handles the NIO bug
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}
```
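With the default ioRatio of 50, tasks may run for as long as the I/O phase just took. The hypothetical helper below reproduces only the budget formula from the code above; the 1..100 range check is an assumption of this sketch, and Long.MAX_VALUE stands in for the unbounded runAllTasks() call used when ioRatio is 100. Keep in mind that the budget is a soft limit: as far as we know, Netty checks the elapsed time only periodically (every 64 tasks) rather than after each task.

```java
// Worked example (not Netty code) of the task-time budget computed in run():
// budget = ioTime * (100 - ioRatio) / ioRatio
class IoRatioSketch {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..100");
        }
        if (ioRatio == 100) {
            return Long.MAX_VALUE; // ioRatio == 100: runAllTasks() drains the queue, no limit
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 10_000_000L; // suppose processSelectedKeys() took 10 ms
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> tasks may run for "
                    + taskBudgetNanos(ioTime, ratio) / 1_000_000.0 + " ms");
        }
    }
}
```

A lower ioRatio gives tasks proportionally more time (20 gives four times the I/O time), while a higher one squeezes them (80 gives a quarter).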
processSelectedKeys
From select we obtain the number of ready I/O events; when it is positive, processSelectedKeys is called:
```java
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
```
There are two branches for processing I/O events:
- one processes Netty's optimized selectedKeys,
- the other is the plain processing logic.

processSelectedKeys chooses between them based on whether selectedKeys has been set. By default Netty uses the optimized selectedKeys, whose type is SelectedSelectionKeySet.
processSelectedKeysOptimized
```java
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 1. Take out the ready key and its channel
        final SelectionKey k = selectedKeys.keys[i];
        // Null out the slot so it can be GC'd; this also marks the entry as processed
        // and avoids handling it twice
        selectedKeys.keys[i] = null;
        // The attachment stored at registration: the NioServerSocketChannel on the boss loop,
        // a NioSocketChannel on a worker loop
        final Object a = k.attachment();
        // Process the current channel
        if (a instanceof AbstractNioChannel) {
            // For the boss NioEventLoop, the polled events are mostly accept events; the new connection
            // is then handed through its pipeline to a worker NioEventLoop.
            // For a worker NioEventLoop, the polled events are mostly read/write events; the bytes read
            // are then passed through its pipeline to each ChannelHandler.
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GCed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
```
processSelectedKey
```java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // No event loop: we cannot tell whether this loop owns ch, so leave it alone
            return;
        }
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps(); // the ready operations of this key
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // connect completed
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        if ((readyOps & SelectionKey.OP_WRITE) != 0) { // writable
            ch.unsafe().forceFlush();
        }

        // Read or ACCEPT: call unsafe.read(); for a NioServerSocketChannel, unsafe is a NioMessageUnsafe
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
```
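The bit tests in processSelectedKey() can be tried in isolation. In this hypothetical sketch, ReadyOpsSketch.actionsFor() returns the names of the actions processSelectedKey() would take for a given readyOps value, in the same order, instead of calling the unsafe methods; only the JDK SelectionKey constants are real API.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Sketch (not Netty code) of the readyOps dispatch in processSelectedKey():
// records which action would run instead of calling unsafe methods.
class ReadyOpsSketch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");  // Netty also clears OP_CONNECT from interestOps
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ or OP_ACCEPT (or readyOps == 0, as in the code above) -> unsafe.read()
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE));
    }
}
```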
NioMessageUnsafe.read()
Suppose this is a read operation or a client establishing a connection; the code then runs as follows:
```java
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    // For a new connection on the server channel, the handler that receives it is ServerBootstrapAcceptor
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // invoke channelRead on the pipeline
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception); // invoke exceptionCaught on the pipeline
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
```
SelectedSelectionKeySet optimization
Netty implements its own SelectedSelectionKeySet to optimize the original structure of the Selector's selected keys. How does it do that? Let's look at its definition first:
```java
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    // ... (other methods omitted)
}
```
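SelectedSelectionKeySet is backed by a SelectionKey array, so processSelectedKeysOptimized can take out the ready I/O events simply by iterating over the array.
The original Set<SelectionKey>, by contrast, is a HashSet. Compared with it, appending to a SelectionKey[] needs no hashing, no collision handling and no per-entry node allocation, so add() is a cheap O(1) array write (amortized, since the array occasionally grows), and iteration is a plain array scan.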
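The core idea of SelectedSelectionKeySet, an append-only array exposed through the Set interface, can be sketched generically. ArrayBackedKeySet is a hypothetical class written for this example: its add() follows the excerpt above, growth by doubling and reset() are our reading of how Netty's class handles capacity and clearing, and size() and iterator() are the minimum AbstractSet needs. It deliberately performs no duplicate check and offers no remove(), so it is only suitable for a use like the selector's selected keys.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, generic version of the SelectedSelectionKeySet idea (not Netty code).
class ArrayBackedKeySet<T> extends AbstractSet<T> {
    Object[] keys = new Object[1024];
    int size;

    @Override
    public boolean add(T o) {
        if (o == null) {
            return false;
        }
        keys[size++] = o;                           // plain array write, no hashing
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1);  // double the capacity when full
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }

    /** Nulls out slots from start onward so they can be GC'd, then empties the set. */
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    public static void main(String[] args) {
        ArrayBackedKeySet<String> set = new ArrayBackedKeySet<>();
        set.add("key-1");
        set.add("key-2");
        for (String k : set) {
            System.out.println(k);                  // iteration is a simple array scan
        }
    }
}
```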
Initializing SelectedSelectionKeySet
Netty uses reflection to replace the selectedKeys and publicSelectedKeys fields inside the Selector object with a SelectedSelectionKeySet.
Both fields are originally HashSet-based; after the replacement, ready keys are written directly into the array of the SelectedSelectionKeySet, and later processing only needs to iterate over it.
```java
private SelectorTuple openSelector() {
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // Use reflection
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction
    // ... (the rest of this method is truncated in the original)
```
Asynchronous task execution
With the above flow covered, let's continue with how the run method of NioEventLoop handles asynchronous tasks:
```java
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (I/O handling omitted; only the task-running call is shown)
        ranTasks = runAllTasks();
    }
}
```
runAllTasks
Note that NioEventLoop also supports scheduled tasks, which are submitted via nioEventLoop.schedule().
```java
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue(); // move due scheduled tasks into the regular task queue
        if (runAllTasksFrom(taskQueue)) { // run the tasks in taskQueue
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep going until all due scheduled tasks have been fetched

    if (ranAtLeastOne) { // if at least one task ran, record the completion time
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks(); // run the tail tasks
    return ranAtLeastOne;
}
```
fetchFromScheduledTaskQueue
It takes the tasks in scheduledTaskQueue whose deadline has passed and adds them to taskQueue.
```java
private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
```
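fetchFromScheduledTaskQueue() only moves tasks whose deadline has already passed, and it stops when the task queue is full. The hypothetical ScheduledMergeSketch below models that with a PriorityQueue ordered by deadline and a bounded ArrayBlockingQueue; times are plain long values passed in by the caller rather than System.nanoTime(), which keeps the example deterministic.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Simplified model (not Netty code) of fetchFromScheduledTaskQueue(): move every
// scheduled task whose deadline has passed into a bounded task queue.
class ScheduledMergeSketch {
    static final class ScheduledTask implements Comparable<ScheduledTask> {
        final long deadlineNanos;
        final Runnable task;

        ScheduledTask(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }

        @Override
        public int compareTo(ScheduledTask o) {
            return Long.compare(deadlineNanos, o.deadlineNanos); // earliest deadline first
        }
    }

    final PriorityQueue<ScheduledTask> scheduledTaskQueue = new PriorityQueue<>();
    final Queue<Runnable> taskQueue;

    ScheduledMergeSketch(int taskQueueCapacity) {
        this.taskQueue = new ArrayBlockingQueue<>(taskQueueCapacity);
    }

    void schedule(long deadlineNanos, Runnable task) {
        scheduledTaskQueue.add(new ScheduledTask(deadlineNanos, task));
    }

    /** Returns true if every due task was moved, false if taskQueue ran out of space. */
    boolean fetchFromScheduledTaskQueue(long nowNanos) {
        for (;;) {
            ScheduledTask head = scheduledTaskQueue.peek();
            if (head == null || head.deadlineNanos > nowNanos) {
                return true;                      // nothing (more) is due yet
            }
            scheduledTaskQueue.poll();
            if (!taskQueue.offer(head.task)) {
                scheduledTaskQueue.add(head);     // no space: put it back and retry later
                return false;
            }
        }
    }

    public static void main(String[] args) {
        ScheduledMergeSketch loop = new ScheduledMergeSketch(16);
        loop.schedule(5, () -> System.out.println("task due at t=5"));
        loop.schedule(20, () -> System.out.println("task due at t=20"));
        loop.fetchFromScheduledTaskQueue(10);     // only the t=5 task is due
        Runnable r;
        while ((r = loop.taskQueue.poll()) != null) {
            r.run();
        }
    }
}
```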
Adding tasks: execute
NioEventLoop has two very important asynchronous task queues: the regular task queue and the scheduled task queue. Two methods add tasks to them respectively:
- execute()
- schedule()

The execute method is defined as follows:
```java
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task); // add the task to the task queue
    if (!inEventLoop) { // called from a thread other than the NioEventLoop thread
        startThread(); // start the thread
        if (isShutdown()) { // this NioEventLoop has already been shut down
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
```
The NIO empty-polling problem
Normally, when we call selector.select() and no SocketChannel is ready, the current thread blocks. Empty polling means that the thread is woken up even though no SocketChannel is ready.
Such a wakeup carries no read or write request, so the thread keeps polling for nothing and CPU usage climbs.
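The root cause of the problem is as follows.
In some Linux 2.6 kernels, when a connected socket is abruptly interrupted, poll and epoll set the returned event set to POLLHUP (or possibly POLLERR). Because the event set has changed, the Selector may be woken up. This behavior comes from the operating system. Although the JDK is meant to be software that is compatible across operating system platforms, unfortunately the problem was not solved in JDK 5 or the early releases of JDK 6 (strictly speaking, in a number of JDK versions); instead, responsibility was pushed onto the operating system side. That is why the bug was not finally fixed until 2013, by which time its impact had become very widespread.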
How does Netty solve this problem? Let's return to the run method of NioEventLoop:
```java
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (select and I/O processing omitted)
        // selectCnt counts select() calls that came back with nothing to do, i.e. how many times
        // in a row the event loop has spun idle; used to work around the NIO bug
        selectCnt++;
        // ranTasks == true or strategy > 0: the event loop did real work, so reset selectCnt
        if (ranTasks || strategy > 0) {
            // Log if the counter exceeds MIN_PREMATURE_SELECTOR_RETURNS
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup handles the NIO bug
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}
```
unexpectedSelectorWakeup
```java
private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // If the rebuild threshold is > 0 (512 by default) and the number of consecutive
    // premature select() returns has reached it, rebuild the selector
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);
        rebuildSelector();
        return true;
    }
    return false;
}
```
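The spin detection boils down to a counter that is reset whenever an iteration does real work. SpinDetector is a hypothetical condensation of the selectCnt logic in run() and the threshold check in unexpectedSelectorWakeup(); the thread-interrupt check and the logging are omitted, and rebuilds++ stands in for rebuildSelector().

```java
// Hypothetical condensation (not Netty code) of the selectCnt bookkeeping in run()
// and the threshold check in unexpectedSelectorWakeup().
class SpinDetector {
    private final int rebuildThreshold;  // per the code above, Netty's default is 512
    private int selectCnt;
    private int rebuilds;

    SpinDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Called once per loop iteration; returns true if a selector rebuild was triggered. */
    boolean afterIteration(boolean ranTasks, int readyKeys) {
        selectCnt++;
        if (ranTasks || readyKeys > 0) {
            selectCnt = 0;               // real work was done: not a spin
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;                  // stands in for rebuildSelector()
            selectCnt = 0;
            return true;
        }
        return false;
    }

    int rebuilds() {
        return rebuilds;
    }

    public static void main(String[] args) {
        SpinDetector detector = new SpinDetector(512);
        int iteration = 0;
        while (!detector.afterIteration(false, 0)) { // simulate select() returning 0 forever
            iteration++;
        }
        System.out.println("rebuild triggered on idle iteration " + (iteration + 1));
    }
}
```

A single useful iteration resets the counter, so only an unbroken run of idle wakeups leads to a rebuild.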
rebuildSelector()
```java
public void rebuildSelector() {
    if (!inEventLoop()) {
        // Not on the event loop thread: submit the rebuild as a task to the event loop
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}
```
rebuildSelector0
The main purpose of this method is to create a new Selector and use it to replace the Selector of the current event loop.
```java
private void rebuildSelector0() {
    final Selector oldSelector = selector; // the old selector
    final SelectorTuple newSelectorTuple; // the new selector

    if (oldSelector == null) { // no old selector: nothing to rebuild
        return;
    }

    try {
        newSelectorTuple = openSelector(); // create a new selector
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) { // iterate over the keys registered with the old selector
        Object a = key.attachment();
        try {
            // Skip keys that are invalid or whose channel is already registered with the new selector
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps(); // the key's interest set
            key.cancel(); // cancel the old key
            // register the channel with the new selector
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) { // for a Netty NIO channel, update its selection key
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    // Switch the event loop over to the new selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close(); // close the old selector
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}
```
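From the above we can see that Netty works around the NIO empty-polling problem by rebuilding the Selector object. The key step of the rebuild is re-registering the channel of every SelectionKey in the old Selector (with its interest set and attachment) on the new Selector and then closing the old one, which neatly sidesteps the JDK epoll empty-polling problem.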
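The migration step of rebuildSelector0() can be reproduced with plain JDK NIO. This sketch (SelectorRebuildSketch and its demo() are illustrative names, not Netty API) moves every valid key's channel, interest set and attachment from an old Selector to a new one and then closes the old Selector; it omits the Netty-specific parts such as updating AbstractNioChannel.selectionKey and closing channels that fail to re-register.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain-JDK sketch (not Netty code) of the core of rebuildSelector0().
class SelectorRebuildSketch {

    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            SelectableChannel ch = key.channel();
            if (!key.isValid() || ch.keyFor(newSelector) != null) {
                continue;                        // skip dead or already-migrated keys
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();                        // drop the registration on the old selector
            ch.register(newSelector, interestOps, attachment);
        }
        oldSelector.close();                     // all channels now live on newSelector
        return newSelector;
    }

    /** Registers one server channel, rebuilds, and describes what the new selector holds. */
    static String demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);     // only non-blocking channels can be registered
            Selector oldSelector = Selector.open();
            server.register(oldSelector, SelectionKey.OP_ACCEPT, "boss-channel");
            Selector newSelector = rebuild(oldSelector);
            try {
                SelectionKey moved = server.keyFor(newSelector);
                return newSelector.keys().size() + ":" + moved.interestOps() + ":"
                        + moved.attachment() + ":" + oldSelector.isOpen();
            } finally {
                newSelector.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

demo() reports the new selector's key count, the migrated interest set, the attachment, and whether the old selector is still open.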
Establishing and handling connections
Section 9.2.4.3 mentioned that when a client connection or read event reaches the server, the read() method of the NioMessageUnsafe class is called.
```java
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // localRead is 1 if a client connection was accepted, otherwise 0
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead); // accumulate the number of messages read
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        // iterate over the accepted client connections
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // invoke channelRead on the handlers in the pipeline
        }
        readBuf.clear(); // clear the list
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // invoke channelReadComplete on the handlers in the pipeline

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
```
pipeline.fireChannelRead(readBuf.get(i))
Next, let's look at how the pipeline fires the event. If the current event is a connection event, the handler in the pipeline that receives it is ServerBootstrap$ServerBootstrapAcceptor.
```java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m); // call channelRead on the handler of the next node in the pipeline
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
```
ServerBootstrapAcceptor
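ServerBootstrapAcceptor is a special handler in the pipeline of NioServerSocketChannel, dedicated to handling client connection events. Its core purpose is to add the handler chain configured for SocketChannels (childHandler) to the pipeline of the newly accepted NioSocketChannel, and then register that channel with the worker group.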
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // Add the childHandler configured on the server to the pipeline of this NioSocketChannel
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger); // apply the NioSocketChannel options
    setAttributes(child, childAttrs);

    try {
        // Register this NioSocketChannel with a Selector and listen for the asynchronous result
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
```
Building the pipeline
As covered in Section 9.6.2, child is in fact a NioSocketChannel, which is created in NioServerSocketChannel when a new connection is accepted.
```java
@Override
protected int doReadMessages(List
// ... (the rest of this method is truncated in the original)
```
When a NioSocketChannel is constructed, it calls the constructor of its parent class AbstractChannel, which initializes a pipeline.
```java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
```
DefaultChannelPipeline
The default pipeline implementation is DefaultChannelPipeline, whose constructor is as follows.
```java
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
```
It initializes a head node and a tail node, which are linked into a doubly linked list, as shown in Figure 9-2.
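The head/tail structure can be modelled in a few lines. MiniPipeline is a hypothetical miniature that keeps only the linking logic: the constructor wires head and tail together as above, and addLast() inserts the new node between tail.prev and tail, which is what DefaultChannelPipeline.addLast0() does. Walking from head to tail gives the order in which inbound events such as channelRead travel; walking from tail to head gives the outbound order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature (not Netty code) of DefaultChannelPipeline's linked structure.
class MiniPipeline {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    MiniPipeline() {
        head.next = tail;            // same wiring as the constructor above
        tail.prev = head;
    }

    MiniPipeline addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;        // insert between the current last node and tail
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** head -> tail: the direction inbound events (e.g. channelRead) travel. */
    List<String> forward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            names.add(c.name);
        }
        return names;
    }

    /** tail -> head: the direction outbound operations (e.g. write) travel. */
    List<String> backward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("decoder").addLast("serverHandler");
        System.out.println(p.forward());
        System.out.println(p.backward());
    }
}
```

Because head and tail are fixed sentinels, addLast() never has to special-case an empty list.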
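How the handler chain in NioSocketChannel is built
Back in the channelRead method of ServerBootstrapAcceptor: when a client connection is received, handlers are added to the pipeline of the NioSocketChannel.
The following is the addLast method of DefaultChannelPipeline.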
```java
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) { // iterate over handlers; here the handler is the ChannelInitializer
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}
```
addLast
It adds the ChannelHandler configured on the server to the pipeline. Note that at this point the pipeline holds the ChannelInitializer callback, not the handlers it will eventually install.
```java
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler); // reject a non-@Sharable handler that has already been added

        // create a new DefaultChannelHandlerContext node
        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx); // link the new DefaultChannelHandlerContext into the ChannelPipeline

        // The channel is not registered yet: defer the handlerAdded callback
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
```
When is this callback actually invoked? In the channelRead method of ServerBootstrapAcceptor, when the current NioSocketChannel is registered:
```java
childGroup.register(child).addListener(new ChannelFutureListener() {
    // ... (body as in channelRead above)
});
```
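Following the same path as in the source analysis of the previous lesson, this eventually leads to the register0 method in AbstractChannel (in its inner AbstractUnsafe class).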
```java
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Now that the channel is registered, run the deferred handlerAdded callbacks
        pipeline.invokeHandlerAddedIfNeeded();
        // ... (omitted)
    } catch (Throwable t) {
        // ... (omitted)
    }
}
```
callHandlerAddedForAllHandlers
Going down from pipeline.invokeHandlerAddedIfNeeded(), execution enters the callHandlerAddedForAllHandlers method of DefaultChannelPipeline.
```java
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GCed.
        this.pendingHandlerCallbackHead = null;
    }

    // Take the pending handler callbacks off the list and execute them in order
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}
```
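We can see that entries are appended to the singly linked list pendingHandlerCallbackHead by callHandlerCallbackLater, which in turn is called from addLast while the channel is not yet registered. Together they form a complete deferred loop: addLast queues the handlerAdded callback, and registration later executes it.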
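The deferred-callback loop can be sketched as follows. PendingCallbacksSketch is hypothetical and reduces a handler to its name: before registration, addLast() only appends a callback to a singly linked list (at the end, so order is preserved), and register() plays the role of register0() -> invokeHandlerAddedIfNeeded() -> callHandlerAddedForAllHandlers(), running the queued handlerAdded callbacks in insertion order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Netty code) of the pendingHandlerCallbackHead mechanism.
class PendingCallbacksSketch {
    private static final class PendingCallback {
        final String handlerName;
        PendingCallback next;

        PendingCallback(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private final List<String> handlerAddedCalls = new ArrayList<>();
    private PendingCallback pendingHead;
    private boolean registered;

    void addLast(String handlerName) {
        if (!registered) {
            callHandlerCallbackLater(handlerName);  // no event loop yet: defer
            return;
        }
        handlerAddedCalls.add(handlerName);         // registered: handlerAdded runs now
    }

    private void callHandlerCallbackLater(String handlerName) {
        PendingCallback task = new PendingCallback(handlerName);
        if (pendingHead == null) {
            pendingHead = task;
        } else {
            PendingCallback last = pendingHead;     // append at the end to keep order
            while (last.next != null) {
                last = last.next;
            }
            last.next = task;
        }
    }

    /** Stands in for register0() -> invokeHandlerAddedIfNeeded() -> callHandlerAddedForAllHandlers(). */
    void register() {
        registered = true;
        PendingCallback task = pendingHead;
        pendingHead = null;                         // null out so it can be GC'd
        while (task != null) {
            handlerAddedCalls.add(task.handlerName);
            task = task.next;
        }
    }

    List<String> handlerAddedCalls() {
        return handlerAddedCalls;
    }

    public static void main(String[] args) {
        PendingCallbacksSketch pipeline = new PendingCallbacksSketch();
        pipeline.addLast("ChannelInitializer");     // queued: channel not registered yet
        pipeline.register();                        // queued callbacks run now
        System.out.println(pipeline.handlerAddedCalls());
    }
}
```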
ChannelInitializer.handlerAdded
The execution path of task.execute() is callHandlerAdded0 -> ctx.callHandlerAdded() (that is, AbstractChannelHandlerContext.callHandlerAdded()) -> ChannelInitializer.handlerAdded, which calls initChannel to initialize the pipeline of the NioSocketChannel.
```java
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}
```
Next, initChannel(ctx) calls the abstract initChannel(C) method, which is implemented by the concrete subclass.
```java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
```
The implementation of ChannelInitializer is the anonymous inner ChannelInitializer class in our own server code. Through this callback, the pipeline of the current NioSocketChannel is built.
```java
public static void main(String[] args) {
    // Boss group: accepts client connections
    EventLoopGroup boss = new NioEventLoopGroup();
    // Worker group: handles reads and writes on the accepted client connections
    EventLoopGroup work = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    b.group(boss, work) // bind the two event loop groups
        .channel(NioServerSocketChannel.class) // use NIO
        // initializer for each accepted channel
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline()
                    .addLast(new LengthFieldBasedFrameDecoder(1024, 9, 4, 0, 0))
                    .addLast(new MessageRecordEncoder())
                    .addLast(new MessageRecordDecode())
                    .addLast(new ServerHandler());
            }
        });
}
```
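Putting the pieces together, ChannelInitializer behaves like a one-shot placeholder. InitializerSketch is a hypothetical model in which a plain List stands in for the pipeline and a Consumer stands in for the user's initChannel(): when handlerAdded fires, the user code adds the real handlers, the initializer then removes itself, and a flag guards against running twice (the role initMap plays in the code above).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model (not Netty code) of ChannelInitializer's one-shot behaviour.
class InitializerSketch {
    final List<Object> pipeline = new ArrayList<>();

    static final class Initializer {
        private final Consumer<List<Object>> initChannel;
        private boolean initialized;           // plays the role of the initMap re-entrance guard

        Initializer(Consumer<List<Object>> initChannel) {
            this.initChannel = initChannel;
        }

        /** Stands in for handlerAdded(ctx) -> initChannel(ctx). */
        void handlerAdded(List<Object> pipeline) {
            if (initialized) {
                return;                        // guard against running initChannel twice
            }
            initialized = true;
            try {
                initChannel.accept(pipeline);  // user code adds the real handlers
            } finally {
                pipeline.remove(this);         // then the initializer removes itself
            }
        }
    }

    public static void main(String[] args) {
        InitializerSketch channel = new InitializerSketch();
        Initializer init = new Initializer(p -> p.addAll(Arrays.asList(
                "LengthFieldBasedFrameDecoder", "MessageRecordEncoder", "MessageRecordDecode", "ServerHandler")));
        channel.pipeline.add(init);            // what ServerBootstrapAcceptor adds: only the initializer
        init.handlerAdded(channel.pipeline);   // fired once the channel is registered
        System.out.println(channel.pipeline);
    }
}
```

After handlerAdded, the list contains only the four handlers configured in main, which corresponds to the final pipeline of each accepted NioSocketChannel (between Netty's head and tail contexts).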