1. 程式人生 > 程式設計 >Netty Reactor執行緒模型詳解

Netty Reactor執行緒模型詳解

一、簡介

1. Reactor是什麼?

     Reactor pattern(反應器模式)是用於處理通過一個或多個輸入同時傳遞給伺服器的服務請求的事件處理模式。服務處理程式複用傳入的請求,並將它們同步分派給關聯的handler。 關鍵幾點:

(1) 由事件驅動

(2) 處理多個輸入

(3) 採用多路複用將事件分發給相應的Handler處理

2. Reactor主要元件

(1) Reactor

     負責響應事件,將事件分發綁定了該事件的Handler處理。對應netty 的NioEventLoop.run(),processSelectedKeys()。

(2) Handler

     事件處理器,綁定了某類事件,負責執行對應事件的任務對事件進行處理。對應netty的IdleStateHandler等。

(3) Acceptor

     Acceptor屬於handler中的一種,因為更加特殊,獨立出來講,是reactor的事件接收類,負責初始化selector和接收緩衝佇列。對應netty的ServerBootstrapAcceptor。

二、流程

     Reactor執行緒池中的每一Reactor執行緒都會有自己的Selector、執行緒和分發的事件迴圈邏輯。 mainReactor可以只有一個,但subReactor一般會有多個。mainReacto執行緒主要負責接收客戶端的連線請求,然後將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通訊。 原始碼解析

1. 建立mainReactor執行緒池和subReactor執行緒池

	bossGroup = new NioEventLoopGroup();
	workGroup = new NioEventLoopGroup(4);
複製程式碼
protected MultithreadEventExecutorGroup(int nThreads,ThreadFactory threadFactory,Object... args) {
     children = new SingleThreadEventExecutor[nThreads];
     ...
     for (int i = 0; i < nThreads; i ++) {
            ...
            children[i] = newChild(threadFactory,args);
            ...
     }
}
複製程式碼
@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory,Object... args) throws Exception {
    return new NioEventLoop(this,threadFactory,(SelectorProvider) args[0]);
}
複製程式碼

     在這裡建立了mainReactor和subReactor執行緒池,並且建立了eventLoop執行緒

NioEventLoop(NioEventLoopGroup parent,SelectorProvider selectorProvider) {
    super(parent,false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
複製程式碼

     每個eventLoop執行緒都會有自身的selector,這裡的eventLoop執行緒還未啟動,後面啟動之後,會執行run()裡面的selector.select。

2. mainReactor繫結OP_ACCEPT事件的selector,並開啟執行緒迴圈執行selector.select();

ChannelFuture regFuture = group().register(channel); 這裡的group()是bossGroup

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
複製程式碼

     next()執行

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
複製程式碼
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[childIndex.getAndIncrement() & children.length - 1];
    }
}
複製程式碼

     從執行緒池中取出第一個eventLoop

@Override
public ChannelFuture register(final Channel channel,final ChannelPromise promise) {
     ...
    channel.unsafe().register(this,promise);
    return promise;
}
複製程式碼
@Override
public final void register(EventLoop eventLoop,final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
    try {
        eventLoop.execute(new OneTimeTask() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
}
複製程式碼

     這裡將mainReactor的eventLoop和伺服器的NioServerSocketChannel繫結。 因為剛開始啟動是main執行緒,執行eventLoop.execute,在這裡mainReactor只啟動了一個執行緒。

@Override
public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
        ...
}
複製程式碼

     在execute中執行startThread(),正式啟動mainReactor執行緒迴圈,並且將register0(promise)這個Task加入taskQueue中,讓mainReactor迴圈執行。

private void register0(ChannelPromise promise) {
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}
複製程式碼
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        ...
        selectionKey = javaChannel().register(eventLoop().selector,this);
	...	
    }
}
複製程式碼

     這裡將mainReactor 中eventLoop的selector註冊一個為0的操作監聽位,並將伺服器的NioServerSocketChannel繫結到mainSubReactor執行緒上 在doBind()-->doBind0()-->channel.bind()-->…-->next.invokeBind()-->HeadContext. Bind()-->unsafe.bind()-->pipeline.fireChannelActive()-->channel.read()-->…-->doBeginRead()中修改為OP_ACCEPT(16)操作監聽位。

@Override
protected void doBeginRead() throws Exception {
…
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製程式碼
  自此,mainReactor的eventLoop從run開始迴圈執行selector.select。
  注:readInterestOp的值來自於建立NioServerSocketChannel的建構函式
複製程式碼
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null,channel,SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this,javaChannel().socket());
}
複製程式碼

3. subReactor 註冊OP_READ事件

     在收到客戶端連線後,將會在ServerBootstrapAcceptor中把客戶端的Channel註冊在subReactor執行緒上,並將這個channel繫結到subReactor執行緒的selector上,監聽客戶端channel的OP_READ事件

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
複製程式碼

     當監聽到客戶端連線,執行伺服器AbstractNioUnsafe的read();

@Override
public void read() {
    ...
    int localRead = doReadMessages(readBuf);
    ...
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    ...
    pipeline.fireChannelReadComplete();
    ...
}
複製程式碼

(1) doReadMessage

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ...
    buf.add(new NioSocketChannel(this,ch));
    ...
}
複製程式碼
public NioSocketChannel(Channel parent,SocketChannel socket) {
    super(parent,socket);
    config = new NioSocketChannelConfig(this,socket.socket());
}
複製程式碼
protected AbstractNioByteChannel(Channel parent,SelectableChannel ch) {
    super(parent,ch,SelectionKey.OP_READ);
}
複製程式碼

     設定客戶端channel監聽位的值為OP_READ(1)

(2) pipeline.fireChannelRead()

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx,Object msg) {
   final Channel child = (Channel) msg;
   child.pipeline().addLast(childHandler);

   for (Entry<ChannelOption<?>,Object> e: childOptions) {
       try {
           if (!child.config().setOption((ChannelOption<Object>) e.getKey(),e.getValue())) {
               logger.warn("Unknown channel option: " + e);
           }
       } catch (Throwable t) {
           logger.warn("Failed to set a channel option: " + child,t);
       }
   }

   for (Entry<AttributeKey<?>,Object> e: childAttrs) {
       child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
   }

   try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child,future.cause());
            }
         }
       });
     } catch (Throwable t) {
         forceClose(child,t);
     }
}
}
複製程式碼

     ServerBootstrapAcceptor不僅將subReactor繫結客戶端channel,還為客戶端channel進行一些引數的初始化

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
複製程式碼

     和上面的register一樣,只是將mainReactor執行緒池改為了subReactor執行緒池。 在這裡將從subReactor執行緒池中取一個執行緒的selector與客戶端channel繫結,並監聽該客戶端0事件。

(3) pipeline.fireChannelReadComplete()

@Override
public ChannelPipeline fireChannelReadComplete() {
    head.fireChannelReadComplete();
    if (channel.config().isAutoRead()) {
        read();
    }
    return this;
}
複製程式碼

     read()-->tail.read()-->next.invokeRead()-->HeadContext. read()-->…--> doBeginRead()

@Override
protected void doBeginRead() throws Exception {
    ...
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製程式碼

     在這裡將監聽位改為OP_READ(1)

4. subReactor處理read事件

	if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            ...
        }
複製程式碼

     進到NioByteUnsafe的read()方法

@Override
public final void read() {
    ...
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    ...
    byteBuf = allocHandle.allocate(allocator);
    ...
    pipeline.fireChannelRead(byteBuf);
    ...
}
複製程式碼
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
複製程式碼
private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this,msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
複製程式碼
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        System.out.println("InBoundHandlerB: " + msg);
        super.channelRead(ctx,msg);
    }
}
複製程式碼

     在這裡將接收到客戶端訊息處理

總結

     Reactor執行緒池有多少個,就會建立多少個selector,mainReactor的eventLoop會與伺服器的channel繫結,並只關注伺服器channel的ACCEPT事件,subReactor的eventLoop會與客戶端的channel繫結,並只關注客戶端channel的READ事件。

     mainReactor和subReactor迴圈各自的selector,mainReactor會迴圈ACCEPT事件的selector,subReactor會迴圈READ事件的selector,mainReactor接受到客戶端連線後,會執行ServerBootstrapAcceptor的channelRead方法,將客戶端連線與subReactor繫結。