【mq】從零開始實現 mq-09-消費者拉取訊息 pull message
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
【mq】從零開始實現 mq-07-負載均衡 load balance
【mq】從零開始實現 mq-09-消費者拉取訊息 pull message
訊息的推與拉
大家好,我是老馬。
這一節我們來一起看一下 MQ 訊息中的推和拉兩種模式。
推
訊息由 broker 直接推送給消費者,實時性比較好。
缺點是如果消費者處理不過來,就會造成大量問題。
拉
訊息由消費者定時從 broker 拉取,優點是實現簡單,可以根據消費者自己的處理能力來消費。
缺點是實時性相對較差。
實際業務中,需要結合具體的場景,選擇合適的策略。
拉取策略實現
push 策略
我們首先看一下 push 策略的簡化核心實現:
package com.github.houbb.mq.consumer.core; /** * 推送消費策略 * * @author binbin.hou * @since 1.0.0 */ public class MqConsumerPush extends Thread implements IMqConsumer { @Override public void run() { // 啟動服務端 log.info("MQ 消費者開始啟動服務端 groupName: {}, brokerAddress: {}", groupName, brokerAddress); //1. 引數校驗 this.paramCheck(); try { //0. 配置資訊 //1. 初始化 //2. 連線到服務端 //3. 標識為可用 //4. 新增鉤子函式 //5. 啟動完成以後的事件 this.afterInit(); log.info("MQ 消費者啟動完成"); } catch (Exception e) { log.error("MQ 消費者啟動異常", e); throw new MqException(ConsumerRespCode.RPC_INIT_FAILED); } } /** * 初始化完成以後 */ protected void afterInit() { } // 其他方法 /** * 獲取消費策略型別 * @return 型別 * @since 0.0.9 */ protected String getConsumerType() { return ConsumerTypeConst.PUSH; } }
我們在 push 中預留了一個 afterInit
方法,便於子類過載。
pull 策略
消費者實現
package com.github.houbb.mq.consumer.core; /** * 拉取消費策略 * * @author binbin.hou * @since 0.0.9 */ public class MqConsumerPull extends MqConsumerPush { private static final Log log = LogFactory.getLog(MqConsumerPull.class); /** * 拉取定時任務 * * @since 0.0.9 */ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); /** * 單次拉取大小 * @since 0.0.9 */ private int size = 10; /** * 初始化延遲毫秒數 * @since 0.0.9 */ private int pullInitDelaySeconds = 5; /** * 拉取週期 * @since 0.0.9 */ private int pullPeriodSeconds = 5; /** * 訂閱列表 * @since 0.0.9 */ private final List<MqTopicTagDto> subscribeList = new ArrayList<>(); // 設定 @Override protected String getConsumerType() { return ConsumerTypeConst.PULL; } @Override public synchronized void subscribe(String topicName, String tagRegex) { MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex); if(!subscribeList.contains(tagDto)) { subscribeList.add(tagDto); } } @Override public void unSubscribe(String topicName, String tagRegex) { MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex); subscribeList.remove(tagDto); } private MqTopicTagDto buildMqTopicTagDto(String topicName, String tagRegex) { MqTopicTagDto dto = new MqTopicTagDto(); dto.setTagRegex(tagRegex); dto.setTopicName(topicName); return dto; } }
訂閱相關
pull 策略可以把訂閱/取消訂閱放在本地,避免與服務端的互動。
定時拉取
我們過載了 push 策略的 afterInit
方法。
/**
* 初始化拉取訊息
* @since 0.0.6
*/
@Override
public void afterInit() {
//5S 發一次心跳
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(CollectionUtil.isEmpty(subscribeList)) {
log.warn("訂閱列表為空,忽略處理。");
return;
}
for(MqTopicTagDto tagDto : subscribeList) {
final String topicName = tagDto.getTopicName();
final String tagRegex = tagDto.getTagRegex();
MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
List<MqMessage> mqMessageList = resp.getList();
if(CollectionUtil.isNotEmpty(mqMessageList)) {
for(MqMessage mqMessage : mqMessageList) {
IMqConsumerListenerContext context = new MqConsumerListenerContext();
mqListenerService.consumer(mqMessage, context);
}
}
} else {
log.error("拉取訊息失敗: {}", JSON.toJSON(resp));
}
}
}
}, pullInitDelaySeconds, pullPeriodSeconds, TimeUnit.SECONDS);
}
應用啟動時,指定時間定時拉取訊息並進行消費處理。
其中 consumerBrokerService.pull(topicName, tagRegex, size);
拉取實現如下:
public MqConsumerPullResp pull(String topicName, String tagRegex, int fetchSize) {
MqConsumerPullReq req = new MqConsumerPullReq();
req.setSize(fetchSize);
req.setGroupName(groupName);
req.setTagRegex(tagRegex);
req.setTopicName(topicName);
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_MESSAGE_PULL);
Channel channel = getChannel(null);
return this.callServer(channel, req, MqConsumerPullResp.class);
}
Borker 相關
訊息分發
// 消費者主動 pull
if(MethodType.C_MESSAGE_PULL.equals(methodType)) {
MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
return mqBrokerPersist.pull(req, channel);
}
實現
mqBrokerPersist 是一個介面,此處演示基於本地實現的,後續會實現基於資料庫的持久化。
原理是類似的,此處僅作為演示。
@Override
public MqConsumerPullResp pull(MqConsumerPullReq pullReq, Channel channel) {
//1. 拉取匹配的資訊
//2. 狀態更新為代理中
//3. 如何更新對應的消費狀態呢?
// 獲取狀態為 W 的訂單
final int fetchSize = pullReq.getSize();
final String topic = pullReq.getTopicName();
final String tagRegex = pullReq.getTagRegex();
List<MqMessage> resultList = new ArrayList<>(fetchSize);
List<MqMessagePersistPut> putList = map.get(topic);
// 效能比較差
if(CollectionUtil.isNotEmpty(putList)) {
for(MqMessagePersistPut put : putList) {
final String status = put.getMessageStatus();
if(!MessageStatusConst.WAIT_CONSUMER.equals(status)) {
continue;
}
final MqMessage mqMessage = put.getMqMessage();
List<String> tagList = mqMessage.getTags();
if(InnerRegexUtils.hasMatch(tagList, tagRegex)) {
// 設定為處理中
// TODO: 訊息的最終狀態什麼時候更新呢?
// 可以給 broker 一個 ACK
put.setMessageStatus(MessageStatusConst.PROCESS_CONSUMER);
resultList.add(mqMessage);
}
if(resultList.size() >= fetchSize) {
break;
}
}
}
MqConsumerPullResp resp = new MqConsumerPullResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
resp.setList(resultList);
return resp;
}
我們遍歷找到匹配的訊息,將其狀態更新為中間狀態。
不過這裡還是缺少了一個關鍵的步驟,那就是訊息的 ACK。
我們將在下一小節進行實現。
小結
訊息的推送和拉取各有自己的優缺點,需要我們結合自己的業務,進行選擇。
一般而言,IM 更加適合訊息的推送;一般的業務,為了削峰填谷,更加適合拉取的模式。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq