1. 程式人生 > 其它 >【mq】從零開始實現 mq-07-負載均衡 load balance

【mq】從零開始實現 mq-07-負載均衡 load balance

前景回顧

【mq】從零開始實現 mq-01-生產者、消費者啟動

【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?

【mq】從零開始實現 mq-03-引入 broker 中間人

【mq】從零開始實現 mq-04-啟動檢測與實現優化

【mq】從零開始實現 mq-05-實現優雅停機

【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat

【mq】從零開始實現 mq-07-負載均衡 load balance

為什麼需要負載均衡

大家好,我是老馬。

這一節讓我們看一下如何實現 MQ 的負載均衡。

為什麼需要負載均衡呢?

作用

負載均衡最核心的作用:

(1)可以避免單點故障

(2)可以讓請求均分的分散到每一個節點

實現思路

負載均衡實現的方式比較多,最簡單的就是隨機選擇一個。

拓展閱讀:

從零手寫實現負載均衡 http://houbb.github.io/2020/06/19/load-balance-03-hand-write

MQ 中用到負載均衡的地方

生產者傳送

生產者傳送訊息時,可以傳送給任一 broker。

broker 推送給消費者

broker 接收到訊息以後,在推送給消費者時,也可以任一選擇一個。

消費者的消費 ACK

消費者消費完,狀態回執給 broker,可以選擇任一一個。

訊息黏連

有些訊息比較特殊,比如需要保證消費的有序性,可以通過 shardingKey 的方式,在負載的時候固定到指定的片區。

程式碼實現

生產者傳送

統一調整獲取 channel 的方法。

@Override
public Channel getChannel(String key) {
    // 等待啟動完成
    while (!statusManager.status()) {
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    }
    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();
}

工具類實現為核心實現:

/**
 * 負載均衡
 *
 * @param list 列表
 * @param key 分片鍵
 * @return 結果
 * @since 0.0.7
 */
public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
                                                final List<T> list, String key) {
    if(CollectionUtil.isEmpty(list)) {
        return null;
    }

    if(StringUtil.isEmpty(key)) {
        LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
                .servers(list);
        return loadBalance.select(loadBalanceContext);
    }

    // 獲取 code
    int hashCode = Objects.hash(key);
    int index = hashCode % list.size();
    return list.get(index);
}

如果指定了 shardingKey,那麼根據 shadringKey 進行 hash 判斷。

如果沒有,則進行預設的負載均衡策略。

Broker 訊息推送給消費者

消費者訂閱列表的獲取:

@Override
public List<Channel> getSubscribeList(MqMessage mqMessage) {
    final String topicName = mqMessage.getTopic();
    Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
    if(CollectionUtil.isEmpty(set)) {
        return Collections.emptyList();
    }

    //2. 獲取匹配的 tag 列表
    final List<String> tagNameList = mqMessage.getTags();
    Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
    for(ConsumerSubscribeBo bo : set) {
        String tagRegex = bo.getTagRegex();
        if(hasMatch(tagNameList, tagRegex)) {
            //TODO: 這種設定模式,統一新增處理 haven
            String groupName = bo.getGroupName();
            List<ConsumerSubscribeBo> list = groupMap.get(groupName);
            if(list == null) {
                list = new ArrayList<>();
            }
            list.add(bo);
            groupMap.put(groupName, list);
        }
    }

    //3. 按照 groupName 分組之後,每一組只隨機返回一個。最好應該調整為以 shardingkey 選擇
    final String shardingKey = mqMessage.getShardingKey();
    List<Channel> channelList = new ArrayList<>();
    for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {
        List<ConsumerSubscribeBo> list = entry.getValue();
        ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
        final String channelId = bo.getChannelId();
        BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
        if(entryChannel == null) {
            log.warn("channelId: {} 對應的通道資訊為空", channelId);
            continue;
        }
        channelList.add(entryChannel.getChannel());
    }
    return channelList;
}

核心邏輯:RandomUtils.loadBalance(loadBalance, list, shardingKey); 獲取,其他的保持不變。

消費者 ACK

消費者也是類似的,獲取 channel 的方式調整如下:

public Channel getChannel(String key) {
    // 等待啟動完成
    while (!statusManager.status()) {
        log.debug("等待初始化完成...");
        DateUtil.sleep(100);
    }

    RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
            channelFutureList, key);
    return rpcChannelFuture.getChannelFuture().channel();
}

小結

負載均衡在分散式服務中,是必備的特性之一。實現的原理並不算複雜。

希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。

我是老馬,期待與你的下次重逢。

開源地址

The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq

拓展閱讀

rpc-從零開始實現 rpc https://github.com/houbb/rpc