1. 程式人生 > 其它 >【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat

【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat

前景回顧

【mq】從零開始實現 mq-01-生產者、消費者啟動

【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?

【mq】從零開始實現 mq-03-引入 broker 中間人

【mq】從零開始實現 mq-04-啟動檢測與實現優化

【mq】從零開始實現 mq-05-實現優雅停機

【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat

為什麼需要心跳?

心跳(heartbeat ),顧名思義就是心臟的跳動。

醫學上一般通過心跳是否跳動,來判斷一個人是否活著。

那麼,分散式服務中如何判斷一個服務是否還活著呢?

實現思路

比如 mq 中,broker 需要把訊息實時推送給線上的消費者。

那麼如何判斷一個消費者是否活著呢?

我們可以讓消費者定時,比如每 5 秒鐘給 broker 傳送一個心跳包,考慮到網路延遲等,如果連續 1min 都沒有收到心跳,我們則移除這個消費者,認為服務已經掛了。

消費者實現

上程式碼!

心跳實現

心跳可以是一個很簡單的訊息體。

@Override
public void heartbeat() {
    final MqHeartBeatReq req = new MqHeartBeatReq();
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_HEARTBEAT);
    req.setAddress(NetUtil.getLocalHost());
    req.setPort(0);
    req.setTime(System.currentTimeMillis());

    log.debug("[HEARTBEAT] 往服務端傳送心跳包 {}", JSON.toJSON(req));

    // 通知全部
    for(RpcChannelFuture channelFuture : channelFutureList) {
        try {
            Channel channel = channelFuture.getChannelFuture().channel();
            callServer(channel, req, null);
        } catch (Exception exception) {
            log.error("[HEARTBEAT] 往服務端處理異常", exception);
        }
    }
}

消費者把心跳通知所有的 broker.

心跳的定時執行

我們啟動一個定時任務,5S 鍾執行一次。

/**
 * 初始化心跳
 * @since 0.0.6
 */
private void initHeartbeat() {
    //5S 發一次心跳
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            heartbeat();
        }
    }, 5, 5, TimeUnit.SECONDS);
}

心跳是在連線到 broker 之後就開始啟動:

@Override
public void initChannelFutureList(ConsumerBrokerConfig config) {
    //1. 配置初始化
    //...

    //2. 初始化
    this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
            initChannelHandler(), check);

    //3. 初始化心跳
    this.initHeartbeat();
}

Broker 實現

消費者定時傳送訊息,生產者肯定是需要接受的。

接收心跳

為了簡單,我們讓心跳是 ONE-WAY 的。

// 消費者心跳
if(MethodType.C_HEARTBEAT.equals(methodType)) {
    MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
    registerConsumerService.heartbeat(req, channel);
    return null;
}

hearbeat 處理

每次收到訊息,我們把請求的 channelId 記錄下來,並儲存最新的訪問時間

@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
    final String channelId = ChannelUtil.getChannelId(channel);
    log.info("[HEARTBEAT] 接收消費者心跳 {}, channelId: {}",
            JSON.toJSON(mqHeartBeatReq), channelId);

    ServiceEntry serviceEntry = new ServiceEntry();
    serviceEntry.setAddress(mqHeartBeatReq.getAddress());
    serviceEntry.setPort(mqHeartBeatReq.getPort());

    BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
    entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());
    heartbeatMap.put(channelId, entryChannel);
}

移除消費者

如果一些消費者長時間沒有心跳,我們就認為服務已經掛了。

LocalBrokerConsumerService 服務啟動的時候,同時啟用一個定時清理任務。

public LocalBrokerConsumerService() {
    //120S 掃描一次
    final long limitMills = 2 * 60 * 1000;

    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {
                String key  = entry.getKey();
                long lastAccessTime = entry.getValue().getLastAccessTime();
                long currentTime = System.currentTimeMillis();
                if(currentTime - lastAccessTime > limitMills) {
                    removeByChannelId(key);
                }
            }
        }
    }, 2 * 60, 2 * 60, TimeUnit.SECONDS);
}

這個任務 2min 執行一次,如果 2min 都沒有心跳,這移除對應的消費者。

小結

心跳,是網路傳輸中驗證服務可用性非常簡單,但是有效的方式。

希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。

我是老馬,期待與你的下次重逢。

開源地址

The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq

拓展閱讀

rpc-從零開始實現 rpc https://github.com/houbb/rpc