1. 程式人生 > 實用技巧 >netty核心元件之EventLoopGroup和EventLoop

netty核心元件之EventLoopGroup和EventLoop

這節我們著重介紹netty最為核心的元件EventLoopGroup和EventLoop

EventLoopGroup:顧名思義就是EventLoop的組,下面來看它們的繼承結構

在netty中我們可以把EventLoop看做一個執行緒,當然執行緒不單是jdk中的的執行緒,它們都從Executor一路繼承過來,NioEventLoop繼承SinfleThreadEventLoop,從名字可以看出它是一個單執行緒的EventLoop,

我們先來看NioEventLoop是如何構造的

public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int
nThreads) { this(nThreads, (Executor) null); } ~~~~~~~~~~~~~ public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }

NioEventLoop建構函式非常多,每個引數都可以定製,我就不全貼出來了,最後回到這個引數最全的建構函式,下面我們挨個解釋每個引數的作用

  • nThreads: 執行緒數,對應EventLoop的數量,為0時 預設數量為CPU核心數*2
  • executor: 這個我們再熟悉不過,最終用來執行EventLoop的執行緒
  • chooserFactor: 當我們提交一個任務到執行緒池,chooserFactor會根據策略選擇一個執行緒來執行
  • selectorProvider:用來例項化jdk中的selector,沒一個EventLoop都有一個selector
  • selectStrategyFactory:用來生成後續執行緒執行時對應的選擇策略工廠
  • rejectedExecutionHandler:跟jdk中執行緒池中的作用一樣,用於處理執行緒池沒有多餘執行緒的情況,預設直接丟擲異常

接著我們進入父類MultithreadEventLoopGroup的建構函式

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 建立nThreads大小的EventLoop陣列
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 建立具體的EventLoop,會呼叫子類NioEventLoopGruop中的方法
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果其中有一個建立失敗,把之前建立好的都關閉掉
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 把剛才建立好的EventLoop提供給EventExecutorChooser,用於後續選擇
    chooser = chooserFactory.newChooser(children);

    // 新增一個EventLoop監聽器,用來監聽EventLoop終止狀態
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // 迴圈加入
        e.terminationFuture().addListener(terminationListener);
    }
    // 將EventLoop陣列轉成一個只讀的set
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

我們繼續跟到父類NioEventLoopGroup中的newChild

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可以看出先用來建立EventLoopGroup的引數其實都是用來建立EventLoop的,我們繼續跟NioEventLoop的構造

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 建立selector,由此可見每一個EventLoop都會有一個selector
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // 用來新增task的阻塞佇列 連結串列結構
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

這裡我們只是建立EventLoop,同時設定了執行的執行緒池、selector、taskQueue,並新增一個監聽器用來監聽它們的關閉狀態,當所有執行緒都全部處於關閉狀態terminationFuture會被置為true,到此還是沒有實際建立可執行的thread。

後續我們在介紹channel的時候就知道EventLoop和channel如何建立關係,什麼時候執行執行緒,我們姑且把EventLoop當做一個可執行runnable的netty執行緒