netty核心元件之EventLoopGroup和EventLoop
阿新 • • 發佈:2020-12-23
這節我們著重介紹netty最為核心的元件EventLoopGroup和EventLoop
EventLoopGroup:顧名思義就是EventLoop的組,下面來看它們的繼承結構
在netty中我們可以把EventLoop看做一個執行緒,當然執行緒不單是jdk中的的執行緒,它們都從Executor一路繼承過來,NioEventLoop繼承SinfleThreadEventLoop,從名字可以看出它是一個單執行緒的EventLoop,
我們先來看NioEventLoop是如何構造的
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(intnThreads) { this(nThreads, (Executor) null); } ~~~~~~~~~~~~~ public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory,final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }
NioEventLoop建構函式非常多,每個引數都可以定製,我就不全貼出來了,最後回到這個引數最全的建構函式,下面我們挨個解釋每個引數的作用
- nThreads: 執行緒數,對應EventLoop的數量,為0時 預設數量為CPU核心數*2
- executor: 這個我們再熟悉不過,最終用來執行EventLoop的執行緒
- chooserFactor: 當我們提交一個任務到執行緒池,chooserFactor會根據策略選擇一個執行緒來執行
- selectorProvider:用來例項化jdk中的selector,沒一個EventLoop都有一個selector
- selectStrategyFactory:用來生成後續執行緒執行時對應的選擇策略工廠
- rejectedExecutionHandler:跟jdk中執行緒池中的作用一樣,用於處理執行緒池沒有多餘執行緒的情況,預設直接丟擲異常
接著我們進入父類MultithreadEventLoopGroup的建構函式
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 建立nThreads大小的EventLoop陣列 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 建立具體的EventLoop,會呼叫子類NioEventLoopGruop中的方法 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 如果其中有一個建立失敗,把之前建立好的都關閉掉 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 把剛才建立好的EventLoop提供給EventExecutorChooser,用於後續選擇 chooser = chooserFactory.newChooser(children); // 新增一個EventLoop監聽器,用來監聽EventLoop終止狀態 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { // 迴圈加入 e.terminationFuture().addListener(terminationListener); } // 將EventLoop陣列轉成一個只讀的set Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
我們繼續跟到父類NioEventLoopGroup中的newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
可以看出先用來建立EventLoopGroup的引數其實都是用來建立EventLoop的,我們繼續跟NioEventLoop的構造
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; // 建立selector,由此可見每一個EventLoop都會有一個selector final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); // 用來新增task的阻塞佇列 連結串列結構 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
這裡我們只是建立EventLoop,同時設定了執行的執行緒池、selector、taskQueue,並新增一個監聽器用來監聽它們的關閉狀態,當所有執行緒都全部處於關閉狀態terminationFuture會被置為true,到此還是沒有實際建立可執行的thread。
後續我們在介紹channel的時候就知道EventLoop和channel如何建立關係,什麼時候執行執行緒,我們姑且把EventLoop當做一個可執行runnable的netty執行緒