1. 程式人生 > >原始碼分析RocketMQ訊息拉取拉模式PULL

原始碼分析RocketMQ訊息拉取拉模式PULL

消費者 與 訊息儲存方Broker一般有兩種通訊機制:推(PUSH)、拉(PULL)
推模式:訊息傳送者將訊息傳送到Broker,然後Broker主動推送給訂閱了該訊息的消費者。
拉模式:訊息傳送者將訊息傳送到Broker上,然後由訊息消費者自發的向Broker拉取訊息。
RocketMQ推拉機制實現:
嚴格意義上來講,RocketMQ並沒有實現PUSH模式,而是對拉模式進行一層包裝,在消費端開啟一個執行緒PullMessageService迴圈向Broker拉取訊息,一次拉取任務結束後馬上又發起另一次拉取操作,實現準實時自動拉取,,PUSH模式的實現請參考如下博文:
推模式訊息拉取機制:

http://blog.csdn.net/prestigeding/article/details/78885420
推模式訊息佇列負載機制:http://blog.csdn.net/prestigeding/article/details/78927447
本文重點在討論RocketMQ拉模式DefaultMQPullConsumer實現。
RocketMQ拉模式,RocketMQ消費者不自動向訊息伺服器拉取訊息,而是將控制權移交給應用程式,RocketMQ消費者只是提供拉取訊息API。
為了對RocketMQ 拉模式有一個直觀的瞭解,我們先大概瀏覽一下MQPullConsumer介面:
這裡寫圖片描述
從上面我們可以看到除了啟動、關閉,註冊訊息監聽器,其他的就是針對MessageQueue拉取訊息,特別值得留意的是每一個拉取pull方法,都是直接針對訊息消費佇列。PUSH模式可以說基於訂閱與釋出模式,而PULL模式可以說是基於訊息佇列模式。
特別說明:PULL模式根據主題註冊訊息監聽器,這裡的訊息監聽器,不是用來訊息消費的,而是在該主題的佇列負載發生變化時,做一下通知。
下文,我們應該帶著我們對PUSH模式的相關知識來認識一下PULL模式,對比學習:
PUSH模式主要知識點:
1)訊息拉取機制:PullMessageServer執行緒 根據PullRequest拉取任務迴圈拉取。
2)訊息佇列負載機制,按照消費組,對主題下的訊息佇列,結合當前消費組內消費者數量動態負載。
按照上面API的描述,PULL模式應該無需考慮上面兩個情形,我們帶著上述疑問,開始我們今天的學習。

1、DefaultMQPullConsumer 核心屬性

/**
     * Do the same thing for the same Group, the application must be set,and
     * guarantee Globally unique
     */
    private String consumerGroup;
    /**
     * Long polling mode, the Consumer connection max suspend time, it is not
     * recommended to modify
     */
private long brokerSuspendMaxTimeMillis = 1000 * 20; /** * Long polling mode, the Consumer connection timeout(must greater than * brokerSuspendMaxTimeMillis), it is not recommended to modify */ private long consumerTimeoutMillisWhenSuspend = 1000 * 30; /** * The socket timeout in milliseconds */ private long consumerPullTimeoutMillis = 1000 * 10; /** * Consumption pattern,default is clustering */ private MessageModel messageModel = MessageModel.CLUSTERING; /** * Message queue listener */ private MessageQueueListener messageQueueListener; /** * Offset Storage */ private OffsetStore offsetStore; /** * Topic set you want to register */ private Set<String> registerTopics = new HashSet<String>(); /** * Queue allocation algorithm */ private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); /** * Whether the unit of subscription group */ private boolean unitMode = false; private int maxReconsumeTimes = 16;

consumerGroup : 消費組名稱
brokerSuspendMaxTimeMillis :長輪詢模式下掛起的最大超時時間,在Broker端根據偏移量從儲存
檔案中查詢訊息時如果返回PULL_NOT_FOUND時,不理解返回給拉取客戶端,而是交給
PullRequestHoldService執行緒,每隔5秒再去拉取一次訊息,如果找到則返回給訊息拉取客
戶端,否則超時。
consumerTimeoutMillisWhenSuspend : 整個訊息拉取過程中,拉取客戶端等待伺服器響應結果的超時時間,預設30S
consumerPullTimeoutMillis :預設10s,拉訊息時建立網路連線的超時時間
messageModel :消費模式,廣播、叢集
messageQueueListener : 業務訊息監聽器
OffsetStore :訊息消費進度管理器
registerTopics :註冊主題數
allocateMessageQueueStrategy :佇列分配器
maxReconsumeTimes :最大訊息重試次數,預設16次

2、訊息消費者啟動流程分析,DefaultMQPullConsumerImpl#start

 public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();  

                this.copySubscription(); // @1

                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { 
                    this.defaultMQPullConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);   // @2

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);   // @3

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);    // @:4

                if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPullConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);   // @5
                }

                this.offsetStore.load();  

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);   // @6
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;

                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();  // @7
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

    }

程式碼@1:根據註冊的主題,構建訂閱資訊,放入到RebalanceImpl的訂閱表中。PS:DefaultMQPullConsumerImpl 可以註冊多個主題,但多個主題使用同一個訊息處理監聽器。
這裡寫圖片描述
程式碼@2:建立MQClientInstance,每一個clientConfig一個MqClientInstance物件。
程式碼@3:填充rebalanceImpl 物件的消費組、訊息佇列分配器、消費模式。這裡的作用是什麼?既然無需負載訊息佇列,為什麼需要這一步???
程式碼@4:構建PullAPIWrapper物件,該物件封裝了具體拉取訊息的邏輯,PULL,PUSH模式最終都會呼叫PullAPIWrapper類的方法從Broker拉取訊息。
程式碼@5:根據叢集消費模式(廣播、叢集)初始化訊息進度管理器offsetStore。
程式碼@6:將該消費者加入到MQClientInstance消費者列表中。
程式碼@7:啟動MQClientInstance。該方法我們在講解DefaultMQPushConsumer時相信講解過,我們再簡單瀏覽一下該方法:
這裡寫圖片描述
既然Pull模式無需自動拉取訊息,但PullMessageService執行緒(訊息拉取)+ RebalanceService執行緒(訊息佇列負載)這個兩個執行緒就沒必要啟動,這裡啟動了,會不會帶來問題?
答案是不會,因為雖然PullMessageService執行緒啟動,但是一開始會在獲取拉取任務(PullRequest)
這裡寫圖片描述
PullRequest是有RebalanceService產生,它根據主題訊息佇列個數和當前消費組內消費者個數進行負載,然後產生對應的PullRequest物件,再將這些物件放入到PullMessageService的pullRequestQueue佇列。具體放入邏輯呼叫:RebalanceImpl#dispatchPullRequest(final List pullRequestList);
我們來看一下RebalanceImpl的子類RebalancePullImpl的dispatchPullRequest方法:
這裡寫圖片描述
再對比一下RebalancePushImpl的dispatchPullRequest,
這裡寫圖片描述
再結合PullMessageService被喚醒後,執行的pullMessage方法:
這裡寫圖片描述
我們可以得出結論,PullMessageService 只為PUSH模式服務,ReblanceService進行路由重新分佈時,如果是RebalancePullImpl,並不會產PullRequest,從而喚醒PullMessageService,PullMessageService被 喚醒後,也是執行DefaultMQPushConsumerImpl的pullMessage方法。
ReblanceService執行緒預設每20S進行一次訊息佇列重新負載,判斷訊息佇列是否需要進行重新分佈(如果消費者個數和主題的佇列數沒有發生改變),則繼續保持原樣。對於PULL模型,如果消費者需要監聽某些主題佇列發生事件,註冊訊息佇列變更事件方法,則RebalanceService會將訊息佇列負載變化事件通知消費者。
至於PULL模式那些根據訊息佇列拉取訊息的方法,與PUSH模式走的邏輯是一樣的,唯一的區別是PULL模式是需要應用程式收到觸發訊息拉取動作。

通過上述分析,我們總結一下RocketMQ,PUSH,PULL模式區別:
PUSH: 消費者訂閱主題,然後自動進行叢集內訊息佇列的動態負載,自動拉取訊息。準實時。
PULL:消費者無需訂閱主題,由業務方(應用程式)直接根據MessageQueue拉取訊息。
專案中一般採用PUSH模式。