1. 程式人生 > >RabbitMQ (訊息佇列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

RabbitMQ (訊息佇列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

(使用Java客戶端)

一、概述

在前面的專題學習中,我們建立了一個工作佇列,在工作佇列中假如每個任務交給一個確定的工作者,不管是生產者還是消費者都必須知道一個指定的佇列名稱才能傳送和接收訊息,而RabbitMQ訊息模型的核心思想就是生產者不會將訊息直接傳送給佇列。

因為生產者通常不會知道訊息將會被哪些消費者接收,生產者的訊息雖然不是直接傳送給queue(佇列),但是訊息會交給exchange(交換機),所以需要定義exchange的訊息分發模式來實現訊息的分發,這便是這部分專題學習中我們將要學習的釋出者/訂閱者模式,這樣實現了訊息生產者和訊息消費者之間的解耦。

在前面的專題學習中實現簡單訊息傳遞和工作佇列中有如下一行程式碼:

  channel.basicPublish("", queueName, null, msg.getBytes());
在上述程式碼中第一個是空字串其實就是exchangeName,這裡用了空字串,就表示訊息會交給預設的exchange。

為了說明這種訊息分發模型,我們將構建一個簡單的日誌記錄系統,它包括兩個程式--第一個程式用來發送日誌訊息,第二個程式用來接收列印這些日誌訊息。

在日誌記錄系統執行的每個接收者都將接收到訊息,這樣我們可以執行一個接收者將訊息輸出到控制檯。

總的原則:傳送的日誌訊息將被廣播到所有的接收者。

二、日誌訊息系統的實現

2.1、exchange(交換機)

之前傳送和接收訊息都是通過一個佇列來實現,現在是時候介紹下一個完整的RabbitMQ的訊息傳遞模型了。

首先來對之前學習的訊息傳遞加深一下映象

>一個生產者是一個用於傳送訊息的應用

>一個佇列是儲存訊息的緩衝區

>一個消費者是一個接收訊息的應用。

在前面已經提到了RabbitMQ的核心思想是:生產者從來不需要直接傳送任何訊息到佇列中,實際上通常生產者甚至不知道訊息江北傳送到任何一個佇列中。

相反,生產者只能傳送訊息到一個交換元件中(exchange),exchange是一個很簡單的東西,一方面它接收來自生產者的訊息,另外一方面它將把來自生產者的訊息放入到佇列中,exchange必須知道怎麼接收一個訊息,而且接收的訊息應該被新增到一個指定的佇列?還是多個佇列中,或者接收的訊息被丟棄,這個規則被exchange所定義,它的結構如下:

圖-1

exchange有如下幾種定義型別:direct、topic、headers、fanout,每種型別都自己的實現方式和訊息分發機制,在此我們將重點放在最後一種型別:fanout,首先建立一個這種型別的交換。

channel.exchangeDeclare("logs", "fanout");

基於fanout的exchange是非常簡單的,正如它的名字一樣,我們能猜到它的具體實現,它不僅僅廣播各種來著生產者的訊息到它所知道的所有佇列中,這正是日誌記錄系統的所需要的。

2.2、交換列表(exchange list)

為了列出伺服器中所有的exchanges(交換機),我們通過執行rabbitmqctl來實現,在列出的列表中有一些amp.*changes和沒有定義名稱的exchange(預設),這些是被伺服器預設建立的,但是這些當我們需要使用的時候是不可用的。

在之前不知道關於exchange的任何東西,但是它仍然能夠傳送訊息到佇列,這可能因為是使用了預設的exchange,因為我們定義一個空的串("")。

之前釋出的訊息:

channel.basicPublish("", "hello", null, message.getBytes());
第一個引數就是exchange的名稱,空的字串表示預設或者是無名的exchange,訊息被路由到指定的routingKey名稱的隊名,加入它存在的話。

2.3、臨時佇列(Temporary queues)

在之前我們使用的佇列都是被定義過特殊的名稱(hello和task_queue),對於RabbitMQ來說命名一個佇列是至關重要的,當你想在生產者和消費者中分享佇列的時候,給一個佇列的名稱是必須的。

但是那些都不是日誌記錄系統所需要的,我們希望能夠獲得所有的日誌資訊,而不只是其中的一部分,而且我們只對當前正在傳遞的資訊感興趣,對舊的日誌資訊不感興趣,要解決這些問題,我們需要分兩個步驟:

首先當我們連結到RabbitMQ伺服器的時候,需要一個新的、空的佇列,為了做到這點,可以建立一個隨機名的佇列,或者更好的方法就是讓伺服器選擇一個隨機的佇列名。

其次,當斷開與佇列的連線時,消費者應該被自動刪除掉。

在Java客戶端,我們通過一個無引數的queueDeclare()方法為我們建立一個非持久的、唯一的、能自動刪除的佇列與佇列名稱

String queueName = channel.queueDeclare().getQueue();
在這一點上queueName包含一個隨機佇列名稱,比如它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg.的隨機串

2.4、繫結(bindings)

圖-2

我們已經建立了一個fanout exchange和一個佇列,現在我們需要告訴exchange去傳送訊息到佇列中,exchange和佇列之間的關係被稱為一個繫結(binding)。

channel.queueBind(queueName, "logs", "");
從現在開始我們從logs exchange將被新增訊息到佇列中,使用rabbitmqctl list_bingdins能列出所有的繫結。

2.5、釋出者/訂閱者實現(putting it all together)

圖-3

生產者程式碼和之前的傳送訊息的程式碼並沒有太大的區別,最重要的變化是,我們現在要將釋出的訊息傳遞給logs exchange來代替無名的exchange(之前的是""),在傳送訊息時需要提供一個routingKey,它對於fanout exchange是非常重要的,不能被忽視的,這裡的EmitLog.java程式碼如下:

傳送

package com.xuz.ps;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		/**exchange型別
		 * direct(直接)、topic(主題)、headers(標題)和fanout
		 */
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		String message = getMessage(args);
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println("Sent["+message+"]");
		channel.close();
		conn.close();
	}
	private static String getMessage(String[] strings) {
		if(strings.length<1){
			return "info:Hello World!";
		}
		return joinStrings(strings,"");
	}
	private static String joinStrings(String[] strings, String string) {
		int len = strings.length;
		if(len == 0)return "";
		StringBuilder words = new StringBuilder(strings[0]);
		for (int i = 0; i < len; i++) {
			words.append(string).append(strings[i]);
		}
		return words.toString();
	}
}

接收
package com.xuz.ps;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReceiveLogs {
	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		//獲取佇列名稱
		String queueName = channel.queueDeclare().getQueue();
		//繫結佇列與exchange
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println("ReceiveLogs wait for message .TO exit press CTRL+C");
		
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true,consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
		    String message = new String(delivery.getBody());
		    System.out.println("Received [" + message + "]");  
		}
	}
}

2.6、測試釋出者/訂閱者

操作步驟:

1、執行多個ReceiveLogs,分別記為01、02、03、04,首先執行前三個接收者,如下圖所示

     

-4

圖-5

圖-6

2、執行EmitLog.java,此時可以看到上述三個接收者都能接收訊息

圖-7

3、執行ReceiveLogs04,此時它沒有收到訊息。

圖-8

4、再次執行EmitLog.java,此時可以看到所有的接收者都接收到了訊息。

圖-9

說明exchange在接收到生產者的訊息後,會將訊息傳送到當前已經與它綁定了的所有的queue中,在接收者完訊息之後,RabbitMQ將佇列中的訊息移除。

原始碼下載

相關推薦

RabbitMQ 訊息佇列專題學習04 Publish/Subscribe(釋出者/訂閱)

(使用Java客戶端) 一、概述 在前面的專題學習中,我們建立了一個工作佇列,在工作佇列中假如每個任務交給一個確定的工作者,不管是生產者還是消費者都必須知道一個指定的佇列名稱才能傳送和接收訊息,而RabbitMQ訊息模型的核心思想就是生產者不會將訊息直接傳送給佇列。 因為生

RabbitMQ 訊息佇列專題學習03 Work Queues(工作佇列)

一、概述 工作佇列(Work queues) (使用Java客戶端) 在前面的專題學習中,我們使用Java語言實現了一個簡單的從名為"hello"的佇列中傳送和接收訊息的程式,在這部內容中我們將建立一個工作佇列被用來分配定時訊息任務,而且通過多個接收者(工作者)實現。 工作

RabbitMQ訊息佇列叢集配置與使用篇

介紹 MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的

【c#】佇列Queue和MSMQ訊息佇列的基礎使用

    首先我們知道佇列是先進先出的機制,所以在處理併發是個不錯的選擇。然後就寫兩個佇列的簡單應用。 Queue 名稱空間     名稱空間:System.Collections,不在這裡做過多的理論解釋,這個東西非常的好理解。     可以看下官方文件:https://docs.microsof

MQ訊息佇列簡介

一:概念 MQ(message queue):MQ是一種應用程式對應用程式的通訊方法。應用程式通過寫和檢索出入列隊的針對應用程式的資料(訊息)來通訊,而無需專用連線來連結它們。 訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊

SQL Server Service Broker建立單個數據庫會話訊息佇列

概述   SQL Server Service Broker 用來建立用於交換訊息的會話。訊息在目標和發起方這兩個端點之間進行交換。訊息用於傳輸資料和觸發訊息收到時的處理過程。目標和發起方既可以在同一資料庫引擎例項的同一資料庫或不同資料庫中,也可以在不同資料庫引擎例項的同一資料庫或不同資料庫中。

Linux c 基於記憶體的程序通訊—共享記憶體、共享佇列訊息佇列

基於記憶體的程序通訊: 1.      核心共享記憶體 程式設計模型:  1.1.建立共享記憶體,得到一個ID  shmget 1.2.把ID影射成虛擬地址(掛載)  shmat        1.3.使用虛擬地址訪問核心共享記憶體使用任何記憶體函式與運算子號       

Linux的程序程式設計-之二-程序間通訊訊息佇列

1.1         系統V訊息佇列 訊息佇列中的每個訊息都有如下的資料結構: struct msgbuf { long mtype;         // 訊息型別 char mtext[n];      // 訊息內容,n由使用者自己定義 }; 1.1.1      

IPC程序間通訊訊息佇列

基本概念訊息佇列提供了一個從一個程序向另外一個程序傳送一塊資料的方法。每個資料塊都被認為是有一個型別,接收者程序接收的資料塊可以有不同的型別值。訊息佇列與管道同樣有缺陷,就是每個訊息的最大長度是有上限的(MSGMAX),每個訊息佇列的總的位元組數是有上限的(MSGMNB),系

rabbitmq 和celery 分散式訊息佇列

一、 安裝RabbitMQ: 1、RabbitMQ  (MAC )(訊息佇列工具,在celery中扮演broker的角色,broker是訊息代理,或者叫做訊息中介軟體) (1)使用brew來安裝     brew install rabbitmq或者官網下載: http:/

python使用訊息佇列RabbitMq進階

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #宣

uC/OS 的訊息佇列——uC/OS學習筆記

1.訊息佇列的介紹     使用訊息佇列可在任務之間傳遞多條訊息。訊息佇列相當於訊息郵箱集合,一個郵箱只能在任務間傳遞一條訊息,而訊息佇列則可以傳遞多條訊息。訊息佇列由三部分組成:事件控制塊、訊息佇列和訊息。 2.訊息佇列的操作    a)建立訊息佇列:QSQCreate

Ubuntu16.04安裝RabbitMQ快速安裝

密碼 ESS 快速 實現 狀況 key 安裝完成 分發 需要 RabbitMQ:是程序與程序之間的通訊中轉站,通過消息讀寫,實現程序之間通訊,經典實用場景-生產消費者模式(異步分發消費者執行,保障消息正確) Ubuntu16.04安裝rabbitmq:1.為了避免簽名錯誤,

RabbitMQ訊息中介軟體在工作中的應用場景

  RabbitMQ(訊息中介軟體)在工作中的應用場景  1、跨系統的非同步通訊,所有需要非同步互動的地方都可以使用訊息佇列。就像我們除了打電話(同步)以外,還需要發簡訊,發電子郵件(非同步)的通訊方式。   2、多個應用之間的耦合,由於訊息是平臺無關和語言

rabbitmq實現延時佇列死信佇列

基於佇列和基於訊息的TTL TTL是time to live 的簡稱,顧名思義指的是訊息的存活時間。rabbitMq可以從兩種維度設定訊息過期時間,分別是佇列和訊息本身。 佇列訊息過期時間-Per-Queue Message TTL: 通過設定佇列的x-message-ttl引數來設定指定佇列上訊息的存活時

RabbitMQ訊息中介軟體的應用場景

   1、跨系統的非同步通訊,所有需要非同步互動的地方都可以使用訊息佇列。就像我們除了打電話(同步)以外,還需要發簡訊,發電子郵件(非同步)的通訊方式。   2、多個應用之間的耦合,由於訊息是平臺無關和語言無關的,而且語義上也不再是函式呼叫,因此更適合作為多個應用之

雙系統Win10+Ubuntu18.04 +單機械硬碟+Tensorflow深度學習環境

【深度學習環境】安裝Win10 + Ubuntu18.04 雙系統過程 【時間:2018.10.4】 硬體配置 主機板:華碩 Z370-A 顯示卡:1070Ti-8G 磁碟:單機械一盤(1個) PART-1 雙系統安裝 雙系統安裝:Win10+Ubun

boost程序間通訊常用開發一篇全訊息佇列,共享記憶體,訊號

本文概要:         敏捷開發大家想必知道而且評價甚高,縮短開發週期,提高開發質量。將大工程獨立為不同的小app開發,整個開發過程,程式可用可測,所以提高了整體的質量。基於這種開發模式和開發理念,程序間通訊必然是童鞋們必掌握技能之一了,而boost庫是眾多庫中平臺支援

用PHP嘗試RabbitMQamqp擴充套件實現訊息的傳送和接收

j教程: http://my.oschina.net/yuansir/blog/135226 消費者:接收訊息 邏輯: 建立連線-->建立channel-->建立交換機-->建立佇列-->繫結交換機/佇列/路由鍵-->接收訊息 <?

JavaScript執行機制堆、棧、訊息佇列

棧 JavaScript是單執行緒語言,主執行緒執行同步程式碼。 函式呼叫時, 便會在記憶體形成了一個“呼叫記錄”, 又稱“呼叫幀”, 儲存呼叫位置和內部變數等資訊。 如果函式內部還呼叫了其他函式,那麼在呼叫記錄上方又會形成一個呼叫記錄, 所有的呼叫