分散式中介軟體訊息佇列(3)---.Net Core 使用rabbitmq
一、通過install-package rabbitmq.client
命令或nuget安裝rabbitmq.client
包
二、rabbitmq操作
#region 1、生產者 { //1、建立rabbitmq連線 var rabbitmqFactory = new ConnectionFactory() { HostName = "localhost",//IP地址 Port = 5672,//埠 UserName = "admin",//使用者賬號 Password = "admin@123",//密碼 VirtualHost = "/" }; //建立連線物件 using (var connection = rabbitmqFactory.CreateConnection()) {var channel = connection.CreateModel();//建立連線會話物件 string name = "product-create"; //2、宣告一個佇列 channel.QueueDeclare( queue: name,//訊息佇列名稱 durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除. autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除. arguments: null//設定佇列的一些其它引數 ); string productJson = JsonConvert.SerializeObject(productCreateDto); var body = Encoding.UTF8.GetBytes(productJson); //3、傳送訊息 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 設定訊息持久化(個性化控制) channel.BasicPublish(exchange: "", routingKey: name, basicProperties: properties, body: body); connection.Close(); channel.Close(); _logger.LogInformation("傳送訊息到rabbitMQ成功"); } } #endregion
2.2 消費者
#region 1、工作佇列(單消費者) {
// 1、建立連線
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//埠
UserName = "admin",//使用者賬號
Password = "admin@123",//密碼
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel(); // 2、定義佇列 channel.QueueDeclare(queue: "product-create", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"model:{model}"); var body = ea.Body; // 1、邏輯程式碼,新增商品到資料庫 var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine(" [x] 建立商品 {0}", message); }; channel.BasicConsume(queue: "product-create", autoAck: true, // 訊息確認(防止訊息重新消費) consumer: consumer); } #endregion
三、防止訊息丟失和訊息重複消費
3.1 防止訊息丟失
使用訊息確認機制
設定訊息持久化
channel.QueueDeclare(
queue: name,//訊息佇列名稱
durable: true,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
arguments: null//設定佇列的一些其它引數
);
3.2 防止訊息重複消費
autoAck設定為true,設定訊息消費應答機制,不會導致訊息堆積;
如果autoAck為false就會產生訊息堆積,導致重複消費
channel.BasicConsume(queue: "product-create",
autoAck: true, // 自動訊息確認
consumer: consumer);*/