【mq】從零開始實現 mq-07-負載均衡 load balance
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
【mq】從零開始實現 mq-07-負載均衡 load balance
為什麼需要負載均衡
大家好,我是老馬。
這一節讓我們看一下如何實現 MQ 的負載均衡。
為什麼需要負載均衡呢?
作用
負載均衡最核心的作用:
(1)可以避免單點故障
(2)可以讓請求均分的分散到每一個節點
實現思路
負載均衡實現的方式比較多,最簡單的就是隨機選擇一個。
拓展閱讀:
從零手寫實現負載均衡 http://houbb.github.io/2020/06/19/load-balance-03-hand-write
MQ 中用到負載均衡的地方
生產者傳送
生產者傳送訊息時,可以傳送給任一 broker。
broker 推送給消費者
broker 接收到訊息以後,在推送給消費者時,也可以任一選擇一個。
消費者的消費 ACK
消費者消費完,狀態回執給 broker,可以選擇任一一個。
訊息黏連
有些訊息比較特殊,比如需要保證消費的有序性,可以通過 shardingKey 的方式,在負載的時候固定到指定的片區。
程式碼實現
生產者傳送
統一調整獲取 channel 的方法。
@Override public Channel getChannel(String key) { // 等待啟動完成 while (!statusManager.status()) { log.debug("等待初始化完成..."); DateUtil.sleep(100); } RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance, channelFutureList, key); return rpcChannelFuture.getChannelFuture().channel(); }
工具類實現為核心實現:
/**
* 負載均衡
*
* @param list 列表
* @param key 分片鍵
* @return 結果
* @since 0.0.7
*/
public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance,
final List<T> list, String key) {
if(CollectionUtil.isEmpty(list)) {
return null;
}
if(StringUtil.isEmpty(key)) {
LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance()
.servers(list);
return loadBalance.select(loadBalanceContext);
}
// 獲取 code
int hashCode = Objects.hash(key);
int index = hashCode % list.size();
return list.get(index);
}
如果指定了 shardingKey,那麼根據 shadringKey 進行 hash 判斷。
如果沒有,則進行預設的負載均衡策略。
Broker 訊息推送給消費者
消費者訂閱列表的獲取:
@Override
public List<Channel> getSubscribeList(MqMessage mqMessage) {
final String topicName = mqMessage.getTopic();
Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
if(CollectionUtil.isEmpty(set)) {
return Collections.emptyList();
}
//2. 獲取匹配的 tag 列表
final List<String> tagNameList = mqMessage.getTags();
Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
for(ConsumerSubscribeBo bo : set) {
String tagRegex = bo.getTagRegex();
if(hasMatch(tagNameList, tagRegex)) {
//TODO: 這種設定模式,統一新增處理 haven
String groupName = bo.getGroupName();
List<ConsumerSubscribeBo> list = groupMap.get(groupName);
if(list == null) {
list = new ArrayList<>();
}
list.add(bo);
groupMap.put(groupName, list);
}
}
//3. 按照 groupName 分組之後,每一組只隨機返回一個。最好應該調整為以 shardingkey 選擇
final String shardingKey = mqMessage.getShardingKey();
List<Channel> channelList = new ArrayList<>();
for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {
List<ConsumerSubscribeBo> list = entry.getValue();
ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey);
final String channelId = bo.getChannelId();
BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
if(entryChannel == null) {
log.warn("channelId: {} 對應的通道資訊為空", channelId);
continue;
}
channelList.add(entryChannel.getChannel());
}
return channelList;
}
核心邏輯:RandomUtils.loadBalance(loadBalance, list, shardingKey);
獲取,其他的保持不變。
消費者 ACK
消費者也是類似的,獲取 channel 的方式調整如下:
public Channel getChannel(String key) {
// 等待啟動完成
while (!statusManager.status()) {
log.debug("等待初始化完成...");
DateUtil.sleep(100);
}
RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
channelFutureList, key);
return rpcChannelFuture.getChannelFuture().channel();
}
小結
負載均衡在分散式服務中,是必備的特性之一。實現的原理並不算複雜。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq