1. 程式人生 > >Netty 心跳相關(1)

Netty 心跳相關(1)

無論是B/S還是C/S架構,如果你用的是長連線,那麼心跳是必不可少的。Netty提供了對心跳機制的天然支援,今天結合例子特地學習了一下。

首先,我們來設想一下何時需要傳送心跳。假設你做的是一款棋牌類小遊戲,那麼當玩家登陸游戲後肯定是先進入大廳,再選擇一張合適的桌子正式開始遊戲。此時玩家的客戶端與伺服器建立的這一次session(會話)應該是長久保持著,如果伺服器端儲存著大量的session,那麼整個伺服器就會越來越卡,最終整個服務都會掛掉。

為了預防這種情況,我們需要清理掉一些已經不用的或者理論上不會再用的session,比如:在手機上,如果我們在遊戲中,突然接到一個電話或者退回桌面,這個時候我們的遊戲客戶端理論上就不會再主動向我們傳送任何訊息。這時候,心跳就派上用場了。

**心跳,是為了證明自己還活著。**因此,這裡的心跳,說白了就是客戶端向伺服器端傳送一次請求,伺服器端相應,這樣客戶端就知道了伺服器端是alive(活著的);伺服器端向客戶端傳送一次心跳,客戶端相應,這樣伺服器端就知道了客戶端是alive。

知道了心跳的大致概念,那現在我們就需要知道Netty中是如何實現心跳,這就引出了兩個類:IdleStateHandlerChannelInboundHandlerAdapter

IdleStateHandler

大致作用

當連線的空閒時間(無論是讀或者是寫)太長時,都會觸發IdleStateEvent事件。你可以寫一個類繼承ChannelInboundHandlerAdapter,重寫userEventTriggered方法,來處理這類空閒事件。

知道了其大致作用,那麼接下來就看看我們到底該如何使用了。

IdleStateHandler有3個構造方法,主要針對這4個屬性,分別是:

private final boolean observeOutput;// 是否考慮出站時較慢的情況。預設值是false(一般不考慮)。
private final long readerIdleTimeNanos; // 讀事件空閒時間,0 代表禁用事件
private final long writerIdleTimeNanos;// 寫事件空閒時間,0 代表禁用事件
private final long allIdleTimeNanos; //讀或寫空閒時間,0 代表禁用事件

上面的三個時間,預設是秒,你也可以在構造的時候指定。

當你在pipeline中加入了該handler之後:

pipeline.addLast(new IdleStateHandler(30, 90, 0)); // 這個代表只考慮讀空閒30秒或寫空閒90秒的情況

則會先呼叫handlerAdded方法:

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActive() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet.  this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

如果channel正常,則呼叫initialize方法:

    private byte state; // 0 - none, 1 - initialized, 2 - destroyed
    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1: // 避免重複新增
        case 2: // 如果處於destoryed狀態,則不需要新增
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();  // 當前時間
        // 新增相應的定時排程任務
        if (readerIdleTimeNanos > 0) {
            // readerIdleTimeNanos時間後,執行ReaderIdleTimeoutTask裡面的方法
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask均繼承自類AbstractIdleTask

    private abstract static class AbstractIdleTask implements Runnable {

        private final ChannelHandlerContext ctx;

        AbstractIdleTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }

            run(ctx);
        }
        // 子類需要實現的方法
        protected abstract void run(ChannelHandlerContext ctx);
    }

ReaderIdleTimeoutTask為例:

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) { // 如果不在讀(channelRead時會被置為true,cahnnelReadComplete時會被置為false)
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) { // 說明讀空閒時間達到或超過預設時間
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                // firstReaderIdleEvent,是否是第一次讀空閒事件(該標誌位會在下一次channelRead觸發時改成true,所以應該理解在一次讀取完成後,這個讀空閒事件是不是第一次)
                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    // 生成一個IdleStateEvent物件
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    // 找到下一個ChannelInboundHandler類(或其子類)的handler,觸發其userEventTrigger(可以參考AbstractChannelHandlerContext的fireUserEventTriggered方法)
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else { // 要麼正在讀,要麼讀空閒時間小於預設時間
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

schedule方法可以理解為將定時排程事件放進一個隊列當中(我是在AbstractScheduledEventExecutor裡找到的scheduledTaskQueue().add(task);,但這裡面的程式碼我還沒看明白,有興趣的你可以自己研究,研究完後如果有空可在下方評論)。
channelIdle(ctx, event)方法時找到下一個ChannelInboundHandler類(或其子類)的handler,因此你寫的繼承自ChannelInboundHandler的handler,一定要新增在IdleStateHandler的後面,比如:

    pipeline.addLast(new IdleStateHandler(30, 90, 0));
    pipeline.addLast(heartbeatHandler);

ChannelInboundHandler

它就很簡單了,因為上面說了,channelIdle會呼叫ChannelInboundHandler的userEventTrigger,所以你只要自己寫一個類繼承ChannelInboundHandler,並重寫它的userEventTrigger方法。比如:

// 用Sharable是因為我的每一個pipeline中用的都是同樣的handler
@Sharable
public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {

  private final IHeartbeatFactory factory;

  public NettyHeartbeatHandler(IHeartbeatFactory factory) {
    Preconditions.checkNotNull(factory);
    this.factory = factory;
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
      super.userEventTriggered(ctx, evt);
      return;
    }

    IdleStateEvent event = (IdleStateEvent) evt;

    if (event.state() == IdleState.READER_IDLE) { // 如果是讀空閒,則關閉當前會話
      ctx.close(); // 此時會觸發下一個ChannelOutboundHandler的close方法,你可以在自己寫的handler中進行斷線操作
    } else if (event.state() == IdleState.WRITER_IDLE) { 
      // 如果是寫空閒,則向客戶端傳送心跳請求包,如果客戶端不返回心跳相應包,則說明客戶端斷線,下一次就將觸發讀空閒事件。這也是為了向客戶端證明伺服器端alive
      ctx.writeAndFlush(
          new BinaryWebSocketFrame(
              Unpooled.copiedBuffer(factory.getHeartbeatRequest().toByteArray())
          )
      );
    }
  }
}

因此,以上就是關於用Netty實現心跳的簡單介紹。其中帶大家重點看了伺服器端應該在什麼情況下發起一次心跳請求,應該是長久沒有收到訊息時(可能是有業務含義的訊息或者是一個心跳包)。如果大家有什麼想法可以在下方評論。

如果有興趣的話,可以看看我的私人部落格,說不定會有意外的感覺。