訊息中介軟體(rabbitmq,kafka,rocketmq)
訊息中介軟體
為什麼要使用MQ?
- 實現非同步通訊
- 實現系統解耦
- 實現流量削峰
- 實現廣播通訊
MQ帶來的問題
- 運維成本的增加
- 系統的可用性降低
- 系統複雜性提高
AMQP協議是應用層的協議
RabbitMQ
因為是Erlang編寫的,而且Erlang是為電話交換機編寫的語言,天生適合分散式和高併發。
預設埠是5672,RabbitMq伺服器我們叫做Broker。消費者可生產者都要跟broker建立連線,這個連線屬於TCP的長連線。
channel
為了節省效能損耗,節省時間。在AMQP裡面引入了Channel的概念,他是一個虛擬的連線。也就是通道或者訊息通道。不同的Channel是相互隔離的,每個Channel都有自己的編號。每個客戶端執行緒都有自己的Channel。
Queue
在Broker上有一個物件用來儲存訊息,在RabbitMQ裡面這個物件叫做Queue。
Consumer
消費者消費訊息有兩種模式。
一種是pull模式,對應的方法是basicGet。訊息存放在服務端,只有消費者主動獲取才能拿到訊息。
另一種是push模式,對應的方法是basicConsume,只要生產者發訊息到伺服器,就馬上推送給消費者,訊息儲存在客戶端,實時性很高,如果消費者不過來會造成訊息積壓。
Exchange
是幫助路由訊息的元件。不管有多少個佇列需要接收訊息,只需要傳送到Exchange就可以,由它來分發。Exchange不會儲存訊息,它只做一件事,根據規則分發訊息。
Exchange和這些需要接收訊息的佇列必須建立一個繫結關係,並且為每個佇列指定一個特殊的標識。Exchange和佇列是多對多的繫結關係,一個交換機的訊息一個路由給多個佇列,一個佇列也可以接收來自多個交換機的訊息。
Vhost
不同的的Vhost可以實現資源的隔離和許可權的控制。不同的Vhost中可以由同名的Exchange和Queue。預設的Vhost名字是”/“。
路由方式
一共有四種類型的交換機,Direct、Topic、Fanout、Headers。Headers不常用。
Direct直連
一個佇列與直連型別的交換機繫結,需指定一個明確的繫結鍵(binding key)。生產者傳送訊息時會攜帶一個路由鍵(routing key)。
當訊息的路由鍵與某個佇列的繫結鍵完全匹配時,這條訊息擦灰從交換機路由到這個佇列上。多個佇列也可以使用相同的繫結鍵。
public class MyConsumer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
private final static String QUEUE_NAME = "SIMPLE_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連線IP
factory.setHost("192.168.242.110");
// 預設監聽埠
factory.setPort(5672);
// 虛擬機器
factory.setVirtualHost("/");
// 設定訪問的使用者
factory.setUsername("admin");
factory.setPassword("admin");
// 建立連線
Connection conn = factory.newConnection();
// 建立訊息通道
Channel channel = conn.createChannel();
// 宣告交換機
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);
// 宣告佇列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for message....");
// 繫結佇列和交換機
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"gupao.best");
// 建立消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
System.out.println("consumerTag : " + consumerTag );
System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
}
};
// 開始獲取訊息
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
public class MyProducer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 連線IP
factory.setHost("192.168.242.110");
// 連線埠
factory.setPort(5672);
// 虛擬機器
factory.setVirtualHost("/");
// 使用者
factory.setUsername("admin");
factory.setPassword("admin");
// 建立連線
Connection conn = factory.newConnection();
// 建立訊息通道
Channel channel = conn.createChannel();
// 傳送訊息
String msg = "Hello world, Rabbit MQ";
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE_NAME, "gupao.best", null, msg.getBytes());
channel.close();
conn.close();
}
}
Topic主題
一個佇列與主題型別的交換機繫結時,可以在繫結鍵中使用萬用字元。支援兩個萬用字元:
-
代表0個或多個單詞
-
*代表不多不少一個單詞
單詞用代表用英文的.分開的字元。例如a.bc.def是三個單詞。
Fanout廣播
廣播型別的交換機與佇列繫結時,不需要指定繫結鍵。訊息到達交換機時,所有與之綁定了的佇列,都會收到相同的訊息副本。
springmvc用法
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定連線rabbit server引數 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="admin" password="admin" host="192.168.242.110" port="5672" />
<!--通過指定下面的admin資訊,當前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!--定義direct exchange,繫結MY_FIRST_QUEUE -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義rabbit template用於資料的接收和傳送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />
<!--訊息接收者 -->
<bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>
<!--queue listener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>
<!--定義queue -->
<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 將已經定義的Exchange繫結到MY_SECOND_QUEUE,注意關鍵詞是key -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 訊息接收者 -->
<bean id="receiverSecond" class="com.gupaoedu.consumer.SecondConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
</rabbit:listener-container>
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定義topic exchange,繫結MY_THIRD_QUEUE,注意關鍵詞是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbit template用於資料的接收和傳送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="MY_TOPIC_EXCHANGE" />
<!-- 訊息接收者 -->
<bean id="receiverThird" class="com.gupaoedu.consumer.ThirdConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
</rabbit:listener-container>
<!--######分隔線######-->
<!--定義queue -->
<rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定義fanout exchange,繫結MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" >
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
<rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 訊息接收者 -->
<bean id="receiverFourth" class="com.gupaoedu.consumer.FourthConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
</rabbit:listener-container>
</beans>
@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Autowired
@Qualifier("amqpTemplate")
private AmqpTemplate amqpTemplate;
@Autowired
@Qualifier("amqpTemplate2")
private AmqpTemplate amqpTemplate2;
/**
* 演示三種交換機的使用
*
* @param message
*/
public void sendMessage(Object message) {
// amqpTemplate 預設交換機 MY_DIRECT_EXCHANGE
// amqpTemplate2 預設交換機 MY_TOPIC_EXCHANGE
// Exchange 為 direct 模式,直接指定routingKey
amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);
// Exchange模式為topic,通過topic匹配關心該主題的佇列
amqpTemplate2.convertAndSend("msg.Third.send","[Topic,msg.Third.send] "+message);
// 廣播訊息,與Exchange繫結的所有佇列都會收到訊息,routingKey為空
amqpTemplate2.convertAndSend("MY_FANOUT_EXCHANGE",null,"[Fanout] "+message);
}
}
public class FirstConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(FirstConsumer.class);
public void onMessage(Message message) {
logger.info("The first consumer received message : " + message.getBody());
}
}
public class SecondConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(SecondConsumer.class);
public void onMessage(Message message) {
logger.info("The second consumer received message : " + message);
}
}
....
springboot用法
消費者
@Configuration
@PropertySource("classpath:gupaomq.properties")
public class RabbitConfig {
@Value("${com.gupaoedu.firstqueue}")
private String firstQueue;
@Value("${com.gupaoedu.secondqueue}")
private String secondQueue;
@Value("${com.gupaoedu.thirdqueue}")
private String thirdQueue;
@Value("${com.gupaoedu.fourthqueue}")
private String fourthQueue;
@Value("${com.gupaoedu.directexchange}")
private String directExchange;
@Value("${com.gupaoedu.topicexchange}")
private String topicExchange;
@Value("${com.gupaoedu.fanoutexchange}")
private String fanoutExchange;
// 建立四個佇列
@Bean("vipFirstQueue")
public Queue getFirstQueue(){
return new Queue(firstQueue);
}
@Bean("vipSecondQueue")
public Queue getSecondQueue(){
return new Queue(secondQueue);
}
@Bean("vipThirdQueue")
public Queue getThirdQueue(){
return new Queue(thirdQueue);
}
@Bean("vipFourthQueue")
public Queue getFourthQueue(){
return new Queue(fourthQueue);
}
// 建立三個交換機
@Bean("vipDirectExchange")
public DirectExchange getDirectExchange(){
return new DirectExchange(directExchange);
}
@Bean("vipTopicExchange")
public TopicExchange getTopicExchange(){
return new TopicExchange(topicExchange);
}
@Bean("vipFanoutExchange")
public FanoutExchange getFanoutExchange(){
return new FanoutExchange(fanoutExchange);
}
// 定義四個繫結關係
@Bean
public Binding bindFirst(@Qualifier("vipFirstQueue") Queue queue, @Qualifier("vipDirectExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("gupao.best");
}
@Bean
public Binding bindSecond(@Qualifier("vipSecondQueue") Queue queue, @Qualifier("vipTopicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("*.gupao.*");
}
@Bean
public Binding bindThird(@Qualifier("vipThirdQueue") Queue queue, @Qualifier("vipFanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding bindFourth(@Qualifier("vipFourthQueue") Queue queue, @Qualifier("vipFanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 在消費端轉換JSON訊息
* 監聽類都要加上containerFactory屬性
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setAutoStartup(true);
return factory;
}
}
@Component
@PropertySource("classpath:gupaomq.properties")
@RabbitListener(queues = "${com.gupaoedu.firstqueue}", containerFactory="rabbitListenerContainerFactory")
public class FirstConsumer {
@RabbitHandler
public void process(@Payload Merchant merchant){
System.out.println("First Queue received msg : " + merchant.getName());
}
}
生產者
@Configuration
public class RabbitConfig {
/**
* 所有的訊息傳送都會轉換成JSON格式發到交換機
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate gupaoTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
@Component
@PropertySource("classpath:gupaomq.properties")
public class RabbitSender {
@Value("${com.gupaoedu.directexchange}")
private String directExchange;
@Value("${com.gupaoedu.topicexchange}")
private String topicExchange;
@Value("${com.gupaoedu.fanoutexchange}")
private String fanoutExchange;
@Value("${com.gupaoedu.directroutingkey}")
private String directRoutingKey;
@Value("${com.gupaoedu.topicroutingkey1}")
private String topicRoutingKey1;
@Value("${com.gupaoedu.topicroutingkey2}")
private String topicRoutingKey2;
@Autowired
private MerchantService merchantService;
// 自定義的模板,所有的訊息都會轉換成JSON傳送
@Autowired
AmqpTemplate gupaoTemplate;
public void send() throws JsonProcessingException {
Merchant merchant = new Merchant(1,"a direct msg : 中原鏢局","漢中省解放路266號");
gupaoTemplate.convertAndSend(directExchange,directRoutingKey, merchant);
gupaoTemplate.convertAndSend(topicExchange,topicRoutingKey1, "a topic msg : shanghai.gupao.teacher");
gupaoTemplate.convertAndSend(topicExchange,topicRoutingKey2, "a topic msg : changsha.gupao.student");
// 傳送JSON字串
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(merchant);
System.out.println(json);
merchantService.update(merchant);
gupaoTemplate.convertAndSend(fanoutExchange,"", json);
}
}
service程式碼
@Override
public int add(Merchant merchant) {
int k = merchantMapper.add(merchant);
System.out.println("aaa : "+merchant.getId());
JSONObject title = new JSONObject();
String jsonBody = JSONObject.toJSONString(merchant);
title.put("type","add");
title.put("desc","新增商戶");
title.put("content",jsonBody);
gupaoTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
return k;
}
消費端限流
//非自動確認訊息的前提下,如果一定數目的訊息(通過基於consume或者channel設定Qos的值)未被確認前,不進行消費新的訊息。
channel.basicQos(2);
channel.basicConsume(QUEUE_NAME, false, consumer);
可靠性投遞
-
①代表訊息從生產者傳送到Broker
RabbitMQ提供了兩種服務端確認機制
-
Transaction(事務)模式
try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.txCommit(); System.out.println("訊息傳送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("訊息已經回滾"); }
事務模式是阻塞的,一條訊息沒有傳送完畢,不能傳送下一條,太消耗伺服器效能,不建議生產環境使用。
SpringBoot中設定
rabbitTemplate.setChannelTransacted(true);
-
Confirm(確認)模式
/*單條確認*/ // 開啟發送方確認模式 channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); // 普通Confirm,傳送一條,確認一條 if (channel.waitForConfirms()) { System.out.println("訊息傳送成功" ); } /*批量確認*/ channel.confirmSelect(); for (int i = 0; i < 5; i++) { // 傳送訊息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes()); } // 批量確認結果,ACK如果是Multiple=True,代表ACK裡面的Delivery-Tag之前的訊息都被確認了 // 比如5條訊息可能只收到1個ACK,也可能收到2個(抓包才看得到) // 直到所有資訊都發布,只要有一個未被Broker確認就會IOException channel.waitForConfirmsOrDie(); System.out.println("訊息傳送完畢,批量確認成功");
由於批量確認不能確定多少條確認一次,所以就有非同步確認模式,一邊傳送一邊確認。
// 用來維護未確認訊息的deliveryTag final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); // 這裡不會列印所有響應的ACK;ACK可能有多個,有可能一次確認多條,也有可能一次確認一條 // 非同步監聽確認和未確認的訊息 // 如果要重複執行,先停掉之前的生產者,清空佇列 channel.addConfirmListener(new ConfirmListener() { public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未確認訊息,標識:" + deliveryTag); if (multiple) { // headSet表示後面引數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { confirmSet.remove(deliveryTag); } // 這裡新增重發的方法 } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量執行了deliveryTag這個值以前(小於deliveryTag的)的所有訊息,如果為false的話表示單條確認 System.out.println(String.format("Broker已確認訊息,標識:%d,多個訊息:%b", deliveryTag, multiple)); if (multiple) { // headSet表示後面引數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一個元素 confirmSet.remove(deliveryTag); } System.out.println("未確認的訊息:"+confirmSet); } }); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { long nextSeqNo = channel.getNextPublishSeqNo(); // 傳送訊息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes()); confirmSet.add(nextSeqNo); } System.out.println("所有訊息:"+confirmSet);
springboot中confirm模式實在Channel上開啟的,RabbitTemplate對Channel進行了封裝
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){ System.out.println("回發的訊息:"); System.out.println("replyCode: "+replyCode); System.out.println("replyText: "+replyText); System.out.println("exchange: "+exchange); System.out.println("routingKey: "+routingKey); } }); rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("傳送訊息失敗:" + cause); throw new RuntimeException("傳送異常:" + cause); } } }); return rabbitTemplate; }
-
-
②代表訊息從Exchange路由到Queue
這個環節出錯有兩種可能,一是routingkey錯誤,二是佇列不存在。(生產環境基本不會出現這兩種情況)
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("=========監聽器收到了無法路由,被返回的訊息============"); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("message:"+new String(body)); } });
Springboot訊息回發的方式
// 訊息是否必須路由到一個佇列中,配合Return使用 rabbitTemplate.setMandatory(true); // 為RabbitTemplate設定ReturnCallback rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { try { System.out.println("--------收到無法路由回發的訊息--------"); System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("properties:" + message.getMessageProperties()); System.out.println("body:" + new String(message.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } });
建立交換機的時候,從屬性中指定備份交換機
// 在宣告交換機的時候指定備份交換機 Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments); // 傳送到了預設的交換機上,由於沒有任何佇列使用這個關鍵字跟交換機繫結,所以會被退回 // 第三個引數是設定的mandatory,如果mandatory是false,訊息也會被直接丟棄 channel.basicPublish("TEST_EXCHANGE","qingshan2673",true, properties,"只為更好的你".getBytes());
-
③代表訊息在Queue中儲存
如果沒有消費者,佇列一直存在在資料庫中。如果伺服器或硬體發生故障,比如宕機、重啟、關閉。所以要把訊息本身和元資料都儲存到磁碟。
-
佇列持久化
// 宣告佇列 // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments //durable:沒有持久化的佇列,儲存在記憶體中,服務重啟後佇列和訊息都會丟失。 //exclusive:排他性佇列的特點:只對首次宣告他的連線可見。會在連線斷開的時候自動刪除。 //autoDelete:沒有消費者連線的時候自動刪除。 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
交換機持久化
@Bean("gpexchange") public DirectExchange exchange() { return new DirectExchange("GP_RELIABLE_RECEIVE_EXCHANGE", true, false, new HashMap<>()); }
-
訊息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2代表持久化 .contentEncoding("UTF-8") // 編碼 .expiration("10000") // TTL,過期時間 .headers(headers) // 自定義屬性 .priority(5) // 優先順序,預設為5,配合佇列的 x-max-priority 屬性使用 .messageId(String.valueOf(UUID.randomUUID())) .build();
如果訊息沒有持久化,儲存在記憶體中,佇列還在,但是訊息在重啟後會消失。
如果只有一個RabbitMQ節點,即使交換機、佇列、訊息做了持久化,如果服務崩潰或者硬體發生故障,RabbitMQ的服務一樣是不可用的。所以為了提高MQ服務的可用性,保障訊息的傳輸,我們需要多個RabbitMQ的節點,也就是叢集。
-
-
④代表消費者訂閱Queue並消費訊息
RabbitMQ提供了訊息的確認機制,消費者可以自動或手動傳送ACK給服務端。
沒有收到ACK的訊息,消費者斷開連線後,RabbitMQ會把這條訊息傳送給其他消費者。如果沒有其他消費者,消費者重啟後會重新消費這條訊息,重複執行業務邏輯。
消費者有兩種方式給broker應答。一種是自動ACK,一種是手動ACK。
自動ACK,也是預設的情況。也就是沒有在消費者處編寫ACK的程式碼,消費者會在收到訊息的時候就自動傳送ACK,並不關心有沒有正常消費。
如果想要等訊息執行完成後才傳送ACK,需要先把自動ACK設定成手動ACK。把autoAck設定成false。
// String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, false, consumer);
Springboot中設定
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
程式碼中設定
// 設定訊息確認模式為手動模式 messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
None:自動ACK
MANUAL:手動ACK
AUTO:如果方法未丟擲異常,則傳送ack。如果方法丟擲異常,並且不是AmqpRejectAndDontRequeueException則傳送nack,並且重新進入佇列。如果丟擲異常,並且是AmqpRejectAndDontRequeueException異常則傳送nack不會重新進入佇列。
消費者呼叫Ack:
@RabbitHandler public void process(String msgContent,Channel channel, Message message) throws IOException { System.out.println("Second Queue received msg : " + msgContent ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
如果消費出了問題,也有拒絕訊息的指令,而且還可以讓訊息重新入隊給其他消費者消費。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); if (msg.contains("拒收")){ // 拒絕訊息 // requeue:是否重新入佇列,true:是;false:直接丟棄,相當於告訴佇列可以直接刪除掉 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成訊息重複消費 channel.basicReject(envelope.getDeliveryTag(), false); } else if (msg.contains("異常")){ // 批量拒絕 // requeue:是否重新入佇列 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成訊息重複消費 channel.basicNack(envelope.getDeliveryTag(), true, false); } else { // 手工應答 // 如果不應答,佇列中的訊息會一直存在,重新連線的時候會重複消費 channel.basicAck(envelope.getDeliveryTag(), true); } } };
生產者確定消費者是否消費成功的兩種方式:
- 消費者收到訊息,處理完畢後,呼叫生產者的API(是否破化解耦)
- 消費者收到訊息,處理完畢後,傳送一條響應訊息給生產者。
補償機制
如果生產者的API沒有別呼叫,也沒有收到消費者的響應訊息,生產者和消費者應該約定一個超時時間,對於超過這個時間沒有得到響應的訊息,才確定為消費失敗。
消費失敗後,需要重新發送訊息。
-
誰來重發?
可以建立一個數據庫表記錄消費失敗的訊息,由定時任務定時重新發送。(會消耗效能,消耗資料庫儲存空間)
-
隔多久傳送一次?由定時任務的時間決定。
-
一共重發多少次?可以設定為幾次,在訊息表裡記錄次數實現重新發送。
-
重發什麼內容?傳送一模一樣的訊息
訊息的冪等性
如果消費者狀態時正常的,每一條訊息都可以正常處理,只是在響應或者呼叫API的時候出現問題。為了避免相同訊息的重複處理,必須採取一定的措施。RabbitMQ服務端是沒有這種控制的,只能在消費端處理。
重複訊息的兩個原因
- 生產者的問題,環節①重複傳送訊息,比如在開啟了Confirm模式但未收到確認,消費者重複投遞。
- 環節④出了問題,由於消費者未傳送ACK或者其他原因,訊息重複消費。
對於重複傳送的訊息,可以對每一條訊息生成一個唯一的業務ID,通過日誌或者訊息落庫來做重複控制。
最終一致
約定一個標準,獲取核心系統的日誌檔案,解析,登記成資料,然後跟自己記錄的流水比較,跟核心系統保持一致。
訊息的順序性
訊息的順序性是指消費者消費的順序跟生產者生產的訊息順序是一致的。一個佇列有多個消費者時無法保證順序。
只有一個佇列有一個消費者時才能保證順序消費。
叢集和高可用
叢集目的:實現高可用和負載均衡。
如何支援叢集?
由於是Erlang開發,所以天然支援叢集。
通過.erlang.cookie(預設路徑:/var/lib/rabbitmq)來驗證身份,需要在所有節點上保持一致。
RabbitMQ節點型別
叢集有兩種節點型別:一種是磁碟節點(disc node),一種是記憶體節點(ram node)
磁碟節點:將元資料(佇列名字,屬性,交換機的名字、屬性,繫結,vhost)放在磁碟中。預設磁碟節點。
記憶體節點:將元資料放在記憶體中。
叢集中至少需要一個磁碟節點來持久化元資料,否則全部記憶體節點崩潰時,就無從同步元資料。
叢集的配置步驟:
- 配置hosts以便互相通訊
- 同步erlang.cookie
- 加入叢集(join cluster 命令)
rabbitmq有兩種叢集模式:普通叢集模式和映象佇列叢集模式。
普通叢集
不同的節點之間只會相互同步元資料(交換機、佇列、繫結關係,vhost的定義),而不會同步訊息。如果要保證佇列的高可用,不能用這種叢集。
映象叢集
訊息內容會在映象節點同步,可用性更高。但是會降低系統性能。
叢集模式可以通過UI或者CLI或者HTTP操作。
高可用
通過VRRP協議,這個元件就是Keepalived,它具有Load Balane和high Availability的功能。
生產環境運維監控可以用zabbix+grafana實現,主要關注:磁碟,記憶體,連線數。
傳送訊息:可以用json陣列格式傳送,建議單條訊息不要超過4M(4096kb)
kafka
kafka的服務叫做broker,預設是9092埠,生產者和消費者都是跟一個broker建立連線才能實現訊息的收發。
生產者對應的封裝類是ProducerRecord,消費者對應的封裝類是ConsumerRecord,訊息在傳輸過程中需要序列號。
生產者
生產者不是逐條傳送訊息給broker,而是批量傳送的。多少條傳送一次由一個引數決定。
proc.put("batch.size",16384);
消費者
kafka的消費是pull模式。而且可以控制一次到底取多少條訊息,預設是500,可以在poll方法裡面指定。
max.poll.records
因為在push模式下,如果訊息的生產速度遠遠大於消費者消費的速度,那消費者就會不堪重負,直接掛掉。
Topic
在kakfa中,佇列叫topic,是一個邏輯的概念,可以理解為訊息的組合。生產者和topic以及消費者和topic都是多對多的關係。
生產者傳送訊息時,如果topic不存在,會自動建立,由一個引數控制。
預設為true。
auto.create.topics.enable
Partition和Cluster
如果一個topic中的訊息過多的話,會帶來兩個問題:
第一個不方便橫向擴充套件
第二個是併發或者負載的問題。所有的客戶端操作都在一個topic上,在高併發的場景下效能會大大降低。
為了解決上面的問題,引入了分割槽,就是把topic拆分。
分割槽在建立topic的時候指定,每個topic至少有一個分割槽
./kafka-topics.sh --create --zookper localhost:2181 --replication-factor 1 --partitions 1 --topic gptest
如果沒有指定分割槽,預設的分割槽是一個,這個引數可以修改。
num.partitions=1
partitions是分割槽數,replication-factor是分割槽副本數。
partition分割槽裡面的訊息讀取後不會被刪除。
Partition副本Replica機制
如果partition的資料只儲存一份,在發生網路或者硬體故障的時候,該分割槽的資料就無法訪問或者無法恢復了。
kafka在0.8版本之後增加了副本機制。
每個partition可以有若干個副本(replica),副本必須在不同的broker上面。
服務端有一個引數控制預設的副本數
offsets.topic.replication.factor
Segment
如果一個partition只有一個log檔案,訊息不斷地追加,這個log我呢見也會變得越來越大,這個時候要檢索資料庫效率就會很低。所以乾脆把partition再做一個切分,切分出來的單位就叫段。實際上kafka的儲存檔案是劃分成段來儲存的。
預設儲存路徑:/tmp/kafka-logs/
每個segment都至少有1個數據檔案和2個索引檔案,這三個檔案是成套出現的。
一個segment預設大小是1073741824bytes(1G),由下面引數控制
log.segment.bytes
Consumer Group
如果生產者訊息的速率過快,會造成訊息在broker的堆積,影響broker的效能。通過增加消費者的數量提升消費速率,消費者過多怎麼確定消費的在同一個topic?
所以引入了一個Consumer Group消費組的概念。在程式碼中通過group id來配置。消費同一個topic的消費者不一定是同一個組,只有group id相同的消費者才是一個消費者組。
Consumer Offset
partition裡面的訊息是順序寫入的,被讀取之後為了不被刪除。所以對訊息進行編號,用來標識一條唯一的訊息。這個編號叫做offset,偏移量。offset記錄著下一條將要傳送給consumer的訊息序號。偏移量儲存在服務端。
java開發
public class SimpleConsumer {
public static void main(String[] args) {
Properties props= new Properties();
//props.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095");
props.put("bootstrap.servers","192.168.242.110:9092");
props.put("group.id","gp-test-group");
// 是否自動提交偏移量,只有commit之後才更新消費組的 offset
props.put("enable.auto.commit","true");
// 消費者自動提交的間隔
props.put("auto.commit.interval.ms","1000");
// 從最早的資料開始消費 earliest | latest | none
props.put("auto.offset.reset","earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
// 訂閱topic
consumer.subscribe(Arrays.asList("mytopic"));
try {
while (true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
System.out.printf("offset = %d ,key =%s, value= %s, partition= %s%n" ,record.offset(),record.key(),record.value(),record.partition());
}
}
}finally {
consumer.close();
}
}
}
public class SimpleProducer {
public static void main(String[] args) {
Properties pros=new Properties();
//pros.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095");
pros.put("bootstrap.servers","192.168.242.110:9092");
pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 0 發出去就確認 | 1 leader 落盤就確認| all(-1) 所有Follower同步完才確認
pros.put("acks","1");
// 異常自動重試次數
pros.put("retries",3);
// 多少條資料傳送一次,預設16K
pros.put("batch.size",16384);
// 批量傳送的等待時間
pros.put("linger.ms",5);
// 客戶端緩衝區大小,預設32M,滿了也會觸發訊息傳送
pros.put("buffer.memory",33554432);
// 獲取元資料時生產者的阻塞時間,超時後丟擲異常
pros.put("max.block.ms",3000);
// 建立Sender執行緒
Producer<String,String> producer = new KafkaProducer<String,String>(pros);
for (int i =0 ;i<10;i++) {
producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));
// System.out.println("傳送:"+i);
}
//producer.send(new ProducerRecord<String,String>("mytopic","1","1"));
//producer.send(new ProducerRecord<String,String>("mytopic","2","2"));
producer.close();
}
}
kafka與springboot實戰
server.port=7271
#spring.kafka.bootstrap-servers=192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095
spring.kafka.bootstrap-servers=192.168.44.160:9092
# producer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=all
spring.kafka.producer.properties.linger.ms=5
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消費者使用@KafkaListener監聽topic
@Component
public class ConsumerListener {
@KafkaListener(topics = "springboottopic",groupId = "springboottopic-group")
public void onMessage(String msg){
System.out.println("----收到訊息:"+msg+"----");
}
}
生產者
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public String send(@RequestParam String msg){
kafkaTemplate.send("springboottopic", msg);
return "ok";
}
}
測試先啟動Application,在執行單元測試
@SpringBootTest
class KafkaTests {
@Autowired
KafkaProducer producer;
// 消費者:先啟動 kafkaApp
@Test
void testSendMsg() {
long time = System.currentTimeMillis();
System.out.println("----"+time +",已經發出----");
producer.send("qingshan penyuyan, " +time);
}
}
kafka繼承canal實現資料同步
canal利用mysql的binlog日誌功能,把自己偽裝成一個slave節點,不斷請求最新的binlog。canal會解析binlog的內容,把內容發給關注資料庫變動的接收者,完成後續邏輯的處理。
Canal是一個純java開發的資料同步工具,可以支援binlog增量訂閱的功能。binlog設定成row模式以後,不僅能獲取到執行的每一個增刪改的指令碼,同時能獲取到修改前和修改後的資料。
工作流程:資料變動-產生binlog資訊-canal服務獲取binlog資訊-傳送MQ訊息-消費者消費MQ訊息,完成後續邏輯處理。
進階功能
訊息冪等性
如果消費失敗了,訊息需要重發。但是不清楚消費者是不是真的消費失敗的情況下,有可能出現訊息重複的情況。
訊息重複需要在消費端解決,也就是在消費者實現冪等性。
考慮到所有的消費者都要做一樣的實現,kafka乾脆在broker實現了訊息的重複性判斷,大大的解放了消費者的雙手。
去重肯定要依賴於生產者的訊息唯一的標識,不然是沒有辦法知道是不是同一條訊息的。通過配置:
props.put("enable.idempotence",true);
enable.idempotence設定成true之後,producer自動升級成冪等性的producer,kafka會自動去重。有兩個實現機制:
-
PID(Producer ID),冪等性的生產者每個客戶端都有一個唯一的編號。
-
sequence number,冪等性的生產者傳送的每條訊息都帶有相應的sequence number,server端就是根據這個值來判斷資料是否重複。如果說sequence number比服務端已經記錄的值要小,那肯定是出現訊息重複了。
- 由於sequence number並不是全域性有序的,不能保證所有時間上的冪等性。所以他的作用範圍是有限的
- 只能保證單分割槽上的冪等性,即一個冪等性的producer能夠保證某個主題的一個分割槽上不出現重複訊息。
- 只能實現單會話的冪等性,這裡的會話指的是producer程序的一次執行。當重啟了producer程序之後,冪等性不保證。
如果要實現多個分割槽的訊息的原子性,就要用到kafka的事務機制了。
生產者事務
生產者事務是kakfa2017引入的新特性,通過事務,kafka可以保證生產者會話的訊息冪等傳送。
public class TransactionProducer {
public static void main(String[] args) {
Properties props=new Properties();
//props.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095");
props.put("bootstrap.servers","192.168.44.160:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 0 發出去就確認 | 1 leader 落盤就確認| all或-1 所有Follower同步完才確認
props.put("acks","all");
// 異常自動重試次數
props.put("retries",3);
// 多少條資料傳送一次,預設16K
props.put("batch.size",16384);
// 批量傳送的等待時間
props.put("linger.ms",5);
// 客戶端緩衝區大小,預設32M,滿了也會觸發訊息傳送
props.put("buffer.memory",33554432);
// 獲取元資料時生產者的阻塞時間,超時後丟擲異常
props.put("max.block.ms",3000);
props.put("enable.idempotence",true);
// 事務ID,唯一
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String,String> producer = new KafkaProducer<String,String>(props);
// 初始化事務
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));
producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
// Integer i = 1/0;
producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));
// 提交事務
producer.commitTransaction();
} catch (KafkaException e) {
// 中止事務
producer.abortTransaction();
}
producer.close();
}
}
特性
- 高吞吐、低延遲:最大特點就是收發訊息快,每秒可以處理幾十萬條訊息,他的延遲只有幾毫秒
- 高伸縮性:通過增加分割槽partition來實現擴容。不同的分割槽可以在不同的broker中。通過zk來管理broker實現擴充套件,zk管理consumer可以實現負載。
- 永續性、可靠性:kafka能夠允許資料的持久化儲存,訊息被持久化到磁碟,支援資料備份防止資料丟失。
- 容錯性:允許叢集中的節點失敗,某個節點宕機,kafka叢集能正常工作。
- 高併發:支援數千個客戶端同時讀寫。
產品側重 | 效能 | 訊息順序 | 訊息路由和分發 | 延遲訊息。死信佇列 | 訊息的留存 | |
---|---|---|---|---|---|---|
RabbitMq | 訊息代理 | 主要是push | 更加靈活 | 支援 | 不留存 | |
Kafka | 流式訊息處理、訊息引擎 | 只有pull,吞吐量更高 | 分割槽訊息有序,能保證訊息的順序 | 不支援 | 消費完會清除 | |
RocketMq | 只有pull |
RocketMQ
RocketMQ常用管理命令
https://blog.csdn.net/gwd1154978352/article/details/80829534
RocketMQ預設配置
https://www.cnblogs.com/jice/p/11981107.html
broker
RocketMQ的服務叫做broker,broker的作用是儲存和轉發訊息。單機大約能承受10萬QPS。
為了提高可靠性,每個Broker可以有自己的副本。預設情況下,讀寫都發生在master上。在slaveReadEnable=true的情況下,slave也可以參與讀負載。但是預設只有BrokerId=1的slave才會參與讀負載,而且是在master消費慢的情況下,由whichBrokerWhenConsumeSlowly這個引數決定。
topic
topic用於將訊息按主題做劃分。跟kafka不同的是,在RocketMQ中,Topic是一個邏輯概念,訊息不是按Topic劃分儲存的。Producer將訊息發往指定的topic,consumer訂閱這個topic就可以收到相應的訊息。跟kafka一樣,如果topic不存在,會自動建立,BrokerConfig:
private boolean autoCreateTopicEnable = true;
topic跟生產者和消費者是多對多的關係。
NameServer
可以把NameServer理解為是RocketMQ的路由中心,每一個NameServer節點都儲存著全量的路由資訊,為了保證高可用,NameServer也可以做叢集的部署。Broker會在NameServer上註冊自己,Producer和Consumer用NameServer來發現Broker。
NameServer作為路由中心怎麼工作?
每個Broker節點在啟動時,都會根據配置遍歷NameServer列表。Broker與每個NameServer建立TCP長連線,註冊自己的資訊,之後每隔30s傳送心跳資訊。除了主從註冊,還有定時探活。每個NameServer每隔10s檢查一下各個Broker的最近一次心跳時間,如果發現某個Broker超過120s都沒傳送心跳,就認為這個Broker已經掛掉 了,會將其從路有訊息裡移除。
一致性問題
-
服務註冊
因為沒有master,Broker每隔30s會向所有的NameServer傳送心跳資訊。
-
服務剔除
- 如果broker正常關閉,連線就斷開了,netty的通道關閉監聽器會監聽到連線斷開事件,然後將這個broker剔除掉
- 如果broker異常關閉,NamServer的定時任務每10s掃描Broker列表,如果某個broker的心跳包的最新時間戳超過當前時間120s,就會被剔除。
-
路由發現
生產者:傳送訊息的時候,根據topic從NameServer獲取路由資訊。
消費者:一般是訂閱固定的topic。在MQClientInstance類中由定時任務定期更新NamServer資訊,預設30s獲取一次。
producer
生產者,會定時從NameServer拉取路由資訊,然後根據路由資訊與指定的Broker建立TCP長連線,從而將訊息傳送到Broker中。傳送邏輯一致的Producer可以組成一個Group。Producer寫資料只能操作master節點。
consumer
消費者有兩種消費方式:一種是叢集消費,一種是廣播消費。
Message Queue
作用跟kafka裡面的partition分片類似。
在建立topic的時候會指定佇列的數量,一個叫writeQueueNums寫佇列的數量,一個readQueueNums讀佇列的數量。寫佇列的數量決定了有幾個Message Queue,讀佇列的數量決定了有幾個執行緒來消費這些Message Queue。
java開發
生產者:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
producer.start();
for (int i = 0; i < 6; i++){
try {
// tags 用於過濾訊息 keys 索引鍵,多個用空格隔開,RocketMQ可以根據這些key快速檢索到訊息
Message msg = new Message("q-2-1",
"TagA",
"2673",
("RocketMQ "+String.format("%05d", i)).getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(String.format("%05d", i)+" : "+sendResult);
}
catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
消費者:
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
consumer.setMessageModel(MessageModel.BROADCASTING);
// Subscribe one more more topics to consume.
consumer.subscribe("q-2-1", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String messageBody="";
try {
messageBody = new String(msg.getBody(),"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 重新消費
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
String tags = msg.getTags();
System.out.println("topic:"+topic+",tags:"+tags+",msg:"+messageBody);
}
// 消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
springboot整合
消費者
@Component
@RocketMQMessageListener(topic = "springboot-topic",consumerGroup = "qs-consumer-group",
//selectorExpression = "tag1",selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
System.out.println("----------接收到rocketmq訊息:" + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
生成者
@Component
public class MessageSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void syncSend(){
/**
* 傳送可靠同步訊息 ,可以拿到SendResult 返回資料
* 同步傳送是指訊息傳送出去後,會在收到mq發出響應之後才會傳送下一個資料包的通訊方式。
* 這種方式應用場景非常廣泛,例如重要的右鍵通知、報名簡訊通知、營銷簡訊等。
*
* 引數1: topic:tag
* 引數2: 訊息體 可以為一個物件
* 引數3: 超時時間 毫秒
*/
SendResult result= rocketMQTemplate.syncSend("springboot-topic:tag","這是一條同步訊息",10000);
System.out.println(result);
}
/**
* 傳送 可靠非同步訊息
* 傳送訊息後,不等mq響應,接著傳送下一個資料包。傳送方通過設定回撥介面接收伺服器的響應,並可對響應結果進行處理。
* 非同步傳送一般用於鏈路耗時較長,對於RT響應較為敏感的業務場景,例如使用者上傳視訊後通過啟動轉碼服務,轉碼完成後通推送轉碼結果。
*
* 引數1: topic:tag
* 引數2: 訊息體 可以為一個物件
* 引數3: 回撥物件
*/
public void asyncSend() throws Exception{
rocketMQTemplate.asyncSend("springboot-topic:tag1", "這是一條非同步訊息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("回撥sendResult:"+sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println(e.getMessage());
}
});
TimeUnit.SECONDS.sleep(100000);
}
/**
* 傳送單向訊息
* 引數1: topic:tag
* 引數2: 訊息體 可以為一個物件
*/
public void sendOneWay(){
rocketMQTemplate.sendOneWay("springboot-topic:tag1", "這是一條單向訊息");
}
/**
* 傳送單向的順序訊息
*/
public void sendOneWayOrderly(){
for(int i=0;i<10;i++){
rocketMQTemplate.sendOneWayOrderly("springboot-topic:tag1", "這是一條順序訊息"+i,"2673");
rocketMQTemplate.sendOneWayOrderly("springboot-topic:tag1", "這是一條順序訊息"+i,"2673");
}
}