1. 程式人生 > 程式設計 >RocketMQ深度解析(四):Consumer

RocketMQ深度解析(四):Consumer

訊息消費

首先我們看看RocketMQ中訊息消費需要關注哪些問題。

  • 訊息佇列負載與重新分佈
  • 訊息消費模式
  • 訊息拉取方式
  • 訊息進度反饋
  • 訊息過濾
  • 順序訊息

概述

訊息消費以組的模式展開,一個消費組內可以包含多個消費者(同一個JVM例項內只允許不允許存在消費組相同的消費者),消費組之間要保持統一的訂閱關係,這一點很重要

消費組之間有兩種消費模式:

  • 廣播模式:主題下的同一條訊息將被叢集內的所有消費者消費一次。
  • 叢集模式:主題下的同一條訊息只允許唄其中一個消費者消費。

訊息伺服器與消費者之間的訊息傳送也有兩種方式:

  • 拉模式:消費端主動發起拉請求
  • 推模式:訊息達到伺服器後,推送給訊息消費者(實際上推模式也是基於拉模式實現的,在拉模式上封裝了一層)。

原始碼解析

消費者啟動流程

先來看看DefaultMQPushConsumerImpl#start方法。

  // 檢查配置
  this.checkConfig();
  // 構建主題訂閱訊息SubscriptionData
  this.copySubscription();
複製程式碼

檢查配置沒什麼可說的,我們來看看copySubscription方法。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
    try {
        // 取出訂閱關係表
        Map<String,String> sub = this.defaultMQPushConsumer.getSubscription();
        if
(sub != null) { for (final Map.Entry<String,String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); // 構造訂閱關係subscriptionData SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic,subString); // 加入到rebalanceImpl的subscriptionInner中 // subscriptionInner是一個ConcurrentMapHashMap // 就是訂閱關係表,以topic為key this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData); } } // 註冊監聽器 if
(null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { // 如果是廣播模式,不需要重試 case BROADCASTING: break; // 如果是叢集模式,則將重試topic也加入到subscriptionInner中, // 訊息重試是以消費組為單位,topic為%RETRY%+消費組名 case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic,SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception",e); } } 複製程式碼

可以看到,subscriptionInner不光儲存了使用者相關的訂閱關係,還儲存了以%RETRY%+消費組名為topic的訂閱關係,自動訂閱了重試topic。

我們繼續看消費者啟動流程。

// 如果是叢集模式 則將instanceName更改為程式id
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

// 初始化ClientInstance,在上篇文章中已經說過了,MQClientManager是單例的。
// 然後向MQClientManager的factoryTable新增本Instance的例項,key為ip+pid,
// 也就是說單個程式內只有一個MQClientInstance,除非自己設定過InstanceName。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);

// 初始化訊息重新負載實現類
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製程式碼

上面一步主要是生成MQClientInstance並註冊到MQClientManager的factoryTable中,該例項主要是用於和外部通訊。

然後初始化了訊息負載均衡的實現類。

    // 長連線,主要用於從broker拉取訊息並交由使用者的Listener處理
    this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(),isUnitMode());
    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
複製程式碼

初始化了pullAPIWrapper。

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 如果是廣播消費模式,則offset在本地儲存
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // 如果是叢集模式,則將offset儲存在broker端。
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 載入offset,叢集模式下是個空方法
this.offsetStore.load();
複製程式碼

這裡初始化了offsetStore,叢集模式下offset儲存在broker端,廣播模式下儲存在本地。這裡也很好理解,叢集模式一條訊息只允許被一個consumer消費,廣播模式一條訊息則需要被所有consumer消費。

// 如果是順序訊息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = 
        new ConsumeMessageOrderlyService(this,(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // 併發訊息
    this.consumeOrderly = false;
    this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently) this.getMessageListenerInner());
}
// 啟動consumeMessageService消費執行緒服務
this.consumeMessageService.start();
複製程式碼

初始化並啟動consumeMessageService消費執行緒服務,主要負責訊息消費,分為有序訊息和無序訊息。 我們先來看看併發消費執行緒服務的實現,順序消費邏輯下面再講。

先看看ConsumeMessageConcurrentlyService的構造方法。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#ConsumeMessageConcurrentlyService
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {
        // 設定defaultMQPushConsumerImpl
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        // 註冊使用者的messageListener具體處理類
        this.messageListener = messageListener;
        // 設定defaultMQPushConsumer
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        // 設定消費組
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        // 消費請求佇列
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        // 消費執行緒池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));
        // 定時
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        // 定時清理執行緒池
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }
複製程式碼

主要是初始化了各種執行緒池。

    // 開啟了一個定時清理過期訊息的執行緒。
    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        },this.defaultMQPushConsumer.getConsumeTimeout(),TimeUnit.MINUTES);
    }
複製程式碼

詳細的後面會講,這裡主要就是三個執行緒池,其實這也是RocketMQ的魅力所在,各種非同步相互協調。

回到消費者啟動流程。

// 向MQClientInstance註冊defaultMQPushConsumerImpl,以消費組的為key,也就是說一個程式內只允許有一個同名的消費組例項
boolean registerOK mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),this);
// 如果註冊失敗,則丟擲異常
if (!registerOK) {
`   this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    + "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}
// 啟動MQClientInstance
mQClientFactory.start();
複製程式碼

主要是向MQClientInstance註冊defaultMQPushConsumerImpl,以及啟動MQClientInstance。看看MQClientInstance啟動幹了什麼。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK",this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before,and failed.",null);
                default:
                    break;
            }
        }
    }
複製程式碼

讓我們看看下面定時任務做了什麼。

每隔2分鐘嘗試獲取一次NameServer地址
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception",e);
            }
        }
    },1000 * 10,1000 * 60 * 2,TimeUnit.MILLISECONDS);
複製程式碼
每隔30S嘗試更新主題路由資訊
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception",10,this.clientConfig.getPollNameServerInterval(),TimeUnit.MILLISECONDS);
複製程式碼
每隔30S進行Broker心跳檢測
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception",1000,this.clientConfig.getHeartbeatBrokerInterval(),TimeUnit.MILLISECONDS);
複製程式碼
預設每隔5秒持久化ConsumeOffset
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception",this.clientConfig.getPersistConsumerOffsetInterval(),TimeUnit.MILLISECONDS);
複製程式碼
預設每隔1S檢查執行緒池適配
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception",1,TimeUnit.MINUTES);
複製程式碼

消費者啟動流程總結

  • 檢查配置
  • 將訂閱關係新增到負載實現類中
  • 改變例項id為程式id
  • 初始化MQClientInstance並將MQClientInstance加入到MQClientManager中
  • 初始化負載均衡實現類
  • 初始化長連線
  • 初始化offsetStore
  • 初始化並啟動consumeMessageService消費執行緒服務
  • 向MQClientInstance註冊自己
  • 啟動MQClientInstance

訊息拉取流程

PullMessageService

從上文可以看到,一個程式內只存在一個MQClientInstance(自己設定InstanceName除外),從MQClientInstance的啟動流程可以看出,MQClientInstance使用一個單獨的執行緒PullMessageService來負責訊息的拉取。

PullMessageService繼承了ServiceThread,ServiceThread實現了Runnable,實際上ServiceThread只是重寫了start等方法,將該執行緒設定為了守護執行緒。我們先來看看PullMessageService的run方法。

org.apache.rocketmq.client.impl.consumer.PullMessageService#run
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception",e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
複製程式碼

他的主要任務就是迴圈不斷阻塞的從pullRequestQueue中取出pullRequest。

然後呼叫pullMessage方法進行處理。

我們先來看一下pullRequest是什麼時候被放到pullRequestQueue佇列裡去的。 PullMessageService

    public void executePullRequestLater(final PullRequest pullRequest,final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() { 
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            },timeDelay,TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put",e);
        }
    }
複製程式碼

分別會在訊息拉取任務結束後,又重新將PullRequest物件放入到pullRequestQueue。

和RebalanceImpl中建立。

從上面得知,PullMessageServuce只有在拿到PullRequst物件時才會執行拉取任務,我們先看看PullRequest到底是什麼。

public class PullRequest {
    /**
     * 消費者組
     */
    private String consumerGroup;
    /**
     * 待拉取消費佇列
     */
    private MessageQueue messageQueue;
    /**
     * 訊息處理佇列
     */
    private ProcessQueue processQueue;
    /**
     * 偏移量
     */
    private long nextOffset;
    /**
     * 是否被鎖定
     */
    private boolean lockedFirst = false;
}
複製程式碼

繼續回到PullMessageService

    private void pullMessage(final PullRequest pullRequest) {
        // 根據消費組名從MQclientInstance中獲取對應的實現類
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            // 呼叫DefaultMQPushConsumerImpl的pullMessage
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {},drop it",pullRequest);
        }
    }
複製程式碼

在講解訊息拉取過程之前。我們先講一下ProcessQueue。

ProcessQueue是MessageQueue在訊息端的快照。PullMessageService從訊息伺服器預設每次拉取32跳訊息,按訊息的佇列偏移量順序存放在ProcessQueue中,PullMessageService然後將訊息提交到消費者消費執行緒池,訊息成功消費後從ProcessQueue中移除。

訊息拉取流程

訊息拉取分為3個主要步驟。

  • Client訊息拉取請求封裝
  • Broker查詢並返回訊息
  • Client處理返回的訊息

訊息拉取

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
        // 拿到佇列快照
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        // 如果處理佇列當前狀態被丟棄
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.",pullRequest.toString());
            return;
        }
        // 更新processQueue的LastPullTimestamp為當前時間戳
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        // 判斷當前Client是否RUNNING狀態
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception,consumer state not ok",e);
            this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }

        // 如果當前消費者被掛起,則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中,結束本次拉取
        if (this.isPause()) {
            log.warn("consumer was paused,execute pull request later. instanceName={},group={}",this.defaultMQPushConsumer.getInstanceName(),this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
複製程式碼

一些訊息拉取的前置校驗。

        // 如果訊息總數大於1000
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            // 延遲放入則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中
            this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            // 流控次數達到1000次則打警告日誌
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {},so do flow control,minOffset={},maxOffset={},count={},size={} MiB,pullRequest={},flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(),processQueue.getMsgTreeMap().firstKey(),processQueue.getMsgTreeMap().lastKey(),cachedMessageCount,cachedMessageSizeInMiB,pullRequest,queueFlowControlTimes);
            }
            return;
        }
        
        // 如果訊息大小大於100MB
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            // 延遲放入則將拉取任務延遲一秒再次放入PullMessageService的拉取任務佇列中
            this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            // 流控次數達到1000次則打警告日誌
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB,this.defaultMQPushConsumer.getPullThresholdSizeForQueue(),queueFlowControlTimes);
            }
            return;
        }
複製程式碼

從兩個維度來進行流控:

  • 訊息總數不得大於1000
  • 訊息大小不得大於100MB
        // 如果不是順序訊息
        if (!this.consumeOrderly) {
            // 如果快照內的訊息最大偏移量間隔大於2000
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages,span too long,maxSpan={},processQueue.getMaxSpan(),queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }
複製程式碼

如果是併發訊息消費的話,還要判斷快照內訊息最大偏移量間隔是否大於2000,如果是則流控。

順序訊息的邏輯我們下面再講。

        // 拿到該主題訂閱資訊
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        // 如果為空,則延遲3s拉取
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed,{}",pullRequest);
            return;
        }
複製程式碼

從rebalanceImpl中拿到該主題的訂閱資訊。

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        // 如果是叢集消費模式
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 從本地快取中讀取offset消費進度
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(),ReadOffsetType.READ_FROM_MEMORY);
            // 如果消費進度大於0
            if (commitOffsetValue > 0) {
                // 則告訴Broker需要儲存消費進度
                commitOffsetEnable = true;
            }
        }

        //這段不是很明白
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }
    
        // 構建訊息拉取系統標記
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable,// commitOffset
            true,// suspend
            subExpression != null,// subscription
            classFilter // class filter
        );
複製程式碼

系統標記用四個二進位制位分別表示四個狀態位。

org.apache.rocketmq.common.sysflag.PullSysFlag
    /**
     * 表示從記憶體中讀取的消費進度大於0,則設定該標記位
     */
    private final static int FLAG_COMMIT_OFFSET = 0x1;
    /**
     * 表示訊息拉取時支援掛起
     */
    private final static int FLAG_SUSPEND = 0x1 << 1;
    /**
     * 訊息過濾機制位表示式機制
     */
    private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
    /**
     * 訊息過濾機制為類過濾模式
     */
    private final static int FLAG_CLASS_FILTER = 0x1 << 3;
複製程式碼

繼續回到拉取訊息。

        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),// 具體哪個訊息佇列
                subExpression,// 訊息過濾表示式
                subscriptionData.getExpressionType(),// 訊息過濾表示式型別
                subscriptionData.getSubVersion(),// 訊息過濾表示式版本號
                pullRequest.getNextOffset(),//訊息拉取偏移量
                this.defaultMQPushConsumer.getPullBatchSize(),//本次拉取最大訊息條數
                sysFlag,// 拉取訊息系統標記
                commitOffsetValue,//當前MessageQueue的消費進度(記憶體內)
                BROKER_SUSPEND_MAX_TIME_MILLIS,//拉取過程中允許broker掛起時間
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,//拉取訊息超時時間
                CommunicationMode.ASYNC,//非同步拉取
                pullCallback    // 從broker拉取到訊息的回撥方法
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception",PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
複製程式碼

最終拉取訊息。

我們看一下org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl方法。

        // 根據brokername以及brokerId從記憶體中查詢broker地址相關資訊
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq),false);
        // 如果記憶體為空,則先從NamerServer更新一下記憶體的Broker地址
        // 再從記憶體中拿
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),false);
        }
複製程式碼

首先拿到broker相關資訊,經典的快取策略。

        if (findBrokerResult != null) {
            {
                // 如果過濾表示式不是TAG型別 && broker版本小於V4_1_0_SNAPSHOT
                // 也就是說V4_1_0版本之前是隻支援tag型別的
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ","
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType,null);
                }
            }
            int sysFlagInner = sysFlag;

            //如果是從節點 則清除FLAG_COMMIT_OFFSET標誌位
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            // 拼接請求
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(),brokerAddr);
            }
            // 傳送請求
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);

            return pullResult;
        }
複製程式碼

最後通過pullMessageAsync非同步傳送請求。

訊息拉取客戶端處理訊息

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
        this.remotingClient.invokeAsync(addr,request,new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request,responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ",timeoutMillis: " + timeoutMillis + ". Request: " + request,responseFuture.getCause()));
                    }
                }
            }
        });
複製程式碼

解析請求成功則會回撥onSuccess方法。

org.apache.rocketmq.client.consumer.PullCallback#onSuccess
    // 拿到下一次拉取的offset
    long prevRequestOffset = pullRequest.getNextOffset();
    // 設定下一次拉取offset
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
複製程式碼

首先設定下次拉取的offset。

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
複製程式碼

如果沒有訊息返回,則將pullRequest放入pullMessageService的pullRequestQueue中。

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
複製程式碼

把拉取到的訊息儲存processQueue。 然後將拉取到的訊息交給consumeMessageService的執行緒池去處理。

    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
複製程式碼

之後再把pullRequest放到pullMessageService中,拉取任務就已經結束了。

還有異常邏輯,有興趣的同學可以去看一下。這裡就不進行深入講解了。

todo 流程圖

todo 負載均衡

··· 未完待續