RocketMQ深度解析(四):Consumer
訊息消費
首先我們看看RocketMQ中訊息消費需要關注哪些問題。
- 訊息佇列負載與重新分佈
- 訊息消費模式
- 訊息拉取方式
- 訊息進度反饋
- 訊息過濾
- 順序訊息
概述
訊息消費以組的模式展開,一個消費組內可以包含多個消費者(同一個JVM例項內只允許不允許存在消費組相同的消費者),消費組之間要保持統一的訂閱關係,這一點很重要。
消費組之間有兩種消費模式:
- 廣播模式:主題下的同一條訊息將被叢集內的所有消費者消費一次。
- 叢集模式:主題下的同一條訊息只允許唄其中一個消費者消費。
訊息伺服器與消費者之間的訊息傳送也有兩種方式:
- 拉模式:消費端主動發起拉請求
- 推模式:訊息達到伺服器後,推送給訊息消費者(實際上推模式也是基於拉模式實現的,在拉模式上封裝了一層)。
原始碼解析
消費者啟動流程
先來看看DefaultMQPushConsumerImpl#start方法。
// 檢查配置
this.checkConfig();
// 構建主題訂閱訊息SubscriptionData
this.copySubscription();
複製程式碼
檢查配置沒什麼可說的,我們來看看copySubscription方法。
private void copySubscription() throws MQClientException {
try {
// 取出訂閱關係表
Map<String,String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String,String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
// 構造訂閱關係subscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic,subString);
// 加入到rebalanceImpl的subscriptionInner中
// subscriptionInner是一個ConcurrentMapHashMap
// 就是訂閱關係表,以topic為key
this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData);
}
}
// 註冊監聽器
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是廣播模式,不需要重試
case BROADCASTING:
break;
// 如果是叢集模式,則將重試topic也加入到subscriptionInner中,
// 訊息重試是以消費組為單位,topic為%RETRY%+消費組名
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic,SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception",e);
}
}
複製程式碼
可以看到,subscriptionInner不光儲存了使用者相關的訂閱關係,還儲存了以%RETRY%+消費組名為topic的訂閱關係,自動訂閱了重試topic。
我們繼續看消費者啟動流程。
// 如果是叢集模式 則將instanceName更改為程式id
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 初始化ClientInstance,在上篇文章中已經說過了,MQClientManager是單例的。
// 然後向MQClientManager的factoryTable新增本Instance的例項,key為ip+pid,
// 也就是說單個程式內只有一個MQClientInstance,除非自己設定過InstanceName。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);
// 初始化訊息重新負載實現類
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製程式碼
上面一步主要是生成MQClientInstance並註冊到MQClientManager的factoryTable中,該例項主要是用於和外部通訊。
然後初始化了訊息負載均衡的實現類。
// 長連線,主要用於從broker拉取訊息並交由使用者的Listener處理
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(),isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
複製程式碼
初始化了pullAPIWrapper。
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是廣播消費模式,則offset在本地儲存
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
break;
// 如果是叢集模式,則將offset儲存在broker端。
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 載入offset,叢集模式下是個空方法
this.offsetStore.load();
複製程式碼
這裡初始化了offsetStore,叢集模式下offset儲存在broker端,廣播模式下儲存在本地。這裡也很好理解,叢集模式一條訊息只允許被一個consumer消費,廣播模式一條訊息則需要被所有consumer消費。
// 如果是順序訊息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
// 併發訊息
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently) this.getMessageListenerInner());
}
// 啟動consumeMessageService消費執行緒服務
this.consumeMessageService.start();
複製程式碼
初始化並啟動consumeMessageService消費執行緒服務,主要負責訊息消費,分為有序訊息和無序訊息。 我們先來看看併發消費執行緒服務的實現,順序消費邏輯下面再講。
先看看ConsumeMessageConcurrentlyService的構造方法。
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {
// 設定defaultMQPushConsumerImpl
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
// 註冊使用者的messageListener具體處理類
this.messageListener = messageListener;
// 設定defaultMQPushConsumer
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
// 設定消費組
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
// 消費請求佇列
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
// 消費執行緒池
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));
// 定時
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
// 定時清理執行緒池
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
複製程式碼
主要是初始化了各種執行緒池。
// 開啟了一個定時清理過期訊息的執行緒。
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
},this.defaultMQPushConsumer.getConsumeTimeout(),TimeUnit.MINUTES);
}
複製程式碼
詳細的後面會講,這裡主要就是三個執行緒池,其實這也是RocketMQ的魅力所在,各種非同步相互協調。
回到消費者啟動流程。
// 向MQClientInstance註冊defaultMQPushConsumerImpl,以消費組的為key,也就是說一個程式內只允許有一個同名的消費組例項
boolean registerOK mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),this);
// 如果註冊失敗,則丟擲異常
if (!registerOK) {
` this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}
// 啟動MQClientInstance
mQClientFactory.start();
複製程式碼
主要是向MQClientInstance註冊defaultMQPushConsumerImpl,以及啟動MQClientInstance。看看MQClientInstance啟動幹了什麼。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK",this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before,and failed.",null);
default:
break;
}
}
}
複製程式碼
讓我們看看下面定時任務做了什麼。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception",e);
}
}
},1000 * 10,1000 * 60 * 2,TimeUnit.MILLISECONDS);
複製程式碼
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception",10,this.clientConfig.getPollNameServerInterval(),TimeUnit.MILLISECONDS);
複製程式碼
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception",1000,this.clientConfig.getHeartbeatBrokerInterval(),TimeUnit.MILLISECONDS);
複製程式碼
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception",this.clientConfig.getPersistConsumerOffsetInterval(),TimeUnit.MILLISECONDS);
複製程式碼
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception",1,TimeUnit.MINUTES);
複製程式碼
消費者啟動流程總結
- 檢查配置
- 將訂閱關係新增到負載實現類中
- 改變例項id為程式id
- 初始化MQClientInstance並將MQClientInstance加入到MQClientManager中
- 初始化負載均衡實現類
- 初始化長連線
- 初始化offsetStore
- 初始化並啟動consumeMessageService消費執行緒服務
- 向MQClientInstance註冊自己
- 啟動MQClientInstance
訊息拉取流程
PullMessageService
從上文可以看到,一個程式內只存在一個MQClientInstance(自己設定InstanceName除外),從MQClientInstance的啟動流程可以看出,MQClientInstance使用一個單獨的執行緒PullMessageService來負責訊息的拉取。
PullMessageService繼承了ServiceThread,ServiceThread實現了Runnable,實際上ServiceThread只是重寫了start等方法,將該執行緒設定為了守護執行緒。我們先來看看PullMessageService的run方法。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception",e);
}
}
log.info(this.getServiceName() + " service end");
}
複製程式碼
他的主要任務就是迴圈不斷阻塞的從pullRequestQueue中取出pullRequest。
然後呼叫pullMessage方法進行處理。
我們先來看一下pullRequest是什麼時候被放到pullRequestQueue佇列裡去的。 PullMessageService
public void executePullRequestLater(final PullRequest pullRequest,final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
},timeDelay,TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put",e);
}
}
複製程式碼
分別會在訊息拉取任務結束後,又重新將PullRequest物件放入到pullRequestQueue。
和RebalanceImpl中建立。
從上面得知,PullMessageServuce只有在拿到PullRequst物件時才會執行拉取任務,我們先看看PullRequest到底是什麼。
public class PullRequest {
/**
* 消費者組
*/
private String consumerGroup;
/**
* 待拉取消費佇列
*/
private MessageQueue messageQueue;
/**
* 訊息處理佇列
*/
private ProcessQueue processQueue;
/**
* 偏移量
*/
private long nextOffset;
/**
* 是否被鎖定
*/
private boolean lockedFirst = false;
}
複製程式碼
繼續回到PullMessageService
private void pullMessage(final PullRequest pullRequest) {
// 根據消費組名從MQclientInstance中獲取對應的實現類
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 呼叫DefaultMQPushConsumerImpl的pullMessage
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {},drop it",pullRequest);
}
}
複製程式碼
在講解訊息拉取過程之前。我們先講一下ProcessQueue。
ProcessQueue是MessageQueue在訊息端的快照。PullMessageService從訊息伺服器預設每次拉取32跳訊息,按訊息的佇列偏移量順序存放在ProcessQueue中,PullMessageService然後將訊息提交到消費者消費執行緒池,訊息成功消費後從ProcessQueue中移除。
訊息拉取流程
訊息拉取分為3個主要步驟。
- Client訊息拉取請求封裝
- Broker查詢並返回訊息
- Client處理返回的訊息
訊息拉取
// 拿到佇列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 如果處理佇列當前狀態被丟棄
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.",pullRequest.toString());
return;
}
// 更新processQueue的LastPullTimestamp為當前時間戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// 判斷當前Client是否RUNNING狀態
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception,consumer state not ok",e);
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
// 如果當前消費者被掛起,則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中,結束本次拉取
if (this.isPause()) {
log.warn("consumer was paused,execute pull request later. instanceName={},group={}",this.defaultMQPushConsumer.getInstanceName(),this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
複製程式碼
一些訊息拉取的前置校驗。
// 如果訊息總數大於1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
// 延遲放入則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// 流控次數達到1000次則打警告日誌
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {},so do flow control,minOffset={},maxOffset={},count={},size={} MiB,pullRequest={},flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(),cachedMessageCount,cachedMessageSizeInMiB,pullRequest,queueFlowControlTimes);
}
return;
}
// 如果訊息大小大於100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 延遲放入則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// 流控次數達到1000次則打警告日誌
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB,this.defaultMQPushConsumer.getPullThresholdSizeForQueue(),queueFlowControlTimes);
}
return;
}
複製程式碼
從兩個維度來進行流控:
- 訊息總數不得大於1000
- 訊息大小不得大於100MB
// 如果不是順序訊息
if (!this.consumeOrderly) {
// 如果快照內的訊息最大偏移量間隔大於2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages,span too long,maxSpan={},processQueue.getMaxSpan(),queueMaxSpanFlowControlTimes);
}
return;
}
}
複製程式碼
如果是併發訊息消費的話,還要判斷快照內訊息最大偏移量間隔是否大於2000,如果是則流控。
順序訊息的邏輯我們下面再講。
// 拿到該主題訂閱資訊
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// 如果為空,則延遲3s拉取
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed,{}",pullRequest);
return;
}
複製程式碼
從rebalanceImpl中拿到該主題的訂閱資訊。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 如果是叢集消費模式
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 從本地快取中讀取offset消費進度
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(),ReadOffsetType.READ_FROM_MEMORY);
// 如果消費進度大於0
if (commitOffsetValue > 0) {
// 則告訴Broker需要儲存消費進度
commitOffsetEnable = true;
}
}
//這段不是很明白
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
// 構建訊息拉取系統標記
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable,// commitOffset
true,// suspend
subExpression != null,// subscription
classFilter // class filter
);
複製程式碼
系統標記用四個二進位制位分別表示四個狀態位。
/**
* 表示從記憶體中讀取的消費進度大於0,則設定該標記位
*/
private final static int FLAG_COMMIT_OFFSET = 0x1;
/**
* 表示訊息拉取時支援掛起
*/
private final static int FLAG_SUSPEND = 0x1 << 1;
/**
* 訊息過濾機制位表示式機制
*/
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
/**
* 訊息過濾機制為類過濾模式
*/
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
複製程式碼
繼續回到拉取訊息。
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),// 具體哪個訊息佇列
subExpression,// 訊息過濾表示式
subscriptionData.getExpressionType(),// 訊息過濾表示式型別
subscriptionData.getSubVersion(),// 訊息過濾表示式版本號
pullRequest.getNextOffset(),//訊息拉取偏移量
this.defaultMQPushConsumer.getPullBatchSize(),//本次拉取最大訊息條數
sysFlag,// 拉取訊息系統標記
commitOffsetValue,//當前MessageQueue的消費進度(記憶體內)
BROKER_SUSPEND_MAX_TIME_MILLIS,//拉取過程中允許broker掛起時間
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,//拉取訊息超時時間
CommunicationMode.ASYNC,//非同步拉取
pullCallback // 從broker拉取到訊息的回撥方法
);
} catch (Exception e) {
log.error("pullKernelImpl exception",PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
複製程式碼
最終拉取訊息。
我們看一下org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl方法。
// 根據brokername以及brokerId從記憶體中查詢broker地址相關資訊
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq),false);
// 如果記憶體為空,則先從NamerServer更新一下記憶體的Broker地址
// 再從記憶體中拿
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),false);
}
複製程式碼
首先拿到broker相關資訊,經典的快取策略。
if (findBrokerResult != null) {
{
// 如果過濾表示式不是TAG型別 && broker版本小於V4_1_0_SNAPSHOT
// 也就是說V4_1_0版本之前是隻支援tag型別的
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ","
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType,null);
}
}
int sysFlagInner = sysFlag;
//如果是從節點 則清除FLAG_COMMIT_OFFSET標誌位
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
// 拼接請求
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(),brokerAddr);
}
// 傳送請求
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);
return pullResult;
}
複製程式碼
最後通過pullMessageAsync非同步傳送請求。
訊息拉取客戶端處理訊息
this.remotingClient.invokeAsync(addr,request,new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request,responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ",timeoutMillis: " + timeoutMillis + ". Request: " + request,responseFuture.getCause()));
}
}
}
});
複製程式碼
解析請求成功則會回撥onSuccess方法。
// 拿到下一次拉取的offset
long prevRequestOffset = pullRequest.getNextOffset();
// 設定下一次拉取offset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
複製程式碼
首先設定下次拉取的offset。
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
複製程式碼
如果沒有訊息返回,則將pullRequest放入pullMessageService的pullRequestQueue中。
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
複製程式碼
把拉取到的訊息儲存processQueue。 然後將拉取到的訊息交給consumeMessageService的執行緒池去處理。
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
複製程式碼
之後再把pullRequest放到pullMessageService中,拉取任務就已經結束了。
還有異常邏輯,有興趣的同學可以去看一下。這裡就不進行深入講解了。
todo 流程圖
todo 負載均衡
··· 未完待續