1. 程式人生 > 程式設計 >RocketMQ深度解析(三):Producer

RocketMQ深度解析(三):Producer

訊息傳送

RocketMQ支援3種訊息傳送模式:

  • 同步(sync):傳送者向MQ執行傳送訊息API時,同步等待,直到訊息伺服器返回傳送結果。
  • 非同步(async):傳送者向MQ執行傳送訊息API時,指定訊息傳送成功後的回撥函式,然後呼叫訊息傳送API後,立即返回,訊息傳送者執行緒不阻塞,直到執行結束,訊息傳送成功或失敗的回撥任務在一個新的執行緒中執行。
  • 單向(oneway):訊息傳送者向MQ執行傳送訊息API時,直接返回,不等待訊息伺服器的結果,也不註冊回撥函式。

RocketMQ訊息

Message屬性

Message全屬性建構函式

    public Message(String topic,String tags,String keys,int flag,byte[] body,boolean wait
StoreMsgOK) { this.topic = topic; this.flag = flag; this.body = body; if (tags != null && tags.length() > 0) this.setTags(tags); if (keys != null && keys.length() > 0) this.setKeys(keys); this.setWaitStoreMsgOK(wait
StoreMsgOK); } 複製程式碼

延遲級別屬性

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL,String.valueOf(level));
    }
複製程式碼

Message擴充套件屬性

  • tag:訊息TAG,用於訊息過略。
  • keys:Message索引鍵,多用空格隔開。
  • waitStoreMsgOK:訊息傳送時是否等訊息儲存完成後才返回。
  • delayTimeLevel:訊息延遲級別,用於定時訊息或訊息重試。

這些擴充套件屬性儲存在Message的properties中。

Producer啟動流程

啟動一個Producer我們使用的是DefaultMQProducer這個類。

org.apache.rocketmq.client.producer.DefaultMQProducer#start
    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(),this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ",e);
            }
        }
    }
複製程式碼

可以看到,該start方法中只做了兩件事,一件事是為ProduerGroup加上名稱空間,這裡實際上就是加上了NameServer的地址相關資訊,有興趣的同學可以看一下withNamespace的原始碼。另外一件事就是呼叫了defaultMQProducerImpl.start()。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
    this.checkConfig();

    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }
複製程式碼

這裡就是檢查一下配置是否正確,以及將生產者的instanceName設定為程式Id。

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);
複製程式碼

我們先看一下MQClientManager這個類。

    private static MQClientManager instance = new MQClientManager();
    public static MQClientManager getInstance() {
        return instance;
    }
複製程式碼

這是一個明顯的餓漢的單例模式,也就是一個JVM中只會有一個MQClientManager instance。

org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig,org.apache.rocketmq.remoting.RPCHook)
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig,RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(),clientId,rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId,instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]",clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]",clientId);
            }
        }

        return instance;
    }
複製程式碼

建立MQClientInstance例項。用ClientId(客戶端Ip+程式id組成+unitname(可選))作為Key,也就是一個JVM中同一個ClientId只會存在一個MQClientInstance例項。

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(),this);
複製程式碼

將當前Producer註冊進MQClientInstance的例項中。這裡同一個mQClientFactory中只允許一個相同的producerGroupName的DefaultMQProducer,否則就會註冊失敗。

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),new TopicPublishInfo());
複製程式碼

用於當Topic不存在時,send自動建立Topic,僅當autoCreateTopicEnable設為true才可用。生產環境中不建議使用。

   if (startFactory) {
       mQClientFactory.start();
   }
複製程式碼

啟動MQClientInstance。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // 關於netty網路互動 
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    // 啟動一些定時任務
                    this.startScheduledTask();
                    // Start pull service
                    // 消費者相關
                    this.pullMessageService.start();
                    // Start rebalance service
                    // 消費者相關
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK",this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before,and failed.",null);
                default:
                    break;
            }
        }
    }
複製程式碼

startScheduledTask中大部分定時任務都和消費者相關。所以不在本章中詳解。 其中包括120s更新一次本地快取中的NameServer地址。30s更新一次本地快取中的topic路由相關資訊(Consumer)相關,30s向所有Broker傳送自己的一些資訊(Consumer相關),50s持久化一次本地consumer offset(Consumer相關),以及60s調整一次消費執行緒池(Consumer相關)。

訊息傳送

訊息傳送主要包含三個步驟:驗證訊息、查詢路由、訊息傳送(包括異常處理機制)。 我們以同步訊息為例,看一下訊息傳送的流程。

驗證訊息

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
    public SendResult send(
        Message msg) throws MQClientException,RemotingException,MQBrokerException,InterruptedException {
        Validators.checkMessage(msg,this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }
複製程式碼

訊息長度驗證,訊息體長度不能為0以及不能超過4MB。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
    public SendResult send(
        Message msg) throws MQClientException,InterruptedException {
        return send(msg,this.defaultMQProducer.getSendMsgTimeout());
    }
複製程式碼
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message,long)
    public SendResult send(Message msg,long timeout) throws MQClientException,InterruptedException {
        return this.sendDefaultImpl(msg,CommunicationMode.SYNC,null,timeout);
    }
複製程式碼

查詢路由

TopicPublishInfo

關於Topic路由資訊的本地快取。

這裡說兩個屬性,

  • MessageQueueList:該主題的訊息佇列
  • sendWhichQueue:用ThreadLocal維護了一個index,用於負載均衡。

在sendDefaultImpl方法中,首先會查詢主題相關的路由資訊。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 本次主要是因為topic為建立,看broker是否允許自動建立Topic。
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
複製程式碼

如果快取中包含了該Topic的路由資訊且路由資訊內的MessageQueue不為空。則直接返回路由資訊。如果沒有,則向NameServer查詢該topic的路由資訊,如果有更新本地快取。

沒有的話嘗試用預設主題createTopicKey去查詢,只有BrokerConfig#autoCreateTopicEnable為true時,NameServer才會返回路由資訊。

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String,boolean,org.apache.rocketmq.client.producer.DefaultMQProducer)
    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }   
        }
    } else {
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000 * 3);
    }
複製程式碼

如果isDefault為true,則使用預設主題去查詢,如果查詢到路由資訊,則替換路由資訊中讀寫佇列個數為訊息生產者預設的佇列個數(DefaultTopicQueueNums);如果isDefault為false,則使用引數topic去查詢;如果為查詢到路由資訊,則返回false,表示路由資訊未變化。

    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old,topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed,old[{}],new[{}]",topic,old,topicRouteData);
    }
複製程式碼

如果查詢到路由資訊,和本地快取中的路由資訊比較,將changed置為true。

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic,topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) { 
    Entry<String,MQProducerInner> entry = it.next();
    MQProducerInner impl = entry.getValue();
    if (impl != null) {
    impl.updateTopicPublishInfo(topic,publishInfo);
    }
}
複製程式碼

更新該MQClientInstance所管轄的所有producer的本地快取。

org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
    // 如果是寫佇列
    if (PermName.isWriteable(qd.getPerm())) {
        BrokerData brokerData = null;
        // 找到對應的broker
        for (BrokerData bd : route.getBrokerDatas()) {
            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                brokerData = bd;
                break;
            }
        }
        // 如果broker為null 結束本次迴圈
        if (null == brokerData) {
            continue;
        }
        // 如果broker中不包含master 結束本次迴圈
        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            continue;
        }
        // 將本QueueDate中的讀佇列放到TopicPublishInfo
        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue(topic,qd.getBrokerName(),i);
                info.getMessageQueueList().add(mq);
        }
    }
}
複製程式碼

迴圈遍歷路由資訊中QueueData資訊,如果佇列有寫許可權,就建立對應的MessageQueue,填充 topicPublishInfo的List<QueueMessage>

路由查詢就到此完畢了,不太明白的同學可以跟著原始碼一起讀。

選擇訊息佇列

在上一步,我們已經得到了topic的路由資訊。

比如說某topic分佈在broker-a和broker-b各四個佇列。

那麼得到的MessageQueue則是:

  • {"topic":"topic","brokerName":"broker-a","queueId":0}
  • {"topic":"topic","queueId":1}
  • {"topic":"topic","queueId":2}
  • {"topic":"topic","queueId":3}
  • {"topic":"topic","brokerName":"broker-b","queueId":3}

那麼RocketMQ如果選擇這些訊息佇列呢。 首先訊息佇列傳送端採用重試機制,由retryTimesWhenSendFailed指定同步方式重試次數,非同步重試機制在收到訊息傳送結構後執行回撥之前進行重試。 選擇訊息佇列有兩種方式:

  • sendLatencyFaultEnable = false,預設不啟用Broker故障延遲機制。
  • sendLatencyFaultEnable = true,啟用Broker故障延遲機制。

這兩種機制有什麼區別呢,我們下面來看一看。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerName) {
        // 開啟了故障延遲機制
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue",e);
            }

            return tpInfo.selectOneMessageQueue();
        }
        //預設不開啟故障延遲
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
複製程式碼

預設機制

org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
複製程式碼
org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
複製程式碼

如果上一次執行傳送訊息失敗的BrokerName(lastBrokerName)為空,則直接用sendWhichQueue自增然後對訊息佇列個數進行取模。

如果上一次傳送訊息失敗的lastBrokerName不為空,則規避上次失敗了的BrokerName,比如說,上次傳送到{"topic":"topic","queueId":0}這個佇列中失敗,則本次選擇佇列的時候就會規避所有brokerName為broker-a的佇列。

我們先看一下sendWhichQueue資料結構。

org.apache.rocketmq.client.common.ThreadLocalIndex
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }
複製程式碼

其實就是本地執行緒維護了一個執行緒安全的index,第一次生成則生成一個隨機數,每次負載則自增一。也就是說,傳送訊息在同一執行緒內才會負載均衡,如果是多個使用者每個使用者只傳送一次訊息的話,走的是隨機的訊息佇列,吐槽一下這神一般的負載均衡策略?。

該演演算法只能在一次傳送中規避上一次傳送失敗的broker。也就是說只在一次send中有效,如果是當前執行緒第二次呼叫send,則無法規避上一次send中傳送失敗的broker。我們可以仔細看看上面selectOneMessageQueue中lastBrokerName不等於null的情況,先從sendWhichQueue裡取出一個index,然後通過臨時的index變數來規避上一次傳送失敗的lastBrokerName。那麼當第二次傳送,從sendWhichQueue取出的index,大概率就是上一次傳送失敗的Broke中,那麼又會引發重試,造成不必要的效能損耗,那麼有什麼辦法可以暫時將該Broker排除在佇列選擇的範圍外呢?

Broker故障延遲機制

在sendDefaultImpl方法中,傳送成功或者丟擲異常,都會呼叫org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem方法。

我們先看一下這個方法。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem
    public void updateFaultItem(final String brokerName,final long currentLatency,boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName,currentLatency,isolation);
    }
複製程式碼
org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
    public void updateFaultItem(final String brokerName,boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName,duration);
        }
    }
複製程式碼

首先會計算系統是否不可用持續時長。如果isolation為true,則將30000傳給computeNotAvailableDuration,否則傳currentLatency(當前延遲)。

org.apache.rocketmq.client.latency.MQFaultStrategy#computeNotAvailableDuration
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
複製程式碼

看一下latencyMax和notAvailableDuration。

private long[] latencyMax = {50L,100L,550L,1000L,2000L,3000L,15000L};
private long[] notAvailableDuration = {0L,0L,30000L,60000L,120000L,180000L,600000L};
複製程式碼

如果延遲時間達到某個量,則認為不可用時間是某個量,這是一個經驗值。

我們繼續往下看,updateFaultItem方法。

org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
    public void updateFaultItem(final String name,final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name,faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
複製程式碼

根據broker從快取中查詢出FaultItem,如果找到則更新FaultItem,沒有則新建。 主要是當前CurrentLatency和StartTimestamp兩個欄位。

  • CurrentLatency以及StartTimestamp被volatile修飾,多執行緒下是立即可見的。
  • StartTimestamp為當前系統時間加上需要規避的時長。是判斷Broker是否可用最直接的依據。

我們看一下isAvailable方法。

org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }
複製程式碼
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
複製程式碼

如果當前時間大於或者等於StartTimestamp則認為Broker是可用的。

我們回過頭來看一下選擇訊息佇列方法中的開啟了故障延遲的處理辦法。

try {
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
        if (pos < 0)
            pos = 0;
        // 首先根據index輪詢出一個mq
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
        // 判斷是否可用
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
            // 這裡不是很懂,在github上很多人提issue說這是一個bug但是沒有官方回覆
            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                return mq;
        }
    }
    
    // 如果沒有找到可用的mq,則嘗試從規避的Broker中選擇一個Broker
    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    if (writeQueueNums > 0) {
        final MessageQueue mq = tpInfo.selectOneMessageQueue();
        if (notBestBroker != null) {
            mq.setBrokerName(notBestBroker);
            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
        }
        return mq;
    } else {
        latencyFaultTolerance.remove(notBestBroker);
    }
}
複製程式碼

訊息傳送

訊息傳送API核心入口:

DefaultMQProducerImpl#sendKernelImpl。
    private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout)
複製程式碼
  • msg:待傳送的訊息
  • mq:訊息將傳送到該訊息佇列上
  • communicationMode:訊息傳送模式,SYNC、ASYNC、ONEWAY
  • sendCallback:非同步訊息回撥函式。
  • topicPublishInfo:主題路由資訊
  • timeout:訊息傳送超時時間
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
複製程式碼

嘗試從快取中獲取Broker的網路地址,如果未快取該資訊則主動從NameServer更新一下快取,然後再獲取一次。

    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }

    boolean topicWithNamespace = false;
    if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
    }

    int sysFlag = 0;
    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
    }

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
複製程式碼

為訊息分配全域性唯一ID,如果訊息體預設超過超過4K,會對訊息體採用ZIP壓縮,並設定訊息的系統標記為MessageSysFlag.COMPRESSED_FLAG,如果訊息是PREPARED事務訊息,則設定訊息系統標記MessageSysFlag.TRANSACTION_PREPARED_TYPE。

if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
    }
複製程式碼

如果註冊了訊息傳送鉤子函式,則執行訊息傳送之前增強邏輯。

org.apache.rocketmq.client.hook.SendMessageHook
public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}
複製程式碼

鉤子介面。

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 設定傳送組
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 設定topic
requestHeader.setTopic(msg.getTopic());
// 預設建立主題Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 單個Broker預設訊息佇列數
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 佇列Id
requestHeader.setQueueId(mq.getQueueId());
// 訊息系統標記
requestHeader.setSysFlag(sysFlag);
// 訊息傳送時間
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 訊息標記
requestHeader.setFlag(msg.getFlag());
// 訊息擴充套件屬性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// 訊息重試次數
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批次訊息
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
    MessageAccessor.clearProperty(msg,MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg,MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
複製程式碼

構建訊息傳送請求包。

org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
public SendResult sendMessage(
        final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,final long timeoutMillis,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer
    ) throws RemotingException,InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2,requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE,requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr,request,timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr,brokerName,msg,timeoutMillis - costTimeAsync,sendCallback,topicPublishInfo,instance,retryTimesWhenSendFailed,times,context,producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr,timeoutMillis - costTimeSync,request);
            default:
                assert false;
                break;
        }

        return null;
    }
複製程式碼

根據訊息傳送方式,同步、非同步、單向方式進行網路傳輸。

    if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
    }
複製程式碼

如果註冊了鉤子函式,執行after邏輯。

總結

  • 客戶端呼叫producer傳送訊息時,會先從NameServer獲取該topic的路由資訊。訊息頭code為GET_ROUTEINFO_BY_TOPIC
  • 從NameServer返回的路由資訊,包括topic包含的佇列列表和broker列表
  • Producer端根據查詢策略,選出其中一個佇列,用於後續儲存訊息
  • 每條訊息會生成一個唯一id,新增到訊息的屬性中。屬性的key為UNIQ_KEY
  • 對訊息做一些特殊處理,比如:超過4M會對訊息進行壓縮
  • producer向Broker傳送rpc請求,將訊息儲存到broker端。訊息頭的code為SEND_MESSAGE或SEND_MESSAGE_V2(配置檔案設定了特殊標誌)

#參考文獻