1. 程式人生 > >原始碼分析RocketMQ之訊息消費

原始碼分析RocketMQ之訊息消費

1、訊息消費關注點 1)訊息消費方式:拉取、推送 2)消費者組與消費模式 consumerGroup;       MessageModel messageModel;    多個消費者組成一個消費組,兩種模式:叢集(訊息被其中任何一個訊息者消費)、廣播模式(全部消費者消費) 3)ConsumeFromWhere consumeFromWhere     CONSUME_FROM_LAST_OFFSET       上一次消費偏移量     CONSUME_FROM_FIRST_OFFSET      從頭開始     CONSUME_FROM_TIMESTAMP         從某個時間點開始
4)訊息訂閱主題      Map<String /* topic */, String /* sub expression */> subscription 5)消費進度     OffsetStore offsetStore    消費者需要記錄訊息消費的進度    廣播模式:廣播模式由於每個消費者都需要消費訊息,故訊息的進度(最後消費的偏移量可以儲存在本地) 叢集模式:由於叢集中的消費者只要一個消費訊息即可,故訊息的消費進度,需要儲存在集中點,比如分散式快取、                     RocketMQ儲存在Broker所在的伺服器。 6)消費者執行緒(待後續講解)     private int consumeThreadMin = 20;
     private int consumeThreadMax = 64;      private long adjustThreadPoolNumsThreshold = 100000;      private int consumeConcurrentlyMaxSpan = 2000; 2、訊息消費實現

消費端demo:


使用推送模式,設定消費者所屬組,訂閱主題、定義訊息消費回撥介面,推送訊息後消費方具體業務處理,並返回CONSUME_SUCCESS表示消費成功。 訊息消費者具體實現類:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
2.1 DefaultMQPushConsumerImpl 2.1.1 消費端初始化(構造方法)



然後開始重點從start方法深入研究DefaultMQPushConsumerImpl的內部機制
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {  
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;     // @1

                this.checkConfig();                                                 //@2

                this.copySubscription();                                         //@3

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {    // @4
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());    // @5
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);   //@6

                if (this.defaultMQPushConsumer.getOffsetStore() != null) {    // @7
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                }
                this.offsetStore.load();         //@8

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();   //@9

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);   //@10
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();    //@11
        this.mQClientFactory.checkClientInBroker();                               //@12
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();   //@13
        this.mQClientFactory.rebalanceImmediately();                          //@14
    }
程式碼@2,checkConfig,檢查配置資訊,主要檢查 消費者組(consumeGroup)、訊息消費方式(messageModel)、訊息消費開始偏移量(consumeFromWhere)、訊息佇列分配演算法(AllocateMessageQueueStrategy)、訂閱訊息主題(Map<topic,sub expression ),訊息回撥監聽器(MessageListener)、順序訊息模式時是否只有一個訊息佇列等等。 程式碼@3,copySubscription加工訂閱資訊,將Map<String /* topic*/, String/* sub extends*/>轉換為Map<String,SubscriptionData>,同時,如果訊息消費模式為叢集模式,還需要為該消費組對應一個重試主題。 程式碼@4,如果訊息消費模式為叢集模式,並且當前的例項名為DEFAULT,替換為當前客戶端程序的PID。 程式碼@5,負載均衡相關實現,後文重點關注 程式碼@6,pullAPIWrapper ,後文重點關注 程式碼@7,消費進度儲存,如果是叢集模式,使用遠端儲存RemoteBrokerOffsetStore,如果是廣播模式,則使用本地儲存LocalFileOffsetStore,後文重點關注 程式碼@8,載入訊息進度。 程式碼@9,訊息消費服務並啟動,後文重點關注 程式碼@10,向遠端Broker伺服器註冊消費者 程式碼@11,更新訂閱新 程式碼@12,檢測broker狀態 程式碼@13,傳送心跳 程式碼@14,重新負載。 上面包含的資訊量巨大,我們一一進行分析 第一步:我們先來確定訊息消費的一些核心類的作用 2.1.1.1 MQClientInstance 概述
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
        return sb.toString();
    }
private String clientIP = RemotingUtil.getLocalAddress();
從這段程式碼可以看成,一個客戶端[email protected]只會持有一個MQClientInstance物件,MQClientInstance無論是消費者還是生產者,都在應用程式這一端; 有了這一層認識,我們就重點關注一下該類的屬性: 1) ClientConfig clientConfig;  2) int instanceIndex;       MQClientInstance在同一臺機器上的建立序號 3) String clientId;  客戶端id 4) ConcurrentMap<String/* group */, MQProducerInner> producerTable       生產組--》訊息生產者,也就是在應用程式一端,每個生產者組,在同一臺應用伺服器只需要初始化一個生產者(MQProducerInner) 5)ConcurrentMap<String/* group */, MQConsumerInner> consumerTable       消費組--》消費者,也就是在應用程式一 端  ,每個消費組,在同一臺應用伺服器只需要初始化一個消費者即可。 6)ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable      同理 7)NettyClientConfig nettyClientConfig; 8)NettyClientConfig nettyClientConfig; 9)MQClientAPIImpl mQClientAPIImpl; 10)MQAdminImpl mQAdminImpl;       11)ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable       topic路由資訊 12)Lock lockNamesrv = new ReentrantLock(); 13)Lock lockHeartbeat = new ReentrantLock(); 14)ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable      broker資訊,這些資訊存在於NameServer,但快取在本地客戶端,供生產者、消費者共同使用 15)private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {         @Override         public Thread newThread(Runnable r) {             return new Thread(r, "MQClientFactoryScheduledThread");         }     }); 16)ClientRemotingProcessor clientRemotingProcessor; 17)PullMessageService pullMessageService;        拉取服務 18)RebalanceService rebalanceService;       拉取負載均衡,下文重點關注 19)DefaultMQProducer defaultMQProducer;        預設的訊息生產者 20)ConsumerStatsManager consumerStatsManager;        消費端統計 21)AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);        心跳包傳送次數 22)ServiceState serviceState = ServiceState.CREATE_JUST;        狀態 23)DatagramSocket datagramSocket; 24)Random random = new Random(); 我們在訊息傳送時,也與這個類打過交道,就是從NameServer獲取topic的路由資訊並快取之,現在我們先不重點關注每個方法,方法用到後再細說了,目前我做一個簡單的總結: MQClientInstance mq客戶端例項,每臺應用伺服器將持有一個MQClientInstance物件,供該應用伺服器的消費者,生產者使用。該類是消費者,生產者網路處      理的出口類。 2.1.1.2 RebalanceImpl 同樣,在這裡,我們重點先理解該類的作用  RebalanceImpl是consume的重新負載,什麼意思呢?就是消費者與消費佇列的對應關係,我們來思考一個問題,比如現在有4個訊息佇列(q1,q2,q3,q4),3個消費者(m1,m2,m3),那麼消費者與訊息佇列的對應關係是什麼呢?我們按照一個輪詢演算法來表示,  m1(q1,q4)  m2(q2) m3(q3),如果此時q2訊息佇列失效(所在的broker掛了),那麼訊息佇列的消費就需要重新分配,RebalanceImpl就是幹這事的,該類的呼叫軌跡如下:(MQClientInstance start --> (this.rebalanceService.start()) --->  RebalanceService.run(this.mqClientFactory.doRebalance()) ---> MQConsumerInner.doRebalance(DefaultMQPushConsumerImpl)  --->RebalanceImpl.doRebalance 在這裡著重說明一點:訊息佇列數量與消費者關係:1個消費者可以消費多個佇列,但1個訊息佇列只會被一個消費者消費;如果消費者數量大於訊息佇列數量,則有的消費者會消費不到訊息(叢集模式) 2.1.2 訊息消費過程 訊息的消費過程,就是從伺服器拉取,然後消費者進行消費,再根據業務反饋是否成功消費來推動消費進度,也就是訊息的消費進度並不是儲存在服務端(比如commitlog檔案中),而是儲存在消費端(可以是本地(廣播模式)、broke端(叢集模式))具體的程式碼實現:
public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }

        long size = processQueue.getMsgCount().get();
        if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((flowControlTimes1++ % 1000) == 0) {
                log.warn(
                    "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
            }
            return;
        }

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((flowControlTimes2++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, flowControlTimes2);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
                                    pullResult.getNextBeginOffset(), //
                                    firstMsgOffset, //
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}", //
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(//
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl(//
                pullRequest.getMessageQueue(), // 1
                subExpression, // 2
                subscriptionData.getExpressionType(), // 3
                subscriptionData.getSubVersion(), // 4
                pullRequest.getNextOffset(), // 5
                this.defaultMQPushConsumer.getPullBatchSize(), // 6
                sysFlag, // 7
                commitOffsetValue, // 8
                BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
                CommunicationMode.ASYNC, // 11
                pullCallback // 12
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }
消費過程暫不詳細講解了,後續文章會進一步對訊息消費的各個方面做出詳細的剖析。 本文旨在梳理出訊息的消費過程,訊息佇列與消費者的負載、消費模式(廣播、叢集),訊息進度等等。 後續文章重點關注: 1、消費進度反饋原始碼分析 2、消費者訊息負載實現原理 3、訊息過濾與tag機制 4、常用問題分析 1)一個消費者多個執行緒(執行緒之間如何協調)

相關推薦

原始碼分析RocketMQ訊息消費

1、訊息消費關注點 1)訊息消費方式:拉取、推送 2)消費者組與消費模式 consumerGroup;       MessageModel messageModel;    多個消費者組成一個消費組,兩種模式:叢集(訊息被其中任何一個訊息者消費)、廣播模式(全部消費者消費

原始碼分析RocketMQ訊息ACK機制(消費進度)

首先簡要闡述一下訊息消費進度首先消費者訂閱訊息消費佇列(MessageQueue),當生產者將訊息負載傳送到MessageQueue中時,消費訂閱者開始消費訊息,訊息消費過程中,為了避免重複消費,需要一

rocketmq原始碼分析broker核心MessageStore訊息接受(十八)

這章我們從broker接受到訊息後的處理,從原始碼加註解的角度解析整體處理及技術,整體的處理步驟如下: sendMessage

rocketmq原始碼分析broker核心MessageStore訊息拉取(十九)

根據訊息的拉取程式碼,broker端的大體操作步驟如下,主要進行pullMessage     1,構建net

RocketMQ原始碼分析(一)除錯環境搭建

版本宣告 基於rocketmq-all-4.3.1版本 所有分析是根據自己的理解,因為不是RocketMQ的原始開發者,所以肯定會存在分析不正確的地方,如有發現歡迎指正,謝謝! 規定$ROCKET

原始碼分析RocketMQ訊息拉取拉模式PULL

消費者 與 訊息儲存方Broker一般有兩種通訊機制:推(PUSH)、拉(PULL) 推模式:訊息傳送者將訊息傳送到Broker,然後Broker主動推送給訂閱了該訊息的消費者。 拉模式:訊息傳送者將訊息傳送到Broker上,然後由訊息消費者自發的向Brok

原始碼分析RocketMQ訊息軌跡

目錄 1、傳送訊息軌跡流程 1.1 DefaultMQProducer建構函式 1.2 SendMessageTraceHookImpl鉤子函式 1.3 TraceDispatcher實現原理

原始碼分析 RocketMQ DLedger 多副本 Leader 選主

目錄 1、DLedger關於選主的核心類圖 1.1 DLedgerConfig 1.2 MemberState 1.3 raft協議相關 1.4 DLedgerRpcService

原始碼分析 RocketMQ DLedger(多副本) 日誌複製(傳播)

目錄 1、DLedgerEntryPusher 1.1 核心類圖 1.2 構造方法 1.3 startup 2、EntryDispatcher 詳解 2.1

RocketMQ原始碼分析】深入訊息儲存(1)

![](https://antzyun.oss-cn-beijing.aliyuncs.com/img204d5b68da5e7e26d371c966fbf81d8.jpg) 最近在學習RocketMQ相關的東西,在學習之餘沉澱幾篇筆記。 RocketMQ有很多值得關注的設計點,訊息傳送、訊息消費、路由中

Dubbo 原始碼分析系列三 —— 架構原理

1 核心功能 首先要了解Dubbo提供的三大核心功能: Remoting:遠端通訊 提供對多種NIO框架抽象封裝,包括“同步轉非同步”和“請求-響應”模式的資訊交換方式。 Cluster: 服務框架 提供基於介面方法的透明遠端過程呼叫,包括多協議支援,以及

Java程式設計師從笨鳥到菜鳥(八十一)細談Spring(十)深入原始碼分析SpringHibernateTemplate

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Tomcat 原始碼分析系列環境搭建

Tomcat 原始碼環境搭建 tomcat 9 和 idea 環境搭建 環境準備 JDK 1.10 git idea tomcat 原始碼 maven ant 國內的maven 倉庫映象 安裝Intellij Idea 新

原始碼分析手寫springvc

1.先建立maven的web專案,結構如下 2.在web.xml新增如下配置 <servlet> <servlet-name>dispatcher</servlet-name> <servlet-class>com.mayik

RxJava2原始碼分析just、fromArray、fromIterable

     Observable.just:接收1個以上,10個以下的引數,然後逐個發射。         Observable.fromArray:接收一個數組,從陣列中一個一個取出來發射。  

jQuery2.0.3原始碼分析系列(29) 視窗尺寸

.width() 基礎回顧 一般的,在獲取瀏覽器視窗的大小和位置時,有以下幾個屬性可以使用: 在不同的瀏覽器中,以下12個屬性所代表的意義也是不一樣的 特別需要注意的是,當使用或者不使用<!DOCTYPE>宣告顯示一個文件的時候,以上12個屬性的意義也會發生變化。 特在IE 9中

原始碼分析String原始碼分析

    前面已經分析過String原始碼為什麼是不可變的,同時通過我們常用的String的相關的類StringBuffer和StringBuilder,我們可以發現String類中欄位名被定義為了final型別,這樣的話將只能被賦值一次。接下來,繼續看String原始碼實現的

Android wpa_supplicant原始碼分析--啟動全域性初始化

1. wpa_supplicant簡介 wpa_supplicant是用來用來支援無線中各種加密方式的,包括WEP、WPA/WPA2和WAPI(中國特有)、EAP(8021x)。wpa_s通過socket與上層(framework)和底層(driver)通訊,向上接收命令和傳

RabbitMQ客戶端原始碼分析(三)Command

RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 Command Command介面是AMQP方法-引數的容器介面,帶有可選的內容頭(content

RabbitMQ客戶端原始碼分析(五)ConsumerWorkSerivce與WorkPool

RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 WorkPool WorkPool可以認