1. 程式人生 > 其它 >Netty學習之Reactor執行緒模型

Netty學習之Reactor執行緒模型

一、什麼是Reactor模型

  Reactor設計模式是event-driven architecture(事件驅動)的一種實現方式。Reactor會解耦併發請求的服務並分發給對應的事件處理器來處理。

  目前,許多流行的開源框架都用到了Reactor模型。如:netty、node.js等,包括java的nio。

二、基於IO事件驅動的分發處理模型

  1)分而治之

  一個連線裡完整的網路處理過程一般分為accept、read、decode、process、encode、send這幾步。

  Reactor模式將每個步驟對映為一個Task,服務端執行緒執行的最小邏輯單元不再是一次完整的網路請求,而是Task,且採用非阻塞方式執行。

  2)事件驅動

  每個Task對應特定網路事件。當Task準備就緒時,Reactor收到對應的網路事件通知,並將Task分發給綁定了對應網路事件的Handler執行。

  3)幾個角色

  reactor:負責繫結管理事件和處理介面;

  selector:負責監聽響應事件,將事件分發給綁定了該事件的Handler處理;

  Handler:事件處理器,綁定了某類事件,負責執行對應事件的Task對事件進行處理;

  Acceptor:Handler的一種,綁定了connect事件。當客戶端發起connect請求時,Reactor會將accept事件分發給Acceptor處理。

三、Reactor三種執行緒模型

  Netty是典型的Reactor模型結構,常見的Reactor執行緒模型有三種,分別是:Reactor單執行緒模型;Reactor多執行緒模型;主從Reactor多執行緒模型。

  1、單執行緒模型

  Reactor單執行緒模型,指的是所有的I/O操作都在同一個NIO執行緒上面完成,NIO執行緒的職責如下:

  • 作為NIO服務端,接收客戶端的TCP連線;
  • 作為NIO客戶端,向服務端發起TCP連線;
  • 讀取通訊對端的請求或者應答訊息;
  • 向通訊對端傳送訊息請求或者應答訊息;

  Reactor執行緒是個多面手,負責多路分離套接字,Accept新連線,並分派請求到處理器鏈中。該模型 適用於處理器鏈中業務處理元件能快速完成的場景。不過,這種單執行緒模型不能充分利用多核資源,所以實際使用的不多。如圖所示:所有的處理操作Reactor、Acceptor、Handler都是一個執行緒實現。

                

  服務端執行緒啟動程式碼如下:

public class ReactorServer {
    public static void main(String[] args) throws Exception{
        new Thread(new Reactor(8080),"reactor-001").start();
    }
}

  Reactor執行緒:

public class Reactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open(); //開啟一個Selector
        serverSocketChannel = ServerSocketChannel.open(); //建立一個Server端通道
        serverSocketChannel.socket().bind(new InetSocketAddress(port)); //繫結服務埠
        serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必須是非阻塞的
        //Reactor是入口,最初給一個channel註冊上去的事件都是accept
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor(serverSocketChannel, selector));//繫結接收事件處理器
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select(); //就緒事件到達之前,阻塞
                Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //這裡進行任務分發
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    void dispatch(SelectionKey k) {

        Runnable r = (Runnable) (k.attachment()); //這裡很關鍵,拿到每次selectKey裡面附帶的處理物件,然後呼叫其run,這個物件在具體的Handler裡會進行建立,初始化的附帶物件為Acceptor(看上面構造器)
        //呼叫之前註冊的callback物件
        if (r != null) {
            r.run();//只是拿到控制代碼執行run方法,並沒有新起執行緒
        }
    }
}

  服務端Acceptor連線建立:

public class Acceptor implements Runnable {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        SocketChannel socketChannel;
        try {
            socketChannel = serverSocketChannel.accept();   //三次握手
            if (socketChannel != null) {
                System.out.println(String.format("收到來自 %s 的連線",
                        socketChannel.getRemoteAddress()));

                new Handler(socketChannel, selector); //這裡把客戶端通道傳給Handler,
                // Handler負責接下來的事件處理(除了連線事件以外的事件均可)
          // new AsyncHandler(socketChannel, selector);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  服務端Handler程式碼:

public class Handler implements Runnable {

    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;

    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);

    private final static int READ = 0;
    private final static int SEND = 1;

    private int status = READ;

    public Handler(SocketChannel socketChannel, Selector selector) throws IOException {
        this.socketChannel = socketChannel; //接收客戶端連線
        this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式)

        selectionKey = socketChannel.register(selector, 0); //將該客戶端註冊到selector,得到一個SelectionKey,以後的select到的就緒動作全都是由該物件進行封裝
        selectionKey.attach(this); //附加處理物件,當前是Handler物件,run是物件處理業務的方法
        selectionKey.interestOps(SelectionKey.OP_READ); //走到這裡,說明之前Acceptor裡的建連已完成,那麼接下來就是讀取動作,因此這裡首先將讀事件標記為“感興趣”事件
        selector.wakeup(); //讓阻塞的selector立即返回  ----> selector.select()
    }

    @Override
    public void run() {
        try {
            switch (status) {
                case READ:
                    read();
                    break;
                case SEND:
                    send();
                    break;
                default:
            }
        } catch (IOException e) { //這裡的異常處理是做了彙總,常出的異常就是server端還有未讀/寫完的客戶端訊息,客戶端就主動斷開連線,這種情況下是不會觸發返回-1的,這樣下面read和write方法裡的cancel和close就都無法觸發,這樣會導致死迴圈異常(read/write處理失敗,事件又未被cancel,因此會不斷的被select到,不斷的報異常)
            System.err.println("read或send時發生異常!異常資訊:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e2) {
                System.err.println("關閉通道時發生異常!異常資訊:" + e2.getMessage());
                e2.printStackTrace();
            }
        }
    }

    private void read() throws IOException {
        if (selectionKey.isValid()) {
            readBuffer.clear();
            int count = socketChannel.read(readBuffer); //read方法結束,意味著本次"讀就緒"變為"讀完畢",標記著一次就緒事件的結束
            if (count > 0) {
                System.out.println(String.format("收到來自 %s 的訊息: %s",
                        socketChannel.getRemoteAddress(),new String(readBuffer.array())));
                status = SEND;
                selectionKey.interestOps(SelectionKey.OP_WRITE); //註冊寫方法
            } else {
                //讀模式下拿到的值是-1,說明客戶端已經斷開連線,那麼將對應的selectKey從selector裡清除,否則下次還會select到,因為斷開連線意味著讀就緒不會變成讀完畢,也不cancel,下次select會不停收到該事件
                //所以在這種場景下,(伺服器程式)你需要關閉socketChannel並且取消key,最好是退出當前函式。注意,這個時候服務端要是繼續使用該socketChannel進行讀操作的話,就會丟擲“遠端主機強迫關閉一個現有的連線”的IO異常。
                selectionKey.cancel();
                socketChannel.close();
                System.out.println("read時-------連線關閉");
            }
        }
    }

    void send() throws IOException {
        if (selectionKey.isValid()) {
            sendBuffer.clear();
            sendBuffer.put(String.format("我收到來自%s的資訊辣:%s,  200ok;",
                    socketChannel.getRemoteAddress(),
                    new String(readBuffer.array())).getBytes());
            sendBuffer.flip();
            int count = socketChannel.write(sendBuffer); //write方法結束,
            // 意味著本次寫就緒變為寫完畢,標記著一次事件的結束

            if (count < 0) {
                //同上,write場景下,取到-1,也意味著客戶端斷開連線
                selectionKey.cancel();
                socketChannel.close();
                System.out.println("send時-------連線關閉");
            }

            //沒斷開連線,則再次切換到讀
            status = READ;
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }
}

  對於一些小容量應用場景,可以使用單執行緒模型,但是對於高負載、大併發的應用卻不合適,主要原因如下:

  • 一個NIO執行緒同時處理成百上千的鏈路,效能上無法支撐。即便NIO執行緒的CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;
  • 當NIO執行緒負載過重之後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往進行重發,這更加重了NIO執行緒的負載,最終導致大量訊息積壓和處理超時,NIO執行緒會成為系統的效能瓶頸;
  • 可靠性問題。一旦NIO執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部資訊,造成節點故障。

  為了解決這些問題,演進出了Reactor多執行緒模型,下面我們一起學習下Reactor多執行緒模型。

  2、多執行緒模型

  Reactor多執行緒模型與單執行緒模型最大區別就是有一組NIO執行緒處理I/O操作,它的特點如下:

  • 有一個專門的NIO執行緒--acceptor新城用於監聽服務端,接收客戶端的TCP連線請求;
  • 網路I/O操作--讀、寫等由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,它包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送;
  • 1個NIO執行緒可以同時處理N條鏈路,但是1個鏈路只對應1個NIO執行緒,防止發生併發操作問題。

  如圖所示:Reactor、Acceptor的處理操作是一個執行緒實現。Handler是另一個執行緒實現。

                

  因此,如程式碼所示,其餘程式碼一致,重寫Handler為:

public class AsyncHandler implements Runnable {

    private final Selector selector;

    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;

    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);

    private final static int READ = 0; //讀取就緒
    private final static int SEND = 1; //響應就緒
    private final static int PROCESSING = 2; //處理中

    private int status = READ; //所有連線完成後都是從一個讀取動作開始的

    //開啟執行緒數為5的非同步處理執行緒池
    private static final ExecutorService workers = Executors.newFixedThreadPool(5);

    public AsyncHandler(SocketChannel socketChannel, Selector selector) throws IOException {
        this.socketChannel = socketChannel;
        this.socketChannel.configureBlocking(false);
        selectionKey = socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        this.selector = selector;
        this.selector.wakeup();
    }

    @Override
    public void run() { //如果一個任務正在非同步處理,那麼這個run是直接不觸發任何處理的,read和send只負責簡單的資料讀取和響應,業務處理完全不阻塞這裡的處理
        switch (status) {
            case READ:
                read();
                break;
            case SEND:
                send();
                break;
            default:
        }
    }

    private void read() {
        if (selectionKey.isValid()) {
            try {
                readBuffer.clear();
                int count = socketChannel.read(readBuffer);
                if (count > 0) {
                    status = PROCESSING; //置為處理中,處理完成後該狀態為響應,表示讀入處理完成,接下來可以響應客戶端了
                    workers.execute(this::readWorker); //非同步處理
                } else {
                    selectionKey.cancel();
                    socketChannel.close();
                    System.out.println("read時-------連線關閉");
                }
            } catch (IOException e) {
                System.err.println("處理read業務時發生異常!異常資訊:" + e.getMessage());
                selectionKey.cancel();
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    System.err.println("處理read業務關閉通道時發生異常!異常資訊:" + e.getMessage());
                }
            }
        }
    }

    void send() {
        if (selectionKey.isValid()) {
            status = PROCESSING; //置為執行中
            workers.execute(this::sendWorker); //非同步處理
            selectionKey.interestOps(SelectionKey.OP_READ); //重新設定為讀
        }
    }

    //讀入資訊後的業務處理
    private void readWorker() {
//        try {
//            Thread.sleep(5000L);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println(String.format("收到來自客戶端的訊息: %s",
                new String(readBuffer.array())));

        status = SEND;

        selectionKey.interestOps(SelectionKey.OP_WRITE); //把當前事件改為寫事件
        this.selector.wakeup(); //喚醒阻塞在select的執行緒,
        // 因為該interestOps寫事件是放到子執行緒的,
        // select在該channel還是對read事件感興趣時又被呼叫
        // ,因此如果不主動喚醒,
        // select可能並不會立刻select該讀就緒事件(在該例中,可能永遠不會被select到)
    }

    private void sendWorker() {
        try {
            sendBuffer.clear();
            sendBuffer.put(String.format("我收到來自%s的資訊辣:%s,  200ok;",
                    socketChannel.getRemoteAddress(),
                    new String(readBuffer.array())).getBytes());
            sendBuffer.flip();

            int count = socketChannel.write(sendBuffer);

            if (count < 0) {
                selectionKey.cancel();
                socketChannel.close();
                System.out.println("send時-------連線關閉");
            } else {
                //再次切換到讀
                status = READ;
            }
        } catch (IOException e) {
            System.err.println("非同步處理send業務時發生異常!異常資訊:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e1) {
                System.err.println("非同步處理send業務關閉通道時發生異常!異常資訊:" + e.getMessage());
            }
        }
    }
}

  在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手資訊進行安全認證,認證本身非常損耗效能。這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為了解決效能問題,產生了第三種Reactor執行緒模型--主從Reactor多執行緒模型。

  3、主從多執行緒模型

  特點是:服務端用於接收客戶端連線的不再是1個單獨的NIO執行緒,而是一個獨立的NIO執行緒池。Acceptor接收到客戶端TCP連線請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到I/O執行緒池(sub reactor執行緒池)的某個I/O執行緒上,由它負責SocketChannel的讀寫和編解碼工作。

  Acceptor執行緒池只用於客戶端的登入、握手和安全認證,一旦鏈路建立成功,就將鏈路註冊到後端subReactor執行緒池的I/O執行緒上,有I/O執行緒負責後續的I/O操作。

  第三種模型比起第二種模型,是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連線,並將建立的socket分派給subReactor。subReactor負責多路分離已連線的socket,讀寫網路資料,對業務處理功能,其扔給worker執行緒池完成。通常,subReactor個數上可與CPU個數等同。

  如下圖所示,

            

  新增SubReactor部分:

public class SubReactor implements Runnable {
    private final Selector selector;
    private boolean register = false; //註冊開關表示,為什麼要加這麼個東西,可以參考Acceptor設定這個值那裡的描述
    private int num; //序號,也就是Acceptor初始化SubReactor時的下標

    SubReactor(Selector selector, int num) {
        this.selector = selector;
        this.num = num;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            System.out.println(String.format("%d號SubReactor等待註冊中...", num));
            while (!Thread.interrupted() && !register) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                while (it.hasNext()) {
                    dispatch((SelectionKey)it.next());
                    it.remove();
                }
            }
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());
        if (r != null) {
            r.run();
        }
    }

    void registering(boolean register) {
        this.register = register;
    }

}

  重寫Acceptor部分:

public class Acceptor implements Runnable {

    private final ServerSocketChannel serverSocketChannel;

    private final int coreNum = Runtime.getRuntime().availableProcessors(); // 獲取CPU核心數

    private final Selector[] selectors = new Selector[coreNum]; // 建立selector給SubReactor使用,個數為CPU核心數(如果不需要那麼多可以自定義,畢竟這裡會吞掉一個執行緒)

    private int next = 0; // 輪詢使用subReactor的下標索引

    private SubReactor[] subReactors = new SubReactor[coreNum]; // subReactor

    private Thread[] threads = new Thread[coreNum]; // subReactor的處理執行緒

    Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
        this.serverSocketChannel = serverSocketChannel;
        // 初始化
        for (int i = 0; i < coreNum; i++) {
            selectors[i] = Selector.open();
            subReactors[i] = new SubReactor(selectors[i], i); //初始化sub reactor
            threads[i] = new Thread(subReactors[i]); //初始化執行sub reactor的執行緒
            threads[i].start(); //啟動(啟動後的執行參考SubReactor裡的run方法)
        }
    }

    @Override
    public void run() {
        SocketChannel socketChannel;
        try {
            socketChannel = serverSocketChannel.accept(); // 阻塞獲取連線
            if (socketChannel != null) {
                //輪詢reactors[] 處理接收到的請求
                System.out.println(String.format("收到來自 %s 的連線",socketChannel.getRemoteAddress()));
                socketChannel.configureBlocking(false); //

                subReactors[next].registering(true);
                /*讓線下一次subReactors的while迴圈不去執行
                 selector.select,但是select我們是使用的不超時阻塞的方式,
                 所以下一步需要執行wakeup()
                 * */

                selectors[next].wakeup(); //使一個阻塞住的selector操作立即返回

                SelectionKey selectionKey = socketChannel.register(selectors[next],
                        SelectionKey.OP_READ); // 當前客戶端通道SocketChannel
                // 向selector[next]註冊一個讀事件,返回key

                selectors[next].wakeup();
                /*使一個阻塞住的selector操作立即返回,這樣才能對剛剛註冊的SelectionKey感興趣
                */

                subReactors[next].registering(false); // 本次事件註冊完成後,需要再次觸發select的執行
                // ,因此這裡Restart要在設定回false(具體參考SubReactor裡的run方法)
                selectionKey.attach(new AsyncHandler(socketChannel, selectors[next]));
                // 繫結Handler

                //輪詢負載
                if (++next == selectors.length) {
                    next = 0; //越界後重新分配
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}   

  四、NioEventLoopGroup 與 Reactor 執行緒模型的對應

  Netty的執行緒模型併發固定不變,通過在啟動輔助類中建立不同的EventLoopGroup例項並進行適當的引數配置,就可以支援上述三種Reactor執行緒模型。

/**
     * Netty單執行緒模型服務端程式碼示例
     * @param port
     */
    public void bind(int port) {
        EventLoopGroup reactorGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(reactorGroup, reactorGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面程式碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reactorGroup.shutdownGracefully();
        }
    }
/**
     * Netty多執行緒模型程式碼
     * @param port
     */
    public void bind2(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面程式碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
/**
     * Netty主從執行緒模型程式碼
     * @param port
     */
    public void bind3(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面程式碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

  說完Reacotr模型的三種形式,那麼Netty是哪種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉執行緒池的第三種形式的變種,這也 是Netty NIO的預設模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(預設 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的資料到ChannelBuffer中,之後觸發ChannelPipeline中的ChannelHandler流。

  Netty是事件驅動的,可以通過ChannelHandler鏈來控制執行流向。因為ChannelHandler鏈的執行過程是在 subReactor中同步的,所以如果業務處理handler耗時長,將嚴重影響可支援的併發數。這種模型適合於像Memcache這樣的應用場景,但 對需要操作資料庫或者和其他模組阻塞互動的系統就不是很合適。Netty的可擴充套件性非常好,而像ChannelHandler執行緒池化的需要,可以通過在 ChannelPipeline中新增Netty內建的ChannelHandler實現類–ExecutionHandler實現,對使用者來說只是 新增一行程式碼而已。對於ExecutionHandler需要的執行緒池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,後續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類,它還可以保證同一Channel中處理的事件流的順序性,這主要是控制事件在非同步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個執行緒中執行(通常也沒必要)。一般來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇。