1. 程式人生 > 實用技巧 >netty之EventLoop原始碼分析

netty之EventLoop原始碼分析

我們在講解服務端和客戶端時經常會看到提交一個任務到channel對應的EventLoop上,後續的io事件監聽和任務執行都在EventLoop完成,可以說EventLoop是netty最核心的元件,我們接下來一一分析 剝開這層神祕的面紗

提交一個連線任務非同步執行

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        if (localAddress == null) {
            channel.connect(remoteAddress, connectPromise);
        } 
else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } });
/***************************SingleThreadEventExecutor************************/
public void execute(Runnable task) {
    if (task == null
) { throw new NullPointerException("task"); } // 判斷提交任務的執行緒是否是EventLoop執行緒 boolean inEventLoop = inEventLoop(); // 把任務提交到佇列中 addTask(task); if (!inEventLoop) { // 如果不是 嘗試開啟EventLoop執行緒 startThread(); if (isShutdown() && removeTask(task)) { reject(); } }
if (!addTaskWakesUp && wakesUpForTask(task)) { // 喚醒正在等待task阻塞的執行緒,實現在NioEventLoop中,阻塞物件為selector wakeup(inEventLoop); } } private void startThread() { // 如果執行緒還未開啟,隨即開啟執行緒 if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } } private void doStartThread() { assert thread == null; // 提交一個任務用來開啟"EventLoop執行緒",這裡終於用到了executor執行緒池 executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } try { // !!!終於要幹正事了 SingleThreadEventExecutor.this.run(); success = true; } } }); } /***************************NioEventLoop************************/ protected void run() { // 無限迴圈 for (;;) { try { // selectNoSupplier執行的是selector.selectNow(), hasTasks() 判斷任務佇列是否有任務 // 如果不為空 那就執行selector.selectNow()返回結果,如果為空返回SelectStrategy.SELECT // 目的根據是否有任務來決定阻塞 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // 佇列中沒有任務進行等待io阻塞 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { // 喚醒來自其它執行緒執行selector.select()的阻塞 因為當前已經獲取到最新的io事件了 selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 執行io事件 processSelectedKeys(); } finally { // 執行佇列任務 runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; // 通過ioRatio和執行io耗費的時長來算出能執行佇列任務的時長 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // 如果有定時任務在定時任務剩餘的時間上加0.5ms進行阻塞 預設阻塞1s long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // 如果恰好在新增task時,wakenUp被設定了true 也就是需要進行喚醒 那麼這個任務是不需要現在執行的,如果不這樣做,任務可能會被掛起,直到select操作超時。 if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 不需要喚醒 再執行一次非阻塞的selectNow()隨即返回 selector.selectNow(); selectCnt = 1; break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // 如果1s後有返回||select()被喚醒||佇列中有任務||有定時任務 則跳出迴圈 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 當前迴圈時間小於1s且迴圈次數超過selector重新生成的界限 將當前selector重新生成一個,將舊selector對應的channel也進行重新註冊 rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } } }

首先根據佇列中是有任務來決定執行帶阻塞的selector.select()還是不帶阻塞的select.selectNow(),直到

然後執行io事件和佇列任務,netty在這裡做了一個優化策略,通過ioRatio來控制 執行任務佇列的時長,為了防止任務佇列一直執行阻塞著影響後續的io事件處理,ioRatio預設為50也就是執行佇列任務的時長等於io事件執行的時間

processSelectedKeys()和runAllTasks()方法比較簡單,直接追進去一看就明白了了,這裡我就不貼了

經過本節分析後,netty整個服務流程就分析的差不多了,涉及到的原始碼非常多,建議讀者多讀幾遍原始碼