《RabbitMQ系列教程-第四章-03-RabbitMQ工作模式之Pub/Sub釋出訂閱模式》
技術標籤:# 《RabbitMQ系列教程》交換機佇列rabbitmqjavaqueue
RabbitMQ工作模式之Pub/Sub釋出訂閱模式
4.3.1 簡介
在釋出訂閱模式中,Producer
傳送訊息到指定的交換機(Exchange
)中,由Exchange
繫結不同的Queues
,消費者依舊監聽這些佇列進行消費(work
模式使用的是預設的交換機,空字串""
)
tips:交換機(Exchange)只負責將訊息轉發到繫結的佇列(Queues)中,不會儲存訊息,如果Exchange沒有繫結Queues或者Exchange不能將訊息路由到指定的Queues中時,此訊息將會丟失
;
一個Exchange可以有direct
、topic
、headers
、fanout
四種類型,不同型別的Exchange具備不同的訊息路由功能,Pub/Sub
模式則重點強調的是fanout
型別的Exchange
交換模式,Pub/Sub
模式也叫分列模式;
Pub/Sub模式官網介紹:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
4.3.2 生產者
package com.lscl.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq. client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer03_PubSub {
public static void main(String[] args) throws Exception {
// 建立連線工廠,用於獲取頻道channel
ConnectionFactory factory = new ConnectionFactory();
factory. setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");
// 2.建立連線
Connection connection = factory.newConnection();
// 3.建立頻道
Channel channel = connection.createChannel();
String exchangeName = "test_fanout";
//5. 建立交換機
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange:交換機名稱
2. type:交換機型別
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(廣播),傳送訊息到每一個與之繫結佇列。
TOPIC("topic"),萬用字元的方式
HEADERS("headers");引數匹配
3. durable:是否持久化
4. autoDelete:自動刪除
5. internal:內部使用。 一般false
6. arguments:引數
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 建立佇列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 繫結佇列和交換機
/*
queueBind(String queue, String exchange, String routingKey)
引數:
1. queue:佇列名稱
2. exchange:交換機名稱
3. routingKey:路由鍵,繫結規則
如果交換機的型別為fanout ,routingKey設定為""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "呼叫各位";
//8. 傳送訊息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 釋放資源
channel.close();
connection.close();
}
}
執行完畢之後,檢視RabbitMQ UI面板,發現多了一個Exchange(test_fanout)和兩個Queues(test_fanout_queue1
、test_fanout_queue2
)
4.3.3 消費者1
package com.lscl.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer03_PubSub {
public static void main(String[] args) throws Exception {
// 建立連線工廠,用於獲取頻道channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");
// 2.建立連線
Connection connection = factory.newConnection();
// 3.建立頻道
Channel channel = connection.createChannel();
String queueName = "test_fanout_queue1";
channel.queueDeclare(queueName, true, false, false, null);
// 接收訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 body:" + new String(body));
}
};
channel.basicConsume(queueName, true, consumer);
// 不釋放資源,讓rabbitmq一直監聽
}
}
4.3.4 消費者2
package com.lscl.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer04_PubSub {
public static void main(String[] args) throws Exception {
// 建立連線工廠,用於獲取頻道channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.130");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");
// 2.建立連線
Connection connection = factory.newConnection();
// 3.建立頻道
Channel channel = connection.createChannel();
String queueName = "test_fanout_queue2";
channel.queueDeclare(queueName, true, false, false, null);
// 4.接收訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2 body:" + new String(body));
}
};
channel.basicConsume(queueName, true, consumer);
// 不釋放資源,讓rabbitmq一直監聽
}
}
4.3.5 Pub/Sub模式小結
在Pub/Sub
模式下(Exchange
型別為Fanout
,也叫分列模式),訊息首先被髮送到Exchange
,由Exchange
路由到繫結的Queue
中,類似於我們微信群,有什麼事在群裡面傳送一下,群裡面的人都能看得到;這樣就不需要每個人單獨傳送訊息了;
需要注意的兩點:
1、work、simple也會有交換機,他們使用的是預設的交換機
2、Exchange還可以繫結到另一個Exchange上
Pub/Sub
模式重點強調的是Fanout
型別的Exchange
交換模式,很多人也把Pub/Sub
模式稱為分列模式
或者Fanout
模式;
下一篇:《RabbitMQ系列教程-第四章-04-RabbitMQ工作模式之Routing路由模式》