1. 程式人生 > 實用技巧 >Netty(一):server啟動流程解析

Netty(一):server啟動流程解析

  netty作為一個被廣泛應用的通訊框架,有必要我們多瞭解一點。

  實際上netty的幾個重要的技術亮點:

    1. reactor的執行緒模型;
    2. 安全有效的nio非阻塞io模型應用;
    3. pipeline流水線式的靈活處理過程;
    4. channelHandler的靈活實現;
    5. 提供許多開箱即用的處理器和編解碼器;

  我們可以從這些點去深入理解其過人之處。

1. 一個NettyServer的demo

  要想深入理解某個框架,一般還是要以demo作為一個抓手點的。以下,我們可以看到一個簡單的nettyServer的建立過程,即netty的quick start樣例吧。

@Slf4j
public class NettyServerHelloApplication {

    /**
     * 一個server的樣例
     */
    public static void main(String[] args) throws Exception {
        // 1. 建立對應的EventLoop執行緒池備用, 分bossGroup和workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup 
= new NioEventLoopGroup(4); try { // 2. 建立netty對應的入口核心類 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); // 3. 設定server的各項引數,以及應用處理器 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,
100) // 設定tcp協議的請求等待佇列 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 3.2. 最重要的,將各channelHandler繫結到netty的上下文中(暫且這麼說吧) ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast("encoder", new MessageEncoder()); p.addLast("decoder", new MessageDecoder()); p.addLast(new EchoServerHandler()); } }); // 4. 繫結tcp埠開啟服務端監聽, sync() 保證執行完成所有任務 ChannelFuture f = b.bind(ServerConstant.PORT).sync(); // 5. 等待關閉訊號,讓業務執行緒去服務業務了 f.channel().closeFuture().sync(); } finally { // 6. 收到關閉訊號後,優雅關閉server的執行緒池,保護應用 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

  以上,就是一個簡版的nettyServer的整個框架了,這也基本上整個nettyServer的程式設計正規化了。主要即分為這麼幾步:

    1. 建立對應的EventLoop執行緒池備用, 分bossGroup和workerGroup;
    2. 建立netty對應的入口核心類 ServerBootstrap;
    3. 設定server的各項引數,以及應用處理器(必備的channelHandler業務接入過程);
    4. 繫結tcp埠開啟服務端監聽;
    5. 等待關閉訊號,讓業務執行緒去服務業務了;
    6. 收到關閉訊號後,優雅關閉server的執行緒池,保護應用;

  事實上,如果我們直接基於jdk提供的ServerSocketChannel是否也差不了多少呢?是的,至少表面看起來是的,但我們要處理許多的異常情況,且可能面對變化繁多的業務型別。又該如何呢?

  畢竟一個框架的成功,絕非偶然。下面我們就這幾個過程來看看netty都是如何處理的吧!

2. EventLoop 的建立

  EventLoop 直譯為事件迴圈,但在這裡我們也可以理解為一個執行緒池,因為所有的事件都是提交給其處理的。那麼,它倒底是個什麼樣的迴圈呢?

  首先來看下其類繼承情況:

  從類圖可以看出,EventLoop也是一個executor或者說執行緒池的實現,它們也許有相通之處。

    // 呼叫方式如下
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    // io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory)
    /**
     * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    // io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 預設執行緒是 cpu * 2
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 建立一個執行器,該執行器每提交一個任務,就建立一個執行緒來執行,即並沒有佇列的概念
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 使用一個數組來儲存整個可用的執行緒池
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 為每個child建立一個執行緒執行, 該方法由子類實現
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // 如果建立失敗,則把已經建立好的執行緒池關閉掉
                    // 不過值得注意的是,當某個執行緒池建立失敗後,並沒有立即停止後續建立工作,即無 return 操作,這是為啥?
                    // 實際上,發生異常時,Exeception 已經被丟擲,此處無需關注
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // 建立選擇器,猜測是做負載均衡時使用
        // 此處的chooser預設是 DefaultEventExecutorChooserFactory
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    // io.netty.channel.nio.NioEventLoopGroup#newChild
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // 注意此處的引數型別是由外部進行保證的,在此直接做強轉操作
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    // io.netty.channel.nio.NioEventLoop#NioEventLoop
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // 此構造器會做很多事,比如建立佇列,開啟nio selector...
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }


    // io.netty.util.concurrent.DefaultEventExecutorChooserFactory#newChooser
    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 如: 1,2,4,8... 都會建立 PowerOfTwoEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    // io.netty.util.concurrent.DefaultPromise#addListener
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

  以上,就是 NioEventLoopGroup 的建立過程了. 本質上其就是一個個的單獨的執行緒組成的陣列列表, 等待被呼叫.

3. ServerBootstrap 的建立

  ServerBootstrap是Netty的一個服務端核心入口類, 它可以很快速的建立一個穩定的netty服務.

  ServerBootstrap 的類圖如下:

  還是非常純粹的啊!其中有意思是的, ServerBootstrap繼承自 AbstractBootstrap, 而這個 AbstractBootstrap 是一個自依賴的抽象類: AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> , 這樣,即父類可以直接返回子類的資訊了。

  其預設構造方法為空,所以所以引數都使用預設值, 因為還有後續的引數設定過程,接下來,我們看看其一些關鍵引數的設定:

    // 1. channel的設定
    // io.netty.bootstrap.AbstractBootstrap#channel
    /**
     * The {@link Class} which is used to create {@link Channel} instances from.
     * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
     * {@link Channel} implementation has no no-args constructor.
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 預設使用構造器反射的方式建立 channel
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
    /**
     * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
     * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
     * is not working for you because of some more complex needs. If your {@link Channel} implementation
     * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
     * simplify your code.
     */
    @SuppressWarnings({ "unchecked", "deprecation" })
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }
    // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
    /**
     * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
     */
    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
    @SuppressWarnings("unchecked")
    private B self() {
        return (B) this;
    }

    // 2. option 引數選項設定, 它會承包各種特殊配置的設定, 是一個通用配置項設定的入口 
    /**
     * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
     * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
     */
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        // options 是一個 new LinkedHashMap<ChannelOption<?>, Object>(), 即非執行緒安全的容器, 所以設定值時要求使用 synchronized 保證執行緒安全
        // value 為null時代表要將該選項設定刪除, 如果key相同,後面的配置將會覆蓋前面的配置
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }
    
    // 3. childHandler 新增channelHandler, 這是一個最重要的一個方法, 它會影響到後面的業務處理統籌
    // 呼叫該方法僅將 channelHandler的上下文加入進來, 實際還未進行真正的新增操作 .childHandler(new ChannelInitializer<SocketChannel>() {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100) // 設定tcp協議的請求等待佇列
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LoggingHandler(LogLevel.INFO));
                    p.addLast("encoder", new MessageEncoder());
                    p.addLast("decoder", new MessageDecoder());
                    p.addLast(new EchoServerHandler());
                }
            });
    /**
     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
     */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        // 僅將 channelHandler 繫結到netty的上下文中
        this.childHandler = childHandler;
        return this;
    }
    
    // 4. bossGroup, workGroup 如何被分配 ?
    /**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // parentGroup 是給acceptor使用的, 主要用於對socket連線的接入,所以一般一個執行緒也夠了
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        // childGroup 主要用於接入後的socket的事件的處理,一般要求數量較多,視業務屬性決定
        this.childGroup = childGroup;
        return this;
    }

  bind 繫結tcp埠,這個是真正觸發server初始化的一步,工作量比較大,我們另開一段講解。

4. nettyServer 的初始化

  前面所有工作都是在準備, 都並未體現在外部, 而 bind 則會是開啟一個對外服務, 對外可見, 真正啟動server.

    // io.netty.bootstrap.AbstractBootstrap#bind(int)
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    // io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        // 先驗證各種引數是否設定完整, 如執行緒池是否設定, channelHandler 是否設定...
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        // 繫結tcp埠
        return doBind(localAddress);
    }
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 1. 建立一些channel使用, 與eventloop繫結, 統一管理嘛
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 2. 註冊成功之後, 開始實際的 bind() 操作, 實際就是呼叫 channel.bind()
            // doBind0() 是一個非同步的操作,所以使用的一個 promise 作為結果驅動
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

  所以,從整體來說,bind()過程分兩大步走:1. 初始化channel,與nio關聯; 2. 落實channel和本地埠的繫結工作; 我們來細看下:

4.1 初始化channel

  初始化channel, 並註冊到 selector上, 這個操作實際上非常重要。

    // 以下我們先看下執行框架
    // io.netty.bootstrap.AbstractBootstrap#initAndRegister
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 即根據前面設定的channel 使用反射建立一個例項出來
            // 即此處將會例項化出一個 ServerSocketChannel 出來
            // 所以如果你想用jdk的nio實現,則設定channel時使用 NioServerSocketChannel.class即可, 而你想使用其他更優化的實現時比如EpollServerSocketChannel時,改變一下即可
            // 而此處的 channelFactory 就是一個反射的實現 ReflectiveChannelFactory, 它會呼叫如上channel的無參構造方法例項化
            // 重點工作就需要在這個無參構造器中完成,我們接下來看看
            channel = channelFactory.newChannel();
            // 初始化channel的一些公共引數, 相當於做一些屬性的繼承, 因為後續它將不再依賴 ServerBootstrap, 它需要有獨立自主能力
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 註冊建立好的 channel 到eventLoop中
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
    
    // 1. 先看看 NioServerSocketChannel 的構造過程
    // io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        // newSocket 簡單說就是建立一個本地socket, api呼叫: SelectorProvider.provider().openServerSocketChannel()
        // 但此時本 socket 並未和任何埠繫結
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 註冊 OP_ACCEPT 事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 此處的 javaChannel() 實際就是 channel, 這樣呼叫只是為統一吧
        // 建立一個新的 socket 傳入 NioServerSocketChannelConfig 中
        // 主要用於一些 RecvByteBufAllocator 的設定,及channel的儲存
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    // io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
    /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // 先讓父類初始化必要的上下文
        super(parent);
        // 保留 channel 資訊,並設定非阻塞標識
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    // io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        // 初始化上下文
        this.parent = parent;
        // DefaultChannelId
        id = newId();
        // NioMessageUnsafe
        unsafe = newUnsafe();
        // new DefaultChannelPipeline(this); 
        // 比較重要,將會初始化 head, tail 節點
        pipeline = newChannelPipeline();
    }
    // io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        // 初始化 head, tail
        tail = new TailContext(this);
        head = new HeadContext(this);
        // 構成雙向連結串列
        head.next = tail;
        tail.prev = head;
    }



    // 2. 初始化channel, 有個最重要的動作是將 Acceptor 接入到 pipeline 中
    // io.netty.bootstrap.ServerBootstrap#init
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        // 根據前面的設定, 將各種屬性copy過來, 放到 config 欄位中
        // 同樣, 因為 options 和 attrs 都不是執行緒安全的, 所以都要上鎖操作
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        // 此處的pipeline, 就是在 NioServerSocketChannel 中初始化好head,tail的pipeline
        ChannelPipeline p = channel.pipeline();
        // childGroup 實際就是外部的 workGroup
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // 這個就比較重要了, 關聯 ServerBootstrapAcceptor
        // 主動新增一個 initializer, 它將作為第一個被呼叫的 channelInitializer 存在 
        // 而 channelInitializer 只會被呼叫一次
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 新增 Acceptor 到 pipeline 中, 形成一個 head -> ServerBootstrapAcceptor -> tail 的pipeline
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
        // 此操作過後,當前pipeline中,就只有此一handler
    }

  。。。

4.2 handler的新增過程

  addLast() 看起來只是一個新增元素的過程, 總體來說就是一個雙向連結串列的新增, 但也蠻有意思的, 有興趣可以戳開詳情看看.

    // io.netty.channel.ChannelHandler
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        // 支援同時新增多個 handler
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
    // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 重複性檢查 @Shareable 引數使用
            checkMultiplicity(handler);
            // 生成一個新的上下文, filterName()將會生成一個唯一的名稱, 如 ServerBootstrap$1#0
            newCtx = newContext(group, filterName(name, handler), handler);
            // 將當前ctx新增到連結串列中
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                // 未註冊情況下, 不會進行下一步了
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 而已註冊情況下, 則會使用 executor 提交callHandlerAdded0, 即呼叫 pipeline 的頭節點
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        // 一個雙向連結串列儲存上下文
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    // 新增ctx到佇列尾部
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
    // 對每一次新增 handler, 則都會產生一個事件, 通知現有的handler, handlerAdded()
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
            // any pipeline events ctx.handler() will miss them because the state will not allow it.
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
檢視 handler 的新增過程

4.3 註冊channel,繫結eventloop執行緒

  經過前面兩步, channel已經建立好和初始化好了, 但還沒有看到 eventLoop 的影子. 實際上eventloop和channel間就差一個註冊了.

  也就是前面看到的 ChannelFuture regFuture = config().group().register(channel); 此處的group 即是 bossGroup.

    // io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
    @Override
    public ChannelFuture register(Channel channel) {
        // next() 相當於是一個負載均衡器, 會選擇出一個合適的 eventloop 出來, 預設是round-robin
        return next().register(channel);
    }
    // io.netty.channel.MultithreadEventLoopGroup#next
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    // io.netty.util.concurrent.MultithreadEventExecutorGroup#next
    @Override
    public EventExecutor next() {
        // 使用前面建立的 PowerOfTwoEventExecutorChooser 進行呼叫 
        // 預設實現為輪詢
        return chooser.next();
    }
        // io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
        
    // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)    
    @Override
    public ChannelFuture register(Channel channel) {
        // 使用 DefaultChannelPromise 封裝channel, 再註冊到 eventloop 中
        return register(new DefaultChannelPromise(channel, this));
    }
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // NioMessageUnsafe
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

        // io.netty.channel.AbstractChannel.AbstractUnsafe#register
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;
            // inEventLoop() 判斷當前執行緒是否在 eventLoop 中
            // 判斷方式為直接比較 eventloop 執行緒也當前執行緒是否是同一個即可 Thread.currentThread() == this.thread;
            // 核心註冊方法 register0()
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                // 不在 eventLoop 中, 則非同步提交任務給 eventloop 處理
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        // register0() 做真正的註冊
        // io.netty.channel.AbstractChannel.AbstractUnsafe#register0
        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 具體的註冊邏輯由子類實現, NioServerSocketChannel
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // 幾個擴充套件點: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive()
                // part1: fireChannelAdded(), 它將會回撥上面的 ServerBootstrapAcceptor 的新增 channelInitializer
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                // part2: fireChannelRegistered()
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    // io.netty.channel.nio.AbstractNioChannel#doRegister
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        // 進行註冊即是 JDK 的 ServerSocketChannel.register() 過程
        // 即 netty 與 socket 建立了關係連線, ops=0, 代表監聽所有讀事件
        for (;;) {
            try {
                // 一直註冊直到成功
                // 此處 ops=0, 即不關注任何事件哦, 那麼前面的 OP_ACCEPT 和這裡又是什麼關係呢?
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

  。。。

4.4 ServerBootstrapAcceptor 速覽

  前面我們看到, 在做 register() 完了之後, netty 會觸發一個invokeHandlerAddedIfNeeded, 從而呼叫fireHandlerAdded. 此時將會觸發 handlerAdded() 從而首次呼叫 ChannelInitializer.initChannel(), 從而將 ServerBootstrapAcceptor 新增到pipeline進來. ServerBootstrapAcceptor 獨立做的事情不多,更多是交給父類處理。

        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            // Task which is scheduled to re-enable auto-read.
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
            // not be able to load the class because of the file limit it already reached.
            //
            // See https://github.com/netty/netty/issues/1328
            // 
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }
        
        // ServerBootstrapAcceptor 大部分情況下都是普通的 InboundHandler, 除了 channelRead() 時
        // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                // 它會向 childGroup 中提交channel過去, 從而使用 childGroup 產生作用
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

  。。。

4.5 埠的繫結 doBind0

  經過前面的channel的建立,初始化, Acceptor 的新增到handlerAdded(), 整個pipeline已經work起來了. 然後netty會回撥之前新增好的 listeners, 其中一個便是 doBind0();

    // 回顧下:
        ...
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
        ...
    // io.netty.bootstrap.AbstractBootstrap#doBind0
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // 這還是一個非同步過程
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // channel.bind(), channel 與 埠繫結
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    // io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // bind() 被當作一個普通的出站事件, 在pipeline中被傳遞
        return pipeline.bind(localAddress, promise);
    }
    
    // io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // 從tail開始傳遞
        return tail.bind(localAddress, promise);
    }
    // io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
        // 同樣是一個pipeline式呼叫, bind() 是一個出站事件, 所以查詢 outbound
        // 最終會調到 DefaultChannelPipeline 中
        // netty的pipeline機制就體現在這裡, 它會一直查詢可用的handler, 然後執行它, 直到結束
        final AbstractChannelHandlerContext next = findContextOutbound();
        // 獲取其繫結的 executor
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    // -------------------------------------------------------------------------
    // 出入站handler的查詢實現, 非常簡單, 卻很有效 (該方法在 AbstractChannelHandlerContext 中實現,被所有handler通用)
    // io.netty.channel.AbstractChannelHandlerContext#findContextInbound
    private AbstractChannelHandlerContext findContextInbound() {
        // 以當前節點作為起點開始查詢, 取第一個入站handler返回, 沒有則說明 pipeline 已結束 
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    // io.netty.channel.AbstractChannelHandlerContext#findContextOutbound
    private AbstractChannelHandlerContext findContextOutbound() {
        // 以當前節點作為起點開始查詢, 取第一個出站handler返回, 沒有則說明 pipeline 已結束 
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
    // -------------------------------------------------------------------------
    
    // io.netty.channel.AbstractChannelHandlerContext#invokeBind
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
        // 最終傳遞到 HeadContext 中進行處理
        // io.netty.channel.DefaultChannelPipeline.HeadContext#bind
        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            // unsafe 處理bind() 操作
            unsafe.bind(localAddress, promise);
        }
        // io.netty.channel.AbstractChannel.AbstractUnsafe#bind
        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 這裡會呼叫 jdk 的ServerSocketChannel介面, 實現真正的埠繫結
                // 至此, 服務對外可見
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
            // 判斷是否是首次建立 channel, 如果是, 則呼叫 fireChannelActive() 傳播channelActive事件
            if (!wasActive && isActive()) {
                // 這將會被稍後執行
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            // 觸發一些通知什麼的, 結束了
            safeSetSuccess(promise);
        }
    // 最終的bind(), 是通過 jdk 底層的 serverSocketChannel 開啟socket監聽
    // io.netty.channel.socket.nio.NioServerSocketChannel#doBind
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // 呼叫 serverSocketChannel bind() 方法,開啟socket監聽
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

  至此, bind 工作總算是完成了.我們來總結下它的主要工作:

    1. 初始化一個channel, 根據設定裡來, 我們使用 NioServerSocketChannel;
    2. 過繼現有的配置項給到channel;
    3. 將channel與eventloop繫結做註冊, 新增 ServerBootstrapAcceptor 到 pipeline 中;
    4. 繫結完成後, 通知現有的handler, 觸發系列事件: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive();
    5. 而bind()則作為一個出站事件, 被處理, 最終呼叫 jdk的ServerSocketChannel.register() 完成埠的開啟;

  不過有一點需要注意, 在這個過程中, 只有 bossGroup 起作用, 所有的 workGroup 都還在待命中. 我們目前看到的 pipeline 是這樣的: head -> Acceptor -> tail;

  講了這麼多, 有一種繞了一大圈的感覺有木有, 如果你自己直接使用nio寫, 估計10行程式碼都不要就搞定了. 尷尬!

5. netty eventloop 主迴圈

  evenloop是netty的重要概念, 但在前面我們並未細講這玩意如何起作用(僅看過其建立過程而已), 不過這並不意味著它還沒起作用, 而我們暫時忽略了它. 每次要執行任務時, 總是會呼叫 eventloop().execute(...), 實際上這就是 eventloop的入口:

    // io.netty.util.concurrent.SingleThreadEventExecutor#execute
    @Override
    public void execute(Runnable task) {
        // execute 線上程池中, 是一個非同步任務的提交方法, eventloop中同樣也一樣
        // 但是大部分情況下只是新增佇列, 因為 eventloop 是單執行緒的
        if (task == null) {
            throw new NullPointerException("task");
        }
        // 向eventLoop佇列中新增task                                                                          
        boolean inEventLoop = inEventLoop();
        addTask(task);
        // 如果自身就是執行在 eventloop 環境中, 新增完task後則不再做更多的事
        if (!inEventLoop) {
            // 如果不是在eventLoop執行緒中,則都會嘗試建立新執行緒執行, 但實際會重新檢測執行緒是否建立
            startThread();
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    // io.netty.util.concurrent.SingleThreadEventExecutor#addTask
    /**
     * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
     * before.
     */
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // taskQueue = MpscUnsafeUnboundedArrayQueue, 基於Unsafe 和 cas 實現的執行緒安全的佇列
        if (!offerTask(task)) {
            // 新增失敗,則走拒絕策略
            reject(task);
        }
    }
    // startThread, 看起來是開啟執行緒的意思, 卻又不太一樣
    private void startThread() {
        // 所以實際上只會建立一次執行緒
        if (state == ST_NOT_STARTED) {
            // 搶到鎖的執行緒才能呼叫start()方法
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
    // 開啟eventLoop的執行緒
    // io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
    private void doStartThread() {
        assert thread == null;
        // 它並不是簡單的thread.start()
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 核心方法,由 SingleThreadEventExecutor.run() 實現 
                    // 當然是由具體的executor具體實現了, 此文為 NioEventLoop.run()
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // 執行緒池關閉,優雅停機
                    ...
                }
            }
        });
    }

  核心: 事件迴圈主框架, 既然是事件迴圈,則其必然是不會退出的。

    // io.netty.channel.nio.NioEventLoop#run
    @Override
    protected void run() {
        // 一個死迴圈檢測任務, 這就 eventloop 的大殺器哦
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    // 有任務時執行任務, 否則阻塞等待網路事件, 或被喚醒
                    case SelectStrategy.SELECT:
                        // select.select(), 帶超時限制
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                // ioRatio 為io操作的佔比, 和執行任務相比, 預設為 50:50
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // step1. 執行io操作
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // step2. 執行task任務
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 執行任務的最長時間
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    // select, 事件迴圈的依據
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // 帶超時限制, 預設最大超時1s, 但當有延時任務處理時, 以它為標準
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    // 超時則立即返回
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

  反正整體就是這樣了, 迴圈檢測select, 執行io事件及execute task.

  有了這個 eventloop, 整體server就可以run起來了, 不管是有外部請求進來, 還是有內部任務提交, 都將被eventloop執行.

  不過還有一點未澄清的: 前面在做channel.register()時傳遞了一個 ops=0, 那它是如何監聽新連線事件的呢?

  實際上它是在註冊啟用完成之後, 再進行了一個read()的操作, 重新將 OP_ACCEPT 新增到 selectionKey 中了.(沒錯,底層永遠沒那麼多花招)

        // io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            // 會觸發 read() 流程, 修改 selectionKey 的 ops 標誌位
            readIfIsAutoRead();
        }
        ...
        // io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
        @Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
    // io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead
    @Override
    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }
        super.doBeginRead();
    }
    // io.netty.channel.nio.AbstractNioChannel#doBeginRead
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // readInterestOp, 即是前面設定的 OP_ACCEPT
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
View Code

  本文有點長了, 留點東西下篇繼續: io事件如何處理? 任務如何執行?