1. 程式人生 > 實用技巧 >RocketMQ原始碼分析 producer啟動以及訊息傳送流程,producer與broker網路互動過程,傳送和接收方式總結

RocketMQ原始碼分析 producer啟動以及訊息傳送流程,producer與broker網路互動過程,傳送和接收方式總結

1.proucer傳送訊息本質就是把訊息通過網路傳送給伺服器(broker),broker接收到訊息儲存應答producer成功。

要傳送的訊息在producer包裝為Message,到了broker端變為MessageExtBrokerInner,producer客戶端的啟動和傳送比較簡單,貼個大圖

上圖就是producer的啟動以及訊息的傳送。

生產中producer傳送訊息通常採用同步傳送,如果訊息大則採用非同步傳送(訊息大的情況比較少) 。

本質producer就是個netty client,它的主要功能就是連線上namesvr:9876,獲取到broker、topic、queue等資訊快取到producer本地,實際快取到DefaultMQProducerImpl.topicPublishInfoTable,然後傳送訊息的時候選擇一個broker和訊息佇列進行傳送。

這裡主要說下rmq訊息傳送的設計。

rmq對於client傳送訊息,都會最終包裝為一個遠端命令RemotingCommand,這是個命令模式。

RemotingCommand.code存放的是傳送的命令,broker收到後知道客戶端要做什麼

RemotingCommand.body 存放的是真實要傳送的訊息。

RemotingCommand.customHeader,是自定義的命令頭,是CommandCustomHeader

CommandCustomHeader有許多具體子類,比如producer傳送訊息是SendMessageRequestHeader,消費端拉取訊息是PullMessageRequestHeader,CommandCustomHeader存放自定義的訊息資訊,比如SendMessageRequestHeader存放的是topic、queueid。

code、body、customHeader組成了RemotingCommand是傳送資料。在broker收到訊息後,解析RemotingCommand,根據命令進入不同的處理器處理,broker響應訊息也是放在RemotingCommand返回。

producer傳送訊息到broker之間的互動:

producer啟動的大圖中已經寫了client server端的inboud outbound處理器,它們的執行順序參考上圖,除了對RemotingCommand編解碼的NettyEncoder NettyDecoder,重要的就是圖中標註綠色的方法。在server端決定由哪個執行緒池處理processor,在client端決定處理響應結果,對於同步呼叫喚醒等待發送執行緒,對於非同步呼叫,執行回撥結果。

重點說下org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(ChannelHandlerContext, RemotingCommand)方法,呼叫堆疊如圖,正好分別是客戶端和服務端的inbound事件中呼叫。

對於服務端,接收到的是請求,執行NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand),根據RemotingCommand.code即請求命令從快取NettyRemotingAbstract.processorTable獲取對應的執行處理器和執行緒池,那麼processorTable是怎麼有值的呢?對於服務端是在broker啟動過程中新增的命令和處理器、執行緒池,檢視堆疊如下

對於client端,是在客戶端啟動時候新增處理器

服務端處理請求NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)

把pair.getObject1()即處理器(比如SendMessageProcessor)儲存到新鍵的task中,然後把task提交到pair.getObject2()即執行緒池中,這樣就可以併發處理客戶端請求。task被提交執行緒池後就會執行,繼而執行對應的處理器(比如SendMessageProcessor)而且對於所有的處理器都是使用同一個這樣模式,這樣設計的很巧妙。

客戶端處理響應NettyRemotingAbstract.processResponseCommand(ChannelHandlerContext, RemotingCommand)

客戶端同步傳送

傳送入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(Channel, RemotingCommand, long)方法,建立一個ResponseFuture儲存到NettyRemotingAbstract.responseTable併發集合,然後傳送資料到broker,接著ResponseFuture同步等待broker響應結果。看如下程式碼

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();//每次請求都生成個唯一鍵,用於匹配原ResponseFuture
 
    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);//傳送前建立ResponseFuture,功能就類似jdk的future
        this.responseTable.put(opaque, responseFuture);//把ResponseFuture儲存到responseTable集合
        final SocketAddress addr = channel.remoteAddress();//伺服器地址
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {//writeAndFlush就是傳送,傳送成功後執行ChannelFutureListener監聽器,注意傳送成功不代表就接收到了響應,只是表示資料已經發送出去了
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
            	//監聽器實際是由netty的執行緒執行,而非當前傳送執行緒,因此發生成功直接return,並不會進入到finally。傳送失敗也不會進入finally。如果不懂得netty原始碼,這裡程式碼容易讓人混亂的
                if (f.isSuccess()) {//傳送成功回撥
                    responseFuture.setSendRequestOK(true);//成功則返回,responseTable存在該responseFuture,線上程[ServerHouseKeepingService]執行NettyRemotingAbstract.scanResponseTable()進行處理
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);//傳送失敗回撥
                }
				//傳送失敗,就不需要處理響應了,因此把ResponseFuture從responseTable集合移除
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
 
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//當前執行緒等待timeoutMillis時間被喚醒
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());//接收broker響應超時
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());//傳送失敗
            }
        }
 
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);//如果超時or傳送失敗,則把ResponseFuture從responseTable集合移除
    }
}

接收入口就是NettyRemotingAbstract.processResponseCommand(ChannelHandlerContext, RemotingCommand),從responseTable併發集合根據opaque(每次請求都會生成一個唯一值,用來查詢匹配原請求)取出傳送之前儲存的ResponseFuture,把broker響應結果儲存到ResponseFuture並喚醒傳送執行緒。

/*
     * 處理響應結果,通過opaque匹配到原ResponseFuture,對於同步則喚醒等待執行緒,對於非同步則執行回撥。
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();//opaque是每次請求都生成一個,該值是遞增的,根據該唯一值匹配到原ResponseFuture
        final ResponseFuture responseFuture = responseTable.get(opaque);//傳送之前會把ResponseFuture儲存到responseTable集合,這樣在處理響應的時候就可以獲取到
        if (responseFuture != null) {//說明ResponseFuture還沒被scanResponseTable()操作從集合中移除
            responseFuture.setResponseCommand(cmd);//把響應結果儲存到responseFuture
 
            responseTable.remove(opaque);//移除,如果不移除就記憶體洩漏了
 
            if (responseFuture.getInvokeCallback() != null) {//非同步呼叫才有回撥
                executeInvokeCallback(responseFuture);//非同步傳送執行這裡
            } else {
                responseFuture.putResponse(cmd);//喚醒同步傳送的等待執行緒
                responseFuture.release();//同步傳送,無功能,因為ResponseFuture.once==null
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

這裡就有個問題,如果客戶端傳送很快,但是服務端響應很慢或者不響應,同步呼叫大量超時,這樣就導致NettyRemotingAbstract.responseTable集合不斷增加(因為每次呼叫都向該集合新增一個ResponseFuture),最終會導致oom,這裡就有可能記憶體洩漏了,怎麼解決呢?在本篇最開始貼的大圖中,有個計劃執行緒每1s執行一次NettyRemotingAbstract.scanResponseTable(),在該方法內,會把超時的ResponseFuture從NettyRemotingAbstract.responseTable集合移除,這樣就不會導致記憶體洩漏了。

客戶端非同步呼叫

rmq4.4版本是沒有非同步傳送,入口org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(Message, SendCallback, long)見圖

執行進入到org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(Message, CommunicationMode, SendCallback, long)的時候發生模式是非同步直接return null。

非同步傳送暫時無法分析了。不過也能猜到大概,傳送後直接返回,服務端什麼時候處理完畢了,吧響應返回,這樣客戶端在接收響應(在netty inbound事件)得到了ResponseFuture,然後呼叫回撥即可。具體非同步傳送是有什麼bug呢才被去除呢?暫時不清楚

客戶端oneway呼叫

oneway是隻發不收,因此就沒有ResponseFuture。主要用於非業務型請求,比如REGISTER_BROKER、UPDATE_CONSUMER_OFFSET,具體入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeOnewayImpl(Channel, RemotingCommand, long),onway方式雖然只發送,但是服務端照樣會把響應給返回,只是客戶端不處理響應(查了下REGISTER_BROKER、UPDATE_CONSUMER_OFFSET命令在服務端對onway傳送方式的處理確實是會返回資料)。這樣是否有個問題?資料會快取在網路層,這樣是否會導致最終tcp接收緩衝區滿了?是不是服務端在writeAndFlush響應資料前應該判斷下isOnewayRPC()呢?答案是服務端對於oneway請求是不會的返回資料的,服務端處理請求的入口是org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand),該方法執行步驟是先根據命令找到處理器和執行緒池,吧task提交到執行緒池處理,task執行的時候先執行處理器,處理器返回處理結果,接著判斷是否是oneway,oneway方式不返回資料,從下圖可以看出

總結說明:rokcetmq採用netty通訊,netty是個非同步非阻塞,producer作為一個netty client,傳送本質就是個非同步,但是做成同步情況就是非同步轉同步的情況,那麼必須要採用接收執行緒(netty io 執行緒)必須匹配到原請求(producer傳送執行緒),那麼通常的匹配方式就是根據唯一key從ConcurrentHashMap集合,可以找到原請求,如果找不到,說明則超時了,已經被掃描執行緒給移除了。我的工作中同步轉非同步也是這樣做的,不同之處是用的wait和notify等待和喚醒,rmq採用的countdownlatch。

rmq的netty使用是非常好的例子,可以直接參考整合到自己專案。