閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。

1. 詳細剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實現(xiàn)原理(圖解)

2. (年薪60W的技巧)工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)

3. 深度解析Netty中的核心組件(圖解+實例)

4. BAT面試必問細節(jié):關(guān)于Netty中的ByteBuf詳解

5. 通過大量實戰(zhàn)案例分解Netty中是如何解決拆包黏包問題的?

6. 基于Netty實現(xiàn)自定義消息通信協(xié)議(協(xié)議設(shè)計及解析應(yīng)用實戰(zhàn))

7. 全網(wǎng)最詳細最齊全的序列化技術(shù)及深度解析與應(yīng)用實戰(zhàn)

8. 手把手教你基于Netty實現(xiàn)一個基礎(chǔ)的RPC框架(通俗易懂)

9. (年薪60W分水嶺)基于Netty手寫實現(xiàn)RPC框架進階篇(帶注冊中心和注解)

提前準備好如下代碼, 從服務(wù)端構(gòu)建著手,深入分析Netty服務(wù)端的啟動過程。

public class NettyBasicServerExample {    public void bind(int port){        //netty的服務(wù)端編程要從EventLoopGroup開始,        // 我們要創(chuàng)建兩個EventLoopGroup,        // 一個是boss專門用來接收連接,可以理解為處理accept事件,        // 另一個是worker,可以關(guān)注除了accept之外的其它事件,處理子任務(wù)。        //上面注意,boss線程一般設(shè)置一個線程,設(shè)置多個也只會用到一個,而且多個目前沒有應(yīng)用場景,        // worker線程通常要根據(jù)服務(wù)器調(diào)優(yōu),如果不寫默認就是cpu的兩倍。        EventLoopGroup bossGroup=new NioEventLoopGroup();        EventLoopGroup workerGroup=new NioEventLoopGroup();        try {            //服務(wù)端要啟動,需要創(chuàng)建ServerBootStrap,            // 在這里面netty把nio的模板式的代碼都給封裝好了            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)                //配置Server的通道,相當于NIO中的ServerSocketChannel                .channel(NioServerSocketChannel.class)                .handler(new LoggingHandler(LogLevel.INFO)) //設(shè)置ServerSocketChannel對應(yīng)的Handler                //childHandler表示給worker那些線程配置了一個處理器,                // 這個就是上面NIO中說的,把處理業(yè)務(wù)的具體邏輯抽象出來,放到Handler里面                .childHandler(new ChannelInitializer() {                    @Override                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        socketChannel.pipeline()                            .addLast(new NormalInBoundHandler("NormalInBoundA",false))                            .addLast(new NormalInBoundHandler("NormalInBoundB",false))                            .addLast(new NormalInBoundHandler("NormalInBoundC",true));                        socketChannel.pipeline()                            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))                            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))                            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))                            .addLast(new ExceptionHandler());                    }                });            //綁定端口并同步等待客戶端連接            ChannelFuture channelFuture=bootstrap.bind(port).sync();            System.out.println("Netty Server Started,Listening on :"+port);            //等待服務(wù)端監(jiān)聽端口關(guān)閉            channelFuture.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            //釋放線程資源            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) {        new NettyBasicServerExample().bind(8080);    }}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {    private final String name;    private final boolean flush;    public NormalInBoundHandler(String name, boolean flush) {        this.name = name;        this.flush = flush;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("InboundHandler:"+name);        if(flush){            ctx.channel().writeAndFlush(msg);        }else {            throw new RuntimeException("InBoundHandler:"+name);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("InboundHandlerException:"+name);        super.exceptionCaught(ctx, cause);    }}
public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {    private final String name;    public NormalOutBoundHandler(String name) {        this.name = name;    }    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        System.out.println("OutBoundHandler:"+name);        super.write(ctx, msg, promise);    }}

在服務(wù)端啟動之前,需要配置ServerBootstrap的相關(guān)參數(shù),這一步大概分為以下幾個步驟

  • 配置EventLoopGroup線程組
  • 配置Channel類型
  • 設(shè)置ServerSocketChannel對應(yīng)的Handler
  • 設(shè)置網(wǎng)絡(luò)監(jiān)聽的端口
  • 設(shè)置SocketChannel對應(yīng)的Handler
  • 配置Channel參數(shù)

Netty會把我們配置的這些信息組裝,發(fā)布服務(wù)監(jiān)聽。

ServerBootstrap參數(shù)配置過程

下面這段代碼是我們配置ServerBootStrap相關(guān)參數(shù),這個過程比較簡單,就是把配置的參數(shù)值保存到ServerBootstrap定義的成員變量中就可以了。

bootstrap.group(bossGroup, workerGroup)    //配置Server的通道,相當于NIO中的ServerSocketChannel    .channel(NioServerSocketChannel.class)    .handler(new LoggingHandler(LogLevel.INFO)) //設(shè)置ServerSocketChannel對應(yīng)的Handler    //childHandler表示給worker那些線程配置了一個處理器,    // 這個就是上面NIO中說的,把處理業(yè)務(wù)的具體邏輯抽象出來,放到Handler里面    .childHandler(new ChannelInitializer() {    });

我們來看一下ServerBootstrap的類關(guān)系圖以及屬性定義

ServerBootstrap類關(guān)系圖

如圖8-1所示,表示ServerBootstrap的類關(guān)系圖。

  • AbstractBootstrap,定義了一個抽象類,作為抽象類,一定是抽離了Bootstrap相關(guān)的抽象邏輯,所以很顯然可以推斷出Bootstrap應(yīng)該也繼承了AbstractBootstrap
  • ServerBootstrap,服務(wù)端的啟動類,
  • ServerBootstrapAcceptor,繼承了ChannelInboundHandlerAdapter,所以本身就是一個Handler,當服務(wù)端啟動后,客戶端連接上來時,會先進入到ServerBootstrapAccepter。

圖8-1 ServerBootstrap類關(guān)系圖

AbstractBootstrap屬性定義

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {    @SuppressWarnings("unchecked")    private static final Map.Entry, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];    @SuppressWarnings("unchecked")    private static final Map.Entry, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];    /**     * 這里的EventLoopGroup 作為服務(wù)端 Acceptor 線程,負責處理客戶端的請求接入     * 作為客戶端 Connector 線程,負責注冊監(jiān)聽連接操作位,用于判斷異步連接結(jié)果。     */    volatile EventLoopGroup group; //    @SuppressWarnings("deprecation")    private volatile ChannelFactory channelFactory;  //channel工廠,很明顯應(yīng)該是用來制造對應(yīng)Channel的    private volatile SocketAddress localAddress;  //SocketAddress用來綁定一個服務(wù)端地址    // The order in which ChannelOptions are applied is important they may depend on each other for validation    // purposes.    /**     * ChannelOption 可以添加Channer 添加一些配置信息     */    private final Map, Object> options = new LinkedHashMap, Object>();    private final Map, Object> attrs = new ConcurrentHashMap, Object>();    /**     *  ChannelHandler 是具體怎么處理Channer 的IO事件。     */    private volatile ChannelHandler handler;}

對于上述屬性定義,整體總結(jié)如下:

  1. 提供了一個ChannelFactory對象用來創(chuàng)建Channel,一個Channel會對應(yīng)一個EventLoop用于IO的事件處理,在一個Channel的整個生命周期中 只會綁定一個EventLoop,這里可理解給Channel分配一個線程進行IO事件處理,結(jié)束后回收該線程。

  2. AbstractBootstrap沒有提供EventLoop而是提供了一個EventLoopGroup,其實我認為這里只用一個EventLoop就行了。

  3. 不管是服務(wù)器還是客戶端的Channel都需要綁定一個本地端口這就有了SocketAddress類的對象localAddress。

  4. Channel有很多選項所有有了options對象LinkedHashMap, Object>

  5. 怎么處理Channel的IO事件呢,我們添加一個事件處理器ChannelHandler對象。

ServerBootstrap屬性定義

ServerBootstrap可以理解為服務(wù)器啟動的工廠類,我們可以通過它來完成服務(wù)器端的 Netty 初始化。主要職責:|

  • EventLoop初始化
  • channel的注冊
  • pipeline的初始化
  • handler的添加過程
  • 服務(wù)端連接處理。
public class ServerBootstrap extends AbstractBootstrap {    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);    // The order in which child ChannelOptions are applied is important they may depend on each other for validation    // purposes.    //SocketChannel相關(guān)的屬性配置    private final Map, Object> childOptions = new LinkedHashMap, Object>();    private final Map, Object> childAttrs = new ConcurrentHashMap, Object>();    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); //配置類    private volatile EventLoopGroup childGroup;  //工作線程組    private volatile ChannelHandler childHandler; //負責SocketChannel的IO處理相關(guān)的Handler    public ServerBootstrap() { }}

服務(wù)端啟動過程分析

了解了ServerBootstrap相關(guān)屬性的配置之后,我們繼續(xù)來看服務(wù)的啟動過程,在開始往下分析的時候,先不妨來思考以下這些問題

  • Netty自己實現(xiàn)的Channel與底層JDK提供的Channel是如何聯(lián)系并且構(gòu)建實現(xiàn)的
  • ChannelInitializer這個特殊的Handler處理器的作用以及實現(xiàn)原理
  • Pipeline是如何初始化以的

ServerBootstrap.bind

先來看ServerBootstrap.bind()方法的定義,這里主要用來綁定一個端口并且發(fā)布服務(wù)端監(jiān)聽。

根據(jù)我們使用NIO相關(guān)API的理解,無非就是使用JDK底層的API來打開一個服務(wù)端監(jiān)聽并綁定一個端口。

 ChannelFuture channelFuture=bootstrap.bind(port).sync();
public ChannelFuture bind(SocketAddress localAddress) {    validate();    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));}
  • validate(), 驗證ServerBootstrap核心成員屬性的配置是否正確,比如group、channelFactory、childHandler、childGroup等,這些屬性如果沒配置,那么服務(wù)端啟動會報錯

  • localAddress,綁定一個本地端口地址

doBind

doBind方法比較長,從大的代碼結(jié)構(gòu),可以分為三個部分

  • initAndRegister 初始化并注冊Channel,并返回一個ChannelFuture,說明初始化注冊Channel是異步實現(xiàn)
  • regFuture.cause() 用來判斷initAndRegister()是否發(fā)生異常,如果發(fā)生異常,則直接返回
  • regFuture.isDone(), 判斷initAndRegister()方法是否執(zhí)行完成。
    • 如果執(zhí)行完成,則調(diào)用doBind0()方法。
    • 如果未執(zhí)行完成,regFuture添加一個監(jiān)聽回調(diào),在監(jiān)聽回調(diào)中再次判斷執(zhí)行結(jié)果進行相關(guān)處理。
    • PendingRegistrationPromise 用來保存異步執(zhí)行結(jié)果的狀態(tài)

從整體代碼邏輯來看,邏輯結(jié)構(gòu)還是非常清晰的, initAndRegister()方法負責Channel的初始化和注冊、doBind0()方法用來綁定端口。這個無非就是我們使用NIO相關(guān)API發(fā)布服務(wù)所做的事情。

private ChannelFuture doBind(final SocketAddress localAddress) {    final ChannelFuture regFuture = initAndRegister();    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }    if (regFuture.isDone()) {        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // Registration future is almost always fulfilled already, but just in case it"s not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}

initAndRegister

這個方法顧名思義,就是初始化和注冊,基于我們整個流程的分析可以猜測到

  • 初始化,應(yīng)該就是構(gòu)建服務(wù)端的Handler處理鏈
  • register,應(yīng)該就是把當前服務(wù)端的連接注冊到selector上

下面我們通過源碼印證我們的猜想。

final ChannelFuture initAndRegister() {    Channel channel = null;    try {        //通過ChannelFactory創(chuàng)建一個具體的Channel實現(xiàn)        channel = channelFactory.newChannel();        init(channel); //初始化    } catch (Throwable t) {        //省略....    }    //這個代碼應(yīng)該和我們猜想是一致的,就是將當前初始化的channel注冊到selector上,這個過程同樣也是異步的    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) { //獲取regFuture的執(zhí)行結(jié)果        if (channel.isRegistered()) {             channel.close();        } else {            channel.unsafe().closeForcibly();        }    }    return regFuture;}

channelFactory.newChannel()

這個方法在分析之前,我們可以繼續(xù)推測它的邏輯。

在最開始構(gòu)建服務(wù)端的代碼中,我們通過channel設(shè)置了一個NioServerSocketChannel.class類對象,這個對象表示當前channel的構(gòu)建使用哪種具體的API

bootstrap.group(bossGroup, workerGroup)    //配置Server的通道,相當于NIO中的ServerSocketChannel    .channel(NioServerSocketChannel.class)

而在initAndRegister方法中,又用到了channelFactory.newChannel()來生成一個具體的Channel實例,因此不難想到,這兩者必然有一定的聯(lián)系,我們也可以武斷的認為,這個工廠會根據(jù)我們配置的channel來動態(tài)構(gòu)建一個指定的channel實例。

channelFactory有多個實現(xiàn)類,所以我們可以從配置方法中找到channelFactory的具體定義,代碼如下。

public B channel(Class channelClass) {    return channelFactory(new ReflectiveChannelFactory(        ObjectUtil.checkNotNull(channelClass, "channelClass")    ));}

channelFactory對應(yīng)的具體實現(xiàn)是:ReflectiveChannelFactory,因此我們定位到newChannel()方法的實現(xiàn)。

ReflectiveChannelFactory.newChannel

在該方法中,使用constructor構(gòu)建了一個實例。

@Overridepublic T newChannel() {    try {        return constructor.newInstance();    } catch (Throwable t) {        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);    }}

construtor的初始化代碼如下, 用到了傳遞進來的clazz類,獲得該類的構(gòu)造器,該構(gòu)造器后續(xù)可以通過newInstance創(chuàng)建一個實例對象

而此時的clazz其實就是:NioServerSocketChannel

public class ReflectiveChannelFactory implements ChannelFactory {    private final Constructor constructor;    public ReflectiveChannelFactory(Class clazz) {        ObjectUtil.checkNotNull(clazz, "clazz");        try {            this.constructor = clazz.getConstructor();        } catch (NoSuchMethodException e) {            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +                    " does not have a public non-arg constructor", e);        }    }}

NioServerSocketChannel

NioServerSocketChannel的構(gòu)造方法定義如下。

public class NioServerSocketChannel extends AbstractNioMessageChannel                             implements io.netty.channel.socket.ServerSocketChannel {    private static ServerSocketChannel newSocket(SelectorProvider provider) {        try {            return provider.openServerSocketChannel();        } catch (IOException e) {            throw new ChannelException(                    "Failed to open a server socket.", e);        }    }    public NioServerSocketChannel() {        this(newSocket(DEFAULT_SELECTOR_PROVIDER));    }}

當NioServerSocketChannel實例化后,調(diào)用newSocket方法創(chuàng)建了一個服務(wù)端實例。

newSocket方法中調(diào)用了provider.openServerSocketChannel(),來完成ServerSocketChannel的創(chuàng)建,ServerSocketChannel就是Java中NIO中的服務(wù)端API。

public ServerSocketChannel openServerSocketChannel() throws IOException {    return new ServerSocketChannelImpl(this);}

通過層層推演,最終看到了Netty是如何一步步封裝,完成ServerSocketChannel的創(chuàng)建。

設(shè)置非阻塞

在NioServerSocketChannel中的構(gòu)造方法中,先通過super調(diào)用父類做一些配置操作

public NioServerSocketChannel(ServerSocketChannel channel) {    super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}

最終,super會調(diào)用AbstractNioChannel中的構(gòu)造方法,

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    super(parent);    this.ch = ch;    this.readInterestOp = readInterestOp; //設(shè)置關(guān)心事件,此時是一個連接事件,所以是OP_ACCEPT    try {        ch.configureBlocking(false); //設(shè)置非阻塞    } catch (IOException e) {        try {            ch.close();        } catch (IOException e2) {            logger.warn(                "Failed to close a partially initialized socket.", e2);        }        throw new ChannelException("Failed to enter non-blocking mode.", e);    }}

繼續(xù)分析initAndRegister

分析完成channel的初始化后,接下來就是要將當前channel注冊到Selector上,所以繼續(xù)回到initAndRegister方法。

final ChannelFuture initAndRegister() {//省略....    //這個代碼應(yīng)該和我們猜想是一致的,就是將當前初始化的channel注冊到selector上,這個過程同樣也是異步的    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) { //獲取regFuture的執(zhí)行結(jié)果        if (channel.isRegistered()) {             channel.close();        } else {            channel.unsafe().closeForcibly();        }    }    return regFuture;}

注冊到某個Selector上,其實就是注冊到某個EventLoopGroup中,如果大家能有這個猜想,說明前面的內(nèi)容是聽懂了的。

config().group().register(channel)這段代碼,其實就是獲取在ServerBootstrap中配置的bossEventLoopGroup,然后把當前的服務(wù)端channel注冊到該group中。

此時,我們通過快捷鍵想去看一下register的實現(xiàn)時,發(fā)現(xiàn)EventLoopGroup又有多個實現(xiàn),我們來看一下類關(guān)系圖如圖8-2所示。

圖8-3 EventLoopGroup類關(guān)系圖

而我們在前面配置的EventLoopGroup的實現(xiàn)類是NioEventLoopGroup,而NioEventLoopGroup繼承自MultithreadEventLoopGroup,所以在register()方法中,我們直接找到父類的實現(xiàn)方法即可。

MultithreadEventLoopGroup.register

這段代碼大家都熟了,從NioEventLoopGroup中選擇一個NioEventLoop,將當前channel注冊上去

@Overridepublic ChannelFuture register(Channel channel) {    return next().register(channel);}

next()方法返回的是NioEventLoop,而NioEventLoop又有多個實現(xiàn)類,我們來看圖8-4所示的類關(guān)系圖。

圖8-4 NioEventLoop類關(guān)系圖

從類關(guān)系圖中發(fā)現(xiàn),發(fā)現(xiàn)NioEventLoop派生自SingleThreadEventLoop,所以next().register(channel);方法,執(zhí)行的是SingleThreadEventLoop中的register

SingleThreadEventLoop.register

@Overridepublic ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) {    ObjectUtil.checkNotNull(promise, "promise");    promise.channel().unsafe().register(this, promise);    return promise;}

ChannelPromise, 派生自Future,用來實現(xiàn)異步任務(wù)處理回調(diào)功能。簡單來說就是把注冊的動作異步化,當異步執(zhí)行結(jié)束后會把執(zhí)行結(jié)果回填到ChannelPromise中

AbstractChannel.register

抽象類一般就是公共邏輯的處理,而這里的處理主要就是針對一些參數(shù)的判斷,判斷完了之后再調(diào)用register0()方法。

@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {    ObjectUtil.checkNotNull(eventLoop, "eventLoop");    if (isRegistered()) { //判斷是否已經(jīng)注冊過        promise.setFailure(new IllegalStateException("registered to an event loop already"));        return;    }    if (!isCompatible(eventLoop)) { //判斷eventLoop類型是否是EventLoop對象類型,如果不是則拋出異常        promise.setFailure(            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));        return;    }    AbstractChannel.this.eventLoop = eventLoop;    //Reactor內(nèi)部線程調(diào)用,也就是說當前register方法是EventLoop線程觸發(fā)的,則執(zhí)行下面流程    if (eventLoop.inEventLoop()) {        register0(promise);    } else { //如果是外部線程        try {            eventLoop.execute(new Runnable() {                @Override                public void run() {                    register0(promise);                }            });        } catch (Throwable t) {            logger.warn(                "Force-closing a channel whose registration task was not accepted by an event loop: {}",                AbstractChannel.this, t);            closeForcibly();            closeFuture.setClosed();            safeSetFailure(promise, t);        }    }}

AbstractChannel.register0

Netty從EventLoopGroup線程組中選擇一個EventLoop和當前的Channel綁定,之后該Channel生命周期中的所有I/O事件都由這個EventLoop負責。

register0方法主要做四件事:

  • 調(diào)用JDK層面的API對當前Channel進行注冊
  • 觸發(fā)HandlerAdded事件
  • 觸發(fā)channelRegistered事件
  • Channel狀態(tài)為活躍時,觸發(fā)channelActive事件

在當前的ServerSocketChannel連接注冊的邏輯中,我們只需要關(guān)注下面的doRegister方法即可。

private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        boolean firstRegistration = neverRegistered;        doRegister();  //調(diào)用JDK層面的register()方法進行注冊        neverRegistered = false;        registered = true;        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        pipeline.invokeHandlerAddedIfNeeded(); //觸發(fā)Handler,如果有必要的情況下        safeSetSuccess(promise);        pipeline.fireChannelRegistered();        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        if (isActive()) { //此時是ServerSocketChannel的注冊,所以連接還處于非活躍狀態(tài)            if (firstRegistration) {                pipeline.fireChannelActive();             } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

AbstractNioChannel.doRegister

進入到AbstractNioChannel.doRegister方法。

javaChannel().register()負責調(diào)用JDK層面的方法,把channel注冊到eventLoop().unwrappedSelector()上,其中第三個參數(shù)傳入的是Netty自己實現(xiàn)的Channel對象,也就是把該對象綁定到attachment中。

這樣做的目的是,后續(xù)每次調(diào)Selector對象進行事件輪詢時,當觸發(fā)事件時,Netty都可以獲取自己的Channe對象。

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

服務(wù)注冊總結(jié)

上述代碼比較繞,但是整體總結(jié)下來并不難理解

  • 初始化指定的Channel實例
  • 把該Channel分配給某一個EventLoop
  • 然后把Channel注冊到該EventLoop的Selector中

AbstractBootstrap.doBind0

分析完了注冊的邏輯后,再回到AbstractBootstrap類中的doBind0方法,這個方法不用看也能知道,ServerSocketChannel初始化了之后,接下來要做的就是綁定一個ip和端口地址。

private static void doBind0(    final ChannelFuture regFuture, final Channel channel,    final SocketAddress localAddress, final ChannelPromise promise) {    //獲取當前channel中的eventLoop實例,執(zhí)行一個異步任務(wù)。    //需要注意,以前我們在課程中講過,eventLoop在輪詢中一方面要執(zhí)行select遍歷,另一方面要執(zhí)行阻塞隊列中的任務(wù),而這里就是把任務(wù)添加到隊列中異步執(zhí)行。    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            //如果ServerSocketChannel注冊成功,則調(diào)用該channel的bind方法            if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}

channel.bind方法,會根據(jù)ServerSocketChannel中的handler鏈配置,逐個進行調(diào)用,由于在本次案例中,我們給ServerSocketChannel配置了一個 LoggingHandler的處理器,所以bind方法會先調(diào)用LoggingHandler,然后再調(diào)用DefaultChannelPipeline中的bind方法,調(diào)用鏈路

-> DefaultChannelPipeline.ind

? -> AbstractChannel.bind

? -> NioServerSocketChannel.doBind

最終就是調(diào)用前面初始化好的ServerSocketChannel中的bind方法綁定本地地址和端口。

protected void doBind(SocketAddress localAddress) throws Exception {    if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}

構(gòu)建SocketChannel的Pipeline

在ServerBootstrap的配置中,我們針對SocketChannel,配置了入站和出站的Handler,也就是當某個SocketChannel的IO事件就緒時,就會按照我們配置的處理器鏈表進行逐一處理,那么這個鏈表是什么時候構(gòu)建的,又是什么樣的結(jié)構(gòu)呢?下面我們來分析這部分的內(nèi)容

.childHandler(new ChannelInitializer() {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        socketChannel.pipeline()            .addLast(new NormalInBoundHandler("NormalInBoundA",false))            .addLast(new NormalInBoundHandler("NormalInBoundB",false))            .addLast(new NormalInBoundHandler("NormalInBoundC",true));        socketChannel.pipeline()            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))            .addLast(new ExceptionHandler());    }});

childHandler的構(gòu)建

childHandler的構(gòu)建過程,在AbstractChannel.register0方法中實現(xiàn)

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel(); //這是是創(chuàng)建channel            init(channel); //這里是初始化        } catch (Throwable t) {            //省略....        }        ChannelFuture regFuture = config().group().register(channel); //這是是注冊        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }

ServerBootstrap.init

init方法,調(diào)用的是ServerBootstrap中的init(),代碼如下。

@Overridevoid init(Channel channel) {    setChannelOptions(channel, newOptionsArray(), logger);    setAttributes(channel, newAttributesArray());    ChannelPipeline p = channel.pipeline();    final EventLoopGroup currentChildGroup = childGroup;    final ChannelHandler currentChildHandler = childHandler;  //childHandler就是在服務(wù)端配置時添加的ChannelInitializer    final Entry, Object>[] currentChildOptions = newOptionsArray(childOptions);    final Entry, Object>[] currentChildAttrs = newAttributesArray(childAttrs);    // 此時的Channel是NioServerSocketChannel,這里是為NioServerSocketChannel添加處理器鏈。    p.addLast(new ChannelInitializer() {        @Override        public void initChannel(final Channel ch) {            final ChannelPipeline pipeline = ch.pipeline();            ChannelHandler handler = config.handler(); //如果在ServerBootstrap構(gòu)建時,通過.handler添加了處理器,則會把相關(guān)處理器添加到NioServerSocketChannel中的pipeline中。            if (handler != null) {                pipeline.addLast(handler);            }            ch.eventLoop().execute(new Runnable() { //異步天劍一個ServerBootstrapAcceptor處理器,從名字來看,                @Override                public void run() {                    pipeline.addLast(new ServerBootstrapAcceptor(                        //currentChildHandler,表示SocketChannel的pipeline,當收到客戶端連接時,就會把該handler添加到當前SocketChannel的pipeline中                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}

其中,對于上述代碼的核心部分說明如下

  • ChannelPipeline 是在AbstractChannel中的構(gòu)造方法中初始化的一個DefaultChannelPipeline

    protected AbstractChannel(Channel parent) {  this.parent = parent;  id = newId();  unsafe = newUnsafe();  pipeline = newChannelPipeline();}
  • p.addLast是為NioServerSocketChannel添加handler處理器鏈,這里添加了一個ChannelInitializer回調(diào)函數(shù),該回調(diào)是異步觸發(fā)的,在回調(diào)方法中做了兩件事

    • 如果ServerBootstrap.handler添加了處理器,則會把相關(guān)處理器添加到該pipeline中,在本次演示的案例中,我們添加了LoggerHandler
    • 異步執(zhí)行添加了ServerBootstrapAcceptor,從名字來看,它是專門用來接收新的連接處理的。

我們在這里思考一個問題,為什么NioServerSocketChannel需要通過ChannelInitializer回調(diào)處理器呢? ServerBootstrapAcceptor為什么通過異步任務(wù)添加到pipeline中呢?

原因是,NioServerSocketChannel在初始化的時候,還沒有開始將該Channel注冊到Selector對象上,也就是沒辦法把ACCEPT事件注冊到Selector上,所以事先添加了ChannelInitializer處理器,等待Channel注冊完成后,再向Pipeline中添加ServerBootstrapAcceptor。

ServerBootstrapAcceptor

按照下面的方法演示一下SocketChannel中的Pipeline的構(gòu)建過程

  1. 啟動服務(wù)端監(jiān)聽
  2. 在ServerBootstrapAcceptor的channelRead方法中打上斷點
  3. 通過telnet 連接,此時會觸發(fā)debug。
public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);  //在這里,將handler添加到SocketChannel的pipeline中    setChannelOptions(child, childOptions, logger);    setAttributes(child, childAttrs);    try {        //把當前客戶端的鏈接SocketChannel注冊到某個EventLoop中。        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

ServerBootstrapAcceptor是服務(wù)端NioServerSocketChannel中的一個特殊處理器,該處理器的channelRead事件只會在新連接產(chǎn)生時觸發(fā),所以這里通過final Channel child = (Channel) msg;可以直接拿到客戶端的鏈接SocketChannel。

ServerBootstrapAcceptor接著通過childGroup.register()方法,把當前NioSocketChannel注冊到工作線程中。

事件觸發(fā)機制的流程

在ServerBootstrapAcceptor中,收到客戶端連接時,會調(diào)用childGroup.register(child)把當前客戶端連接注冊到指定NioEventLoop的Selector中。

這個注冊流程和前面講解的NioServerSocketChannel注冊流程完全一樣,最終都會進入到AbstractChannel.register0方法。

AbstractChannel.register0

private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        pipeline.invokeHandlerAddedIfNeeded();        safeSetSuccess(promise);        pipeline.fireChannelRegistered(); //執(zhí)行pipeline中的ChannelRegistered()事件。        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        if (isActive()) {            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

pipeline.fireChannelRegistered()

@Overridepublic final ChannelPipeline fireChannelRegistered() {    AbstractChannelHandlerContext.invokeChannelRegistered(head);    return this;}

下面的事件觸發(fā),分為兩個邏輯

  • 如果當前的任務(wù)是在eventLoop中觸發(fā)的,則直接調(diào)用invokeChannelRegistered
  • 否則,異步執(zhí)行invokeChannelRegistered。
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRegistered();    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRegistered();            }        });    }}

invokeChannelRegistered

觸發(fā)下一個handler的channelRegistered方法。

private void invokeChannelRegistered() {    if (invokeHandler()) {        try {            ((ChannelInboundHandler) handler()).channelRegistered(this);        } catch (Throwable t) {            invokeExceptionCaught(t);        }    } else {        fireChannelRegistered();    }}

Netty服務(wù)端啟動總結(jié)

到此為止,整個服務(wù)端啟動的過程,我們就已經(jīng)分析完成了,主要的邏輯如下

  • 創(chuàng)建服務(wù)端Channel,本質(zhì)上是根據(jù)用戶配置的實現(xiàn),調(diào)用JDK原生的Channel
  • 初始化Channel的核心屬性,unsafe、pipeline
  • 初始化Channel的Pipeline,主要是添加兩個特殊的處理器,ChannelInitializer和ServerBootstrapAcceptor
  • 注冊服務(wù)端的Channel,添加OP_ACCEPT事件,這里底層調(diào)用的是JDK層面的實現(xiàn),講Channel注冊到BossEventLoop中的Selector上
  • 綁定端口,調(diào)用JDK層面的API,綁定端口。