關於RocketMQ(Apache接手後)新版本下,producer啟動報錯RocketMQ No route info of this topic問題
坑爹啊,原本以為一會就搞定,結果從上午9點整整搞到下午2點半
producer
package com.zgd.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 傳送訊息的生產者
* @author zgd
* @time 2018年8月7日09:50:34
*/
public class ExampleProducer {
public static void main(String[] args) throws Exception {
//生產者,可以指定producer叢集
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
//設定name server的地址
producer. setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println(producer.getNamesrvAddr());
System.out.println(producer.getClientIP());
System.out.println("啟動了生產者producer");
//message必須指定topic,和訊息體body
// 可以選擇指定tag,key來進行細分message
Message msgA = new Message("topicA", "這是topicA的訊息,沒有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msgB = new Message("topicB", "這是topicB的訊息,沒有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msgC = new Message("topicC","tag-a","這是topicC的訊息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msgD = new Message("topicC","tag-b","這是topicC的訊息,指定了tag-b".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msgE = new Message("topicC","tag-a","key1","這是topicC的訊息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message[] messages =new Message[]{msgA,msgB,msgC,msgD,msgE};
//傳送訊息
for (Message message : messages) {
SendResult result = producer.send(message);
System.out.println("訊息傳送成功:id:" + result.getMsgId() +
" result:" + result.getSendStatus());
}
}
}
consumer
package com.zgd.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
/**
* 接收訊息的消費者
* @author zgd
* @time 2018年8月7日09:50:34
*/
public class ExampleConsumer {
public static void main(String[] args) throws Exception {
//定義消費者,可以指定消費叢集
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
//同樣的,指定name server 的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
//訂閱topicA下的所有訊息
consumer.subscribe("topicA","*");
//一個consumer可以訂閱多個topic
consumer.subscribe("topicB","*");
*/
consumer.subscribe("topicC","tag-a");
//程式第一次啟動從訊息佇列頭取資料
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//註冊訂閱訊息
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
MessageExt msg = list.get(0);
try {
System.out.println( new Date().toLocaleString()
+"-收到訊息:id-"+msg.getMsgId()
+","+ new String(msg.getBody(), "UTF-8")
+","+"keys: "+msg.getKeys()
);
System.out.println("msg全部資訊:"+ msg.toString());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
System.out.println("consumer消費者啟動");
while (true){
}
}
}
windows部署RocketMQ, ok
啟動namesrv, ok
啟動broker,ok
windows直接點選藍色框框的兩個檔案, linux開啟紅色框框檔案;
sh bin/mqnamesrv &
sh bin/mqbroker -n localhost:9876 &
然後啟動consumer,沒問題
然後啟動producer發訊息,出錯了!
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, topicA
不得不說這個其實很坑.不管大大小小七七八八的問題都是報這個異常, 包括沒開broker和namesrv直接啟動也是報這個異常,所以網上很多解決辦法也千奇百怪
不過主要的就是producer沒有和broker建立長連線導致的
目前網上常見的解決方法:
1. broker沒啟動成功,啟動命令有問題
linux:
nohup sh mqbroker -n 192.168.180.133:9876 autoCreateTopicEnable=true
windows
mqbroker.exe -n localhost:9876autoCreateTopicEnable=true
這個也是有問題,比較新版本的已經沒有mqbroker.exe這個了,如圖:
直接cmd下./mqbroker -n 127.0.0.1:9876
啟動就好
2. broker沒有自動建立topic
其實這個問題也不存在,新版本中都是預設autoCreateTopicEnable=true
啟動./mqbroker -n 127.0.0.1:9876 -p
-p是檢視配置資訊
所以新版本都是開啟自動建立topic的
3. 不知道broker是否啟動成功
檢視broker.log
,windows的預設在C:\Users\Admin\logs\rocketmqlogs
檢視logback_broker.xml可以看出
linux的在~/logs/rocketmqlogs/
看到register broker to name server localhost:9876 OK
說明已經成功註冊到name server上了
也可以啟動rocketmq-console
看到這個就是成功了.
4. fastjson, slf4j的jar包缺失
其實這個新版本應該沒啥影響,不過以防萬一的話還是都加上, 看上面我貼出來的pom.xml
5. 版本不對
其實以上的問題基本在高版本都解決了
就是這個問題困擾了我一天,硬是把所有的原因都排除了一遍,甚至去排除broker的配置檔案,brokerIP,重啟name server,都不行.後面嘗試著去將pom的版本換成下載rocketmq的版本,結果居然就好了!!!
我的專案pom檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zgd.rocketmq</groupId>
<artifactId>rocketmq-study-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rocketmq-study-demo</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.3.0</rocketmq.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
我用的rocketmq版本: 4.3.0
分享一下成功後的截圖
最後說一下windows關閉namesrv和broker
.\mqshutdown namesrv
.\mqshutdown broker
linux
sh mqshutdown namesrv
sh mqshutdown broker