1. 程式人生 > >RocketMQ詳解(10)——Consumer詳解

RocketMQ詳解(10)——Consumer詳解

RocketMQ詳解(10)——消費模式詳解

一. 不同型別的消費者

根據使用者對讀取操作的控制情況,消費在可以分為兩種型別:

  1. DefaultMQPushConsumer:有系統控制讀取操作,收到訊息後自動呼叫監聽器回撥處理。
  2. DefaultMQPullConsumer:讀取操作中的大部分功能由使用者自主控制。

二. DefaultMQPushConsumer的使用

  1. 使用DefaultMQPushConsumer主要是設定好各種引數和傳入處理訊息的回撥方法。系統收到訊息後會自動呼叫回撥方法來處理訊息,自動儲存Offset,並且加入新的DefaultMQPushConsumer後會自動做負載均衡。

  2. 示例程式碼

    package william.rmq.consumer.quickstart;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
    ; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol
    .heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import java.io.UnsupportedEncodingException; import java.util.List; /** * @Auther: ZhangShenao * @Date: 2018/9/7 11:06 * @Description:RocketMQ訊息消費者 */ @Slf4j @Service public class MessageConsumer implements MessageListenerConcurrently { @Value("${spring.rocketmq.namesrvAddr}") private String namesrvAddr; private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer"); @PostConstruct public void start() { try { consumer.setNamesrvAddr(namesrvAddr); //從訊息佇列頭部開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設定廣播消費模式 consumer.setMessageModel(MessageModel.BROADCASTING); //訂閱主題 consumer.subscribe("DefaultCluster", "*"); //註冊訊息監聽器 consumer.registerMessageListener(this); //啟動消費端 consumer.start(); log.info("Message Consumer Start..."); System.err.println("Message Consumer Start..."); } catch (MQClientException e) { log.error("Message Consumer Start Error!!",e); } } @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(msgs)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } msgs.stream() .forEach(msg -> { try { String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); log.info("Message Consumer: Handle New Message: messageId:{}, topic:{}, tags:{}, keys:{}, messageBody:{}" , msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody); System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags()); } catch (Exception e) { log.error("Consume Message Error!!", e); } }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }

    DefaultMQPushConsumer需要設定三個引數:

    1. 這個Consumer所屬的ConsumerGroup
    2. NameServer的IP和埠
    3. 訂閱的Topic名稱

    下面對這幾個引數進行詳細介紹:

  3. ConsumerGroup用於把多個Consumer組織到一起,提高併發處理能力,ConsumerGroup需要和訊息模式MessageModel配合使用。

    RocketMQ支援兩種訊息模式:

    1. MessageModel.CLUSTERING——叢集模式:同一個ConsumerGroup裡面的每個Consumer只消費所訂閱的訊息的一部分內容,同一個ConsumerGroup下所有Consumer消費的內容合起來才是所訂閱的Topic內容的整體,從而達到負載均衡的目的。
    2. MessageModel.BROADCASTING——廣播模式:同一個ConsumerGroup下的每個Consumer都能消費到所訂閱Topic的所有訊息,也就是一個訊息會被多次分發,被多個Consumer消費。
  4. NameServer的ip和埠號,可以填寫多個,用分號隔開,達到消除單點故障的目的,如”ip1:port1;ip2:port2”

  5. Topic名稱用來標識訊息型別,需要提前建立。如果不想消費某個Topic下的所有訊息,可以通過指定Tag進行訊息過濾,如Consumer.subscribe(“TopicTest”,”tags1 || tag2 || tag3”),表示這個Consumer要消費TopicTest主體下的帶有tag1或tag2或tag3的訊息(Tag指的是在傳送訊息時設定的標籤)。在設定Tag引數時,用null或者”*”表示要消費這個Topic下的所有訊息。

三. DefaultMQPushConsumer的處理流程

本節結合原始碼分析DefaultMQPushConsumer的處理流程。

DefaultMQPushConsumer主要功能實現在DefaultMQPushConsumerImpl中,訊息處理邏輯是在pullMessage()方法的PullCallback回撥中。在PullCallback回撥中有個switch語句,根據Broker返回的訊息型別做響應的處理,具體邏輯看原始碼:

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                         subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                       pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                                   DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                             pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                                        pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};

PullCallback是一個訊息拉取回調,Consumer從Broker拉取訊息會,會根據拉取狀態回撥對應的onSuccess或onException方法。在onSuccess()的處理中,會根據不同的PullStatus進行不同的處理,PullStatus的狀態有:

  1. PullStatus.FOUND:成功拉取訊息
  2. PullStatus.NO_NEW_MSG:沒有新的訊息可被拉取
  3. PullStatus.NO_MATCHED_MSG:過濾結果不匹配
  4. PullStatus.OFFSET_ILLEGAL:offset非法

DefaultMQPushConsumer中有很多PullRequest的方法,如executePullRequestImmediately(),之所以在PushConsumer中也使用PullRequest的方式拉取訊息,是因為RocketMQ通過長輪詢的方式來實現Push和Pull兩種模式,長輪詢可以即有Pull的優點,又兼具Push的實時性。

Push方式是Broker端接收到訊息後,主動把訊息推給Consumer端,實時性高。對於一個提供訊息佇列服務的Server來說,用Push方式會有很多弊端:首先是訊息的流量難以控制,當Push的訊息過多時會加大Server的工作量,進而影響Server的效能;其次,Client的處理能力各不相同,且Client的狀態不受Server控制,如果Client不能及時處理Server推送過來的訊息,在造成訊息堆積等各種潛在的問題。

Pull方式是Client端迴圈地從Server端拉取訊息,主動權在Client手裡,自己拉取到一定量的訊息後,妥當處理完畢再繼續拉取。Pull方式的問題是迴圈拉取的時間間隔不好設定,間隔太短就會處於”忙等”的狀態,浪費資源;間隔太長又可能導致Server端有訊息到來時沒有及時被處理。

“長輪詢”方式通過Client端和Server端的配合,達到既擁有Pull的優點,又保證實時性的目的。

“長輪詢”的核心是,Broker端HOLD住客戶端的請求一小段時間,如果在這段時間內有訊息到達,就利用現有的連結立刻返回訊息給Consumer。”長輪詢”的主動權還是掌握在Consumer手上,即使Broker有大量的訊息積壓,也不會主動推送給Consumer。

長輪詢方式的侷限性在於,HOLD住Consumer端請求時,需要佔用資源,它適合用在訊息佇列這種客戶端連線數可控的場景中。

四. DefaultMQPullConsumer的處理流程

  1. 使用DefaultMQPullConsumer和使用DefaultMQPushConsumer一樣需要設定各種引數,寫處理訊息的回撥方法。此外,還需要進行額外的處理。下面給出使用的示例程式碼:

    package william.rmq.consumer.pull;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
    import org.apache.rocketmq.client.consumer.PullResult;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import william.rmq.common.constant.RocketMQConstant;
    import javax.annotation.PostConstruct;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
    * @Auther: ZhangShenao
    * @Date: 2018/9/15 09:25
    * @Description:使用DefaultMQPullConsumer拉取訊息
    */
    @Service
    @Slf4j
    public class PullMessageConsumer {
       /**記錄每個MessageQueue的消費位點offset,可以持久化到DB或快取Redis,這裡作為演示就儲存在程式中*/
       private static final Map<MessageQueue,Long> OFFSET_TABLE = new ConcurrentHashMap<>();
    
       @Value("${spring.rocketmq.namesrvAddr}")
       private String namesrvAddr;
    
       /**使用DefaultMQPullConsumer實現拉取訊息模式*/
       private DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("DefaultPullConsumer");
    
       /**每次拉取訊息的最大數量*/
       private static final int MAX_PULL_SIZE_EACH_TIME = 32;
    
       @PostConstruct
       public void start() {
           try {
               //設定namesrv地址
               consumer.setNamesrvAddr(namesrvAddr);
    
               //啟動消費端
               consumer.start();
               System.err.println("Order Message Consumer Start...");
    
               //從指定的Topic pull訊息
               Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(RocketMQConstant.TEST_TOPIC_NAME);
    
               //遍歷MessageQueue,獲取Message
               for (MessageQueue messageQueue : messageQueues){
                   //獲取該MessageQueue的消費位點
                   long offset = consumer.fetchConsumeOffset(messageQueue, true);
                   System.err.println("Consumer From Queue: " + messageQueue + ",from offset: " + offset);
    
                   while (true){
                       try {
                           //拉取訊息
                           PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), MAX_PULL_SIZE_EACH_TIME);
                           System.err.println("Pull Message Result: " + pullResult);
    
                           //記錄offset
                           saveMessageQueueOffset(messageQueue,pullResult.getNextBeginOffset());
                           switch (pullResult.getPullStatus()){
                               //拉取到訊息
                               case FOUND: break;
    
                               //沒有匹配的訊息
                               case NO_MATCHED_MSG: break;
    
                               //暫時沒有新訊息
                               case NO_NEW_MSG: continue;
    
                               //offset非法
                               case OFFSET_ILLEGAL: break;
    
                               default: break;
                           }
                       }
                       catch (Exception e){
                           log.error("Pull Message Error!!",e);
                       }
                   }
               }
    
               //關閉Consumer
               consumer.shutdown();
    
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
    
       }
    
       private long getMessageQueueOffset(MessageQueue messageQueue){
           Long offset = OFFSET_TABLE.get(messageQueue);
           return (offset != null ? offset : 0);
       }
    
       private void saveMessageQueueOffset(MessageQueue messageQueue,long offset){
           OFFSET_TABLE.put(messageQueue,offset);
       }
    }
    

    分別啟動生產端和消費端程式,可看到消費端控制檯列印如下:

    Order Message Consumer Start...
    Consumer From Queue: MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=0],from offset: 0
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=57, msgFoundList=32]
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=25]
    Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
    Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0]
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=58, minOffset=0, maxOffset=58, msgFoundList=1]
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=59, minOffset=0, maxOffset=59, msgFoundList=1]
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=60, minOffset=0, maxOffset=60, msgFoundList=1]
    Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=61, minOffset=0, maxOffset=61, msgFoundList=1]
  2. 示例程式碼的處理邏輯是遍歷指定Topic下的所有MessageQueue,然後從中pull訊息,並記錄消費的offset。主要包括下面三件事:

    1. 獲取MessageQueue並遍歷

      一個Topic包含多個MessageQueue,如果這個Consumer需要獲取Topic下的所有訊息,就要遍歷所有的MessageQueue。如果有特殊情況,也可以選擇某些指定的MessageQueue來消費。

    2. 維護Offset

      從一個MessageQueue中拉取訊息時,要傳入Offset引數,隨著不斷讀取訊息,offset不斷增加,這個時候需要使用者把offset儲存下來,根據具體情況可以儲存在記憶體中、寫到磁碟或資料庫等。

    3. 根據不同的拉取狀態做不同的處理

      拉取訊息的請求發出後,會返回FOUND、NO_NEW_MSG、NO_MATCHED_MSG和OFFSET_ILLEGAL四種狀態。需要根據每個狀態做不同的處理。比較重要的兩個狀態是FOUND和NO_NEW_MSG,分別表示拉取到新的訊息和沒有新的訊息。

    五. Consumer的啟動、關閉流程

    1. Consumer分為Push和Pull兩種模式,對於DefaultMQPullConsumer來說,使用者主動權很高,可以根據實際需要啟動、暫停和停止消費過程。需要特別注意的是offset的儲存,要在程式的異常處理部分考慮把offset寫入磁碟的處理,準確記錄每個MessageQueue消費的offset,才能保證消費的準確性。
    2. DefaultMQPushConsumer的退出,要顯式呼叫shutdown()方法,以便釋放資源、儲存offset等。這個呼叫要加到Consumer所在應用的退出邏輯中。DefaultMQPushConsumer在啟動時,會檢查各種配置,然後連線NameServer獲取Topic資訊。如果啟動時出現異常,如無法連線NameServer,程式仍然可以正常啟動不報錯(日誌會列印Warn資訊)。在單機情況下,可以故意寫錯NameServer的地址模擬這種異常。
    3. 之所以DefaultMQPushConsumer在無法連線NameServer時仍然能正常啟動,是考慮到分散式系統的設計。RocketMQ叢集可以有多個NameServer和Broker,某個節點出現異常後整個叢集仍然可用。所以DefaultMQPushConsumer在出現連線異常時不是立刻退出,而是不斷嘗試重連。