1. 程式人生 > 實用技巧 >微服務下,使用 ELK 進行日誌採集以及統一處理

微服務下,使用 ELK 進行日誌採集以及統一處理

技術標籤:javarabbitmq中介軟體

RabbitMQ:

什麼是MQ(訊息佇列):

​ 訊息的傳送者和接收者不需要同時與訊息佇列互交。訊息會儲存在佇列中,直到接收者取回它。

​ 三個特點:非同步,解耦,流量削峰。

實現:

​ 訊息佇列常常儲存在連結串列結構中。擁有許可權的程序可以向訊息佇列中寫入或讀取訊息。

當前使用較多的訊息佇列有 RabbitMQ(資料不會丟失,常用) 、 RocketMQ (有些不開源)、 ActiveMQ(併發量太小) 、 Kafka(訊息丟失) 、 ZeroMQ 、 MetaMq等,而部分資料庫如 Redis 、 Mysql 以及 phxsql 也可實現訊息佇列的功能。

2.2. 特點

​ MQ是消費者-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。

注意:

  1. AMQP ,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制。

  2. JMS ,Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。 Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支援。常見的訊息佇列,大部分都實現了JMS API,如 ActiveMQ , Redis 以及 RabbitMQ 等。

2.3. 優缺點

優點

解耦、非同步處理、流量削鋒

缺點

​ 系統可用性降低、系統複雜性增加

2.4. 使用場景

​ 訊息佇列,是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為:當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候。

​ 在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

2.5. 為什麼使用RabbitMQ

​ AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。

​ AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

​ RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如: Python 、 Ruby 、 .NET 、 Java 、 JMS 、 C 、 PHP 、 ActionScript 、 XMPP 、 STOMP 等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

總結如下:

​ 基於AMQP協議

​ 高併發(是一個容量的概念,伺服器可以接受的最大任務數量)

​ 高效能(是一個速度的概念,單位時間內伺服器可以處理的任務數)

​ 高可用(是一個持久的概念,單位時間內伺服器可以正常工作的時間比例)

​ 強大的社群支援,以及很多公司都在使用

​ 支援外掛

​ 支援多語言

4.4. ConnectionFactory**、Connection、**Channel

​ ConnectionFactory 、 Connection 、 Channel 都是RabbitMQ對外提供的API中最基本的物件。

​ Connection 是RabbitMQ的 socket 連線,它封裝了 socket 協議相關部分邏輯。

​ ConnectionFactory 為Connection的製造工廠。

​ Channel 是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義 Queue 、定義 Exchange 、繫結 Queue 與 Exchange 、釋出訊息等。

5. 簡單模式佇列

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

1.“Hello,World!”:

/**
 * 入門案例-生產者
 */
public class Send {
	//定義佇列名稱
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		//建立連線工廠
		ConnectionFactory factory = new ConnectionFactory();
		//工廠設定
		factory.setHost("192.168.10.100");
		factory.setVirtualHost("/shop");
		factory.setUsername("shop");
		factory.setPassword("shop");
		//根據連線工廠建立連線
		try (Connection connection = factory.newConnection();
		     //根據連線建立通道
		     Channel channel = connection.createChannel()) {
			/**
			 * 宣告佇列
			 *  第一個引數queue:佇列名稱
			 *  第二個引數durable:是否持久化
			 *  第三個引數Exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。
			 *      這裡需要注意三點:
			 *          1. 排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一個連線建立的排他佇列的。
			 *          2. "首次",如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同。
			 *          3. 即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的。
			 *          這種佇列適用於只限於一個客戶端傳送讀取訊息的應用場景。
			 *  第四個引數Auto-delete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。
			 *                          這種佇列適用於臨時佇列。
			 */
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			String message = "Hello World!";
			//傳送訊息
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
			System.out.println(" [x] Sent '" + message + "'");
		}
	}
}
/**
 * 入門案例-消費者
 */
public class Recv {
	//定義佇列名稱
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		//建立連線工廠
		ConnectionFactory factory = new ConnectionFactory();
		//工廠設定
		factory.setHost("192.168.10.100");
		factory.setVirtualHost("/shop");
		factory.setUsername("shop");
		factory.setPassword("shop");
		//根據工廠建立連線
		Connection connection = factory.newConnection();
		//根據連線建立通道
		Channel channel = connection.createChannel();
		//繫結佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println(" [x] Received '" + message + "'");
		};
		/**
		 * 消費佇列
		 * 1.佇列名稱
		 * 2.自動確認
		 * 3.收到的訊息的實體類
		 */
		channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
		});
	}
}

問題:如果任務量很大,訊息得不到及時的消費會造成佇列積壓,問題非常嚴重,比如記憶體溢位,訊息丟失等。

解決:配置多個消費者消費訊息。

總結:簡單佇列-處理訊息效率不高,吞吐量較低,不適合生成環境

6. Work queues-工作模式佇列

6.1. 工作模式佇列-訊息輪詢分發(Round-robin)

問題:任務量很大,訊息雖然得到了及時的消費,單位時間內訊息處理速度加快,提高了吞吐量,可

是不同消費者處理訊息的時間不同,導致部分消費者的資源被浪費。

解決:採用訊息公平分發。

總結:工作佇列-訊息輪詢分發-消費者收到的訊息數量平均分配,單位時間內訊息處理速度加快,提高了吞吐量。

6.2. 工作模式佇列**-**訊息公平分發(fair dispatch)

消費者:

​ //限制每次只發送一條,消費者處理完在傳送下一條
​ int prefetchCount = 1;
​ channel.basicQos(prefetchCount);

/**
* 手動確認
* 1. 訊息實體裡的唯一標識
* 2. 是否多條確認
*/
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
/**
* 消費佇列
* 1.佇列名稱
* 2.自動確認
* 3.收到的訊息的實體類
*/
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

總結:工作佇列-公平輪詢分發-根據不同消費者機器硬體配置,訊息處理速度不同,收到的訊息數量也不同,通常速度快的處理的訊息數量比較多,最大化使用計算機資源。適用於生成環境。

7. Publish/Subscribe-訊息的釋出與訂閱模式佇列

queueName為唯一標識

在這裡插入圖片描述

fanout型別

廣播形式

問題:生產者產生的訊息所有消費者都可以消費,可不可以指定某些消費者消費呢?

解決:採用direct路由模式

8. Routing-路由模式佇列

direct型別

問題:生產者產生的訊息如果場景需求過多需要設定很多路由規則,可不可以減少?

解決:採用topic主題模式。

9. Topics-主題模式佇列(常用)

在這裡插入圖片描述

topic型別

問題:RabbitMQ本身是基於非同步的訊息處理,是否可以同步實現?

解決:採用RPC模式。

10. RPC-遠端過程呼叫模式佇列

同步的

11. RabbitMQ訊息的事務機制

在使用RabbitMQ的時候,我們可以通過訊息持久化操作來解決因為伺服器的異常奔潰導致的訊息丟失,除此之外我們還會遇到一個問題,當訊息的釋出者在將訊息傳送出去之後,訊息到底有沒有正確到達broker代理伺服器呢?如果不進行特殊配置的話,預設情況下發布操作是不會返回任何資訊給生產者的,也就是預設情況下我們的生產者是不知道訊息有沒有正確到達broker的,如果在訊息到達broker之前已經丟失的話,持久化操作也解決不了這個問題,因為訊息根本就沒到達代理伺服器,你怎麼進行持久化,那麼這個問題該怎麼解決呢?

RabbitMQ為我們提供了兩種方式:

​ 通過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;

​ 通過將channel設定成confirm模式來實現;

11.1. AMQP事物機制控制

​ RabbitMQ中與事務機制有關的方法有三個: txSelect() , txCommit() 以及 txRollback(), txSelect() 用於將當前channel設定成transaction模式, txCommit() 用於提交事務,txRollback() 用於回滾事務,在通過 txSelect() 開啟事務之後,我們便可以釋出訊息給broker代理伺服器了,如果 txCommit() 提交成功了,則訊息一定到達了broker了,如果在 txCommit() 執行之前broker異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過 txRollback() 回滾事務。

​ 事務確實能夠解決producer與broker之間訊息確認的問題,只有訊息成功被broker接受,事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作同時進行訊息重發,但是使用事務機制的話會降低RabbitMQ的效能,那麼有沒有更好的方法既能保障producer知道訊息已經正確送到,又能基本上不帶來效能上的損失呢?從AMQP協議的層面看是沒有更好的方法,但是RabbitMQ提供了一個更好的方案,即將channel通道設定成confirm模式。

12. confirm確認模式

通過AMQP協議層面為我們提供了事務機制解決了這個問題,但是採用事務機制實現會降低RabbitMQ的訊息吞吐量,此時處理AMQP協議層面能夠實現訊息事物控制外,我們還有第二種方式即:Confirm模式。

12.3. 同步Confirm

12.4. 非同步confirm

非同步confirm模式的程式設計實現最複雜,Channel物件提供的 ConfirmListener() 回撥方法只包含deliveryTag (當前Chanel發出的訊息序號),我們需要自己為每一個Channel維護一個 unconfirm的訊息序號集合,每publish一條資料,集合中元素加1,每回調一次 handleAck 方法, unconfirm 集合刪掉相應的一條 (multiple=false) 或多條 (multiple=true) 記錄。從程式執行效率上看,這個 unconfirm 集合最好採用有序集合SortedSet儲存結構。實際上, waitForConfirms() 方法也是通過SortedSet維護訊息序號的。

​ 非同步模式的優點就是執行效率高,不需要等待訊息執行完,只需要監聽訊息即可。

13. Spring整合RabbitMQ

官網:https://spring.io/projects/spring-amqp

13.1. 為什麼使用spring AMQP?

​ 基於Spring之上社群活躍

​ 對AMQP協議進行了高度的封裝

​ 極大的簡化了RabbitMQ的操作

​ 易用性、可擴充套件

13.7. 總結

當然這是官網最簡單的例子,以後如果專案是基於配置來做的話要掌握以下:

  1. pom中引用jar

  2. 先配置rabbitmq的配置

    1. 先配置ConnectionFactory

    2. 配置RabbitAmdmin

  3. 配置RabbitTemplate這裡通常在配置一個Message Convert使用JSON進行資料格式的傳輸

  4. 配置Exchange

  5. 配置Queue

  6. 配置一個訊息處理的bean或者通過Spring掃描,這個Bean最後繼承MessageListener 來處理JSON數 據

​ 極大的簡化了RabbitMQ的操作

​ 易用性、可擴充套件

13.7. 總結

當然這是官網最簡單的例子,以後如果專案是基於配置來做的話要掌握以下:

  1. pom中引用jar

  2. 先配置rabbitmq的配置

    1. 先配置ConnectionFactory

    2. 配置RabbitAmdmin

  3. 配置RabbitTemplate這裡通常在配置一個Message Convert使用JSON進行資料格式的傳輸

  4. 配置Exchange

  5. 配置Queue

  6. 配置一個訊息處理的bean或者通過Spring掃描,這個Bean最後繼承MessageListener 來處理JSON數 據

  7. 配置Listener Container