RabbitMQ學習——常見概念詳解
阿新 • • 發佈:2018-12-08
文章目錄
Exchange
描述
用於接收訊息,並根據路由鍵轉發訊息到所繫結的佇列
結構圖
藍色框表示傳送訊息,然後訊息通過路由關係路由到Queue1或Queue2
綠色框表示接收訊息,消費者與佇列建立監聽,去消費訊息
黃色框表示路由鍵繫結關係
交換機屬性
- Name:交換機名稱
- Type:交換機型別direct、topic、fanout、headers
- Durablity:是否需要持久化 true|false
- Auto Delete:當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
- Internal:當前Exchange是否用於RabbitMQ內部使用,預設為false
- Arguments:擴充套件引數,用於擴充套件AMQP協議自制定使用
Direct Exchange
- 所有傳送到Direct Exchange的訊息被轉發到RoutingKey中指定的Queue
如圖所示,路由到佇列名為Key的佇列中去了
生產者程式碼:
public class Producer4DirectExchange {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel ();
//宣告
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//傳送
String msg = "Hello, I am Producer4DirectExchange";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
}
}
消費者程式碼:
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//宣告Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//宣告一個佇列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個繫結關係
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞獲取訊息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:"+msg);
}
}
公共類:
public class ConnectionUtil {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static Connection getConn() {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連線工廠建立連線
try {
return connectionFactory.newConnection();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
登入控制檯,可以看到名為test_direct_exchange
的交換機通過路由鍵test.direct
繫結到test_direct_queue
Topic Exchange
- 所有傳送到Topic Exchange的訊息都被轉發到所有關心RoutingKey中指定Topic的佇列上
- Exchange將RoutingKey和某Topic進行模糊匹配,此時佇列需要繫結一個Topic
可以使用萬用字元進行模糊匹配:
#
匹配一個或多個詞(單詞的意思,不是一個字元)
*
匹配一個詞
log.#
能匹配到"log.info.a"
log.*
只匹配"log.error"
如上圖,比如,user.news和user.weather能路由到第一個佇列
生成者程式碼:
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.find.abc";
//傳送
String msg = "Hello, I am Producer4TopicExchange";
channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());
CloseTool.closeElegantly(channel,connection);
}
}
消費者程式碼:
public class Consumer4TopicExchange {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
//宣告Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//宣告一個佇列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個繫結關係
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞獲取訊息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:"+msg);
}
}
}
先啟動消費者,然後發現下面兩個繫結關係:
佇列
再啟動生產者,從消費者端可以看到如下輸出:
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
說明3條訊息都收到了,接下來,我們改一下消費者的路由鍵,改為:String routingKey = "user.*";
再次啟動
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
奇怪,怎麼還是收到3條,帶著疑問我們去看控制檯
可以看到,之前的路由規則繫結還在。因此可以解釋為啥能收到3條。
點選unbind解綁"user.#"
然後繼續操作一把,檢視輸出
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
此時,只收到兩條了。
Fanout Exchange
- 不處理路由鍵,只需要簡單的將佇列繫結到交換機上
- 傳送到交換機的訊息都會被轉發到與該互動機繫結的所有佇列上
- 轉發訊息是最快的
訊息不走路由鍵,只要佇列與交換機有繫結關係就能收到。
生產者:
public class Producer4FanoutExchange {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_fanout_exchange";
String routingKey = "nothing";
//傳送
String msg = "Hello, I am Producer4FanoutExchange";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
CloseTool.closeElegantly(channel,connection);
}
}
消費者:
public class Consumer4FanoutExchange {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "";//不設定路由鍵
//宣告Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//宣告一個佇列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個繫結關係
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞獲取訊息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:"+msg);
}
}
}
和
Binding
- Exchange和Exchange、Queue之間的連線關係
- Binding中可以包含RoutingKey或引數
Queue
- 訊息佇列,實際儲存訊息
屬性
- Durability:是否持久化
- Auto delete: 若為yes,代表當最後一個監聽被移除後,該Queue會自動被刪除
Message
- 伺服器和應用程式之間傳遞的資料
- 本質就是一段資料,由Properties和Payload(Body)組成
屬性
- delivery mode
- headers(自定義屬性放到這裡面)
- content_type
- content_encoding
- priority
- correlation_id
- reply_to:指令訊息失敗返回的佇列
- expiration:過期時間
- message_id:訊息ID
…
Producer:
public class Producer {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("var1","abc");
headers.put("var2","sdd");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //2:持久化投遞;1:非持久化(未消費的訊息重啟後就沒了)
.contentEncoding("UTF-8")
.expiration("5000")//5s
.headers(headers)
.build();
//4. 通過Channel傳送資料
for (int i = 0; i < 10; i++) {
String message = "Hello" + i;
//exchange為"",則通過routingKey取尋找佇列
channel.basicPublish("","testQueue",properties,message.getBytes());
}
//5. 關閉連線
channel.close();
connection.close();
}
}
consumer:
public class Consumer {
public static final String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
//4. 宣告(建立)一個佇列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//5. 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. 設定Channel
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
int num = 0;
//7. 獲取訊息
while (true) {
//nextDelivery 會阻塞直到有訊息過來
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("收到:" + message);
Map<String,Object > headers = delivery.getProperties().getHeaders();
System.out.println("headers get va1 :" + headers.get("var1"));
}
}
}
Virtual host - 虛擬主機
- 虛擬地址,用於進行邏輯隔離,最上層的訊息路由
- 一個Virtuall Host裡面可以有若干個Exchange和Queue
- 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue