1. 程式人生 > 其它 >《RabbitMQ系列教程-第四章-03-RabbitMQ工作模式之Pub/Sub釋出訂閱模式》

《RabbitMQ系列教程-第四章-03-RabbitMQ工作模式之Pub/Sub釋出訂閱模式》

技術標籤:# 《RabbitMQ系列教程》交換機佇列rabbitmqjavaqueue

RabbitMQ工作模式之Pub/Sub釋出訂閱模式

4.3.1 簡介

在釋出訂閱模式中,Producer傳送訊息到指定的交換機(Exchange)中,由Exchange繫結不同的Queues,消費者依舊監聽這些佇列進行消費(work模式使用的是預設的交換機,空字串""

在這裡插入圖片描述

tips:交換機(Exchange)只負責將訊息轉發到繫結的佇列(Queues)中,不會儲存訊息,如果Exchange沒有繫結Queues或者Exchange不能將訊息路由到指定的Queues中時,此訊息將會丟失

一個Exchange可以有directtopicheadersfanout四種類型,不同型別的Exchange具備不同的訊息路由功能,Pub/Sub模式則重點強調的是fanout型別的Exchange交換模式,Pub/Sub模式也叫分列模式;

Pub/Sub模式官網介紹:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

4.3.2 生產者

package com.lscl.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.
client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer03_PubSub { public static void main(String[] args) throws Exception { // 建立連線工廠,用於獲取頻道channel ConnectionFactory factory = new ConnectionFactory(); factory.
setHost("192.168.40.132"); factory.setPort(5672); factory.setUsername("lscl"); factory.setPassword("admin"); factory.setVirtualHost("/lscl"); // 2.建立連線 Connection connection = factory.newConnection(); // 3.建立頻道 Channel channel = connection.createChannel(); String exchangeName = "test_fanout"; //5. 建立交換機 /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 1. exchange:交換機名稱 2. type:交換機型別 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(廣播),傳送訊息到每一個與之繫結佇列。 TOPIC("topic"),萬用字元的方式 HEADERS("headers");引數匹配 3. durable:是否持久化 4. autoDelete:自動刪除 5. internal:內部使用。 一般false 6. arguments:引數 */ channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6. 建立佇列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 繫結佇列和交換機 /* queueBind(String queue, String exchange, String routingKey) 引數: 1. queue:佇列名稱 2. exchange:交換機名稱 3. routingKey:路由鍵,繫結規則 如果交換機的型別為fanout ,routingKey設定為"" */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "呼叫各位"; //8. 傳送訊息 channel.basicPublish(exchangeName,"",null,body.getBytes()); //9. 釋放資源 channel.close(); connection.close(); } }

執行完畢之後,檢視RabbitMQ UI面板,發現多了一個Exchange(test_fanout)和兩個Queues(test_fanout_queue1test_fanout_queue2

4.3.3 消費者1

package com.lscl.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer03_PubSub {

    public static void main(String[] args) throws Exception {
        // 建立連線工廠,用於獲取頻道channel
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.40.132");
        factory.setPort(5672);
        factory.setUsername("lscl");
        factory.setPassword("admin");
        factory.setVirtualHost("/lscl");

        // 2.建立連線
        Connection connection = factory.newConnection();

        // 3.建立頻道
        Channel channel = connection.createChannel();

        String queueName = "test_fanout_queue1";
        channel.queueDeclare(queueName, true, false, false, null);

        // 接收訊息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer-1 body:" + new String(body));
            }
        };
        channel.basicConsume(queueName, true, consumer);
        
        // 不釋放資源,讓rabbitmq一直監聽
    }
}

4.3.4 消費者2

package com.lscl.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer04_PubSub {

    public static void main(String[] args) throws Exception {
        // 建立連線工廠,用於獲取頻道channel
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("lscl");
        factory.setPassword("admin");
        factory.setVirtualHost("/lscl");

        // 2.建立連線
        Connection connection = factory.newConnection();

        // 3.建立頻道
        Channel channel = connection.createChannel();

        String queueName = "test_fanout_queue2";
        channel.queueDeclare(queueName, true, false, false, null);

        // 4.接收訊息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer-2 body:" + new String(body));
            }
        };
        channel.basicConsume(queueName, true, consumer);
        
        // 不釋放資源,讓rabbitmq一直監聽
    }
}

4.3.5 Pub/Sub模式小結

Pub/Sub模式下(Exchange型別為Fanout,也叫分列模式),訊息首先被髮送到Exchange,由Exchange路由到繫結的Queue中,類似於我們微信群,有什麼事在群裡面傳送一下,群裡面的人都能看得到;這樣就不需要每個人單獨傳送訊息了;

需要注意的兩點:


1、work、simple也會有交換機,他們使用的是預設的交換機


2、Exchange還可以繫結到另一個Exchange上

Pub/Sub模式重點強調的是Fanout型別的Exchange交換模式,很多人也把Pub/Sub模式稱為分列模式或者Fanout模式;


下一篇:《RabbitMQ系列教程-第四章-04-RabbitMQ工作模式之Routing路由模式》