1. 程式人生 > 實用技巧 >RockeMQ通過程式碼監控消費者狀態

RockeMQ通過程式碼監控消費者狀態

==背景==

物聯網場景,在裝置端寫了一個小的API服務程式,這個程式包括:

1、向平臺上報裝置資料

2、建立消費者客戶端,用來監聽平臺的下行命令

==問題==

平臺層需要知道裝置的狀態:線上 or 離線。我能想到的解決辦法

1、裝置上報心跳資料,平臺通過心跳來判斷裝置是否線上。

2、rocketmq應該有可以監控消費者狀態的命令,是否可以通過這個命令實現。

方案1肯定是沒有問題的,不過缺點就是需要在平臺上寫狀態管理的程式碼,麻煩不說,可能還有延遲。

於是想嘗試方法2是否可行。

==踐行過程==

首先,我觀察了rocketmq-console(RocketMQ的Web介面,需要獨立部署),發現可以通過Web介面檢視消費者狀態,結果如圖:

通過瀏覽器的控制檯日誌,可以看到呼叫的是consumerConnection.query介面。

很好,我是否可以借鑑一下這個思路,去監聽消費者狀態呢。

按照這個思路走,去github上找了原始碼:https://github.com/apache/rocketmq-externals

通過檢視他們的原始碼,才知道RocketMQ已經提供了供檢視消費者連結資訊的API。

==API示例==

需要引入新的pom檔案rocketmq-tools、rocketmq-common,增加只有,所有的pom為

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-store</artifactId>
    <version>4.5
.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.5
.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.0</version> </dependency>

Java程式碼示例

package admin;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class AdminExtSample {
    public static void main(String[] args)
        throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876");
        defaultMQAdminExt.start();

        ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down");
        System.out.println(cc.toString());

        defaultMQAdminExt.shutdown();

    }
}

這樣就可以獲取上面web頁面中的所有資訊了。

--END--