1. 程式人生 > 其它 >分散式中介軟體訊息佇列(3)---.Net Core 使用rabbitmq

分散式中介軟體訊息佇列(3)---.Net Core 使用rabbitmq

一、通過install-package rabbitmq.client命令或nuget安裝rabbitmq.client

二、rabbitmq操作

#region 1、生產者
                {
                    //1、建立rabbitmq連線
                    var rabbitmqFactory = new ConnectionFactory()
                    {
                        HostName = "localhost",//IP地址
                        Port = 5672
,// UserName = "admin",//使用者賬號 Password = "admin@123",//密碼 VirtualHost = "/" }; //建立連線物件 using (var connection = rabbitmqFactory.CreateConnection()) {
var channel = connection.CreateModel();//建立連線會話物件 string name = "product-create"; //2、宣告一個佇列 channel.QueueDeclare( queue: name,//訊息佇列名稱 durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除. autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除. arguments: null//設定佇列的一些其它引數 ); string productJson = JsonConvert.SerializeObject(productCreateDto); var body = Encoding.UTF8.GetBytes(productJson); //3、傳送訊息 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 設定訊息持久化(個性化控制) channel.BasicPublish(exchange: "", routingKey: name, basicProperties: properties, body: body); connection.Close(); channel.Close(); _logger.LogInformation("傳送訊息到rabbitMQ成功"); } } #endregion

2.2 消費者

 #region 1、工作佇列(單消費者)
            {

// 1、建立連線
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//埠
UserName = "admin",//使用者賬號
Password = "admin@123",//密碼
VirtualHost = "/"
};
var connection = factory.CreateConnection();

var channel = connection.CreateModel();

                // 2、定義佇列
                channel.QueueDeclare(queue: "product-create",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {

                    Console.WriteLine($"model:{model}");
                    var body = ea.Body;
                    // 1、邏輯程式碼,新增商品到資料庫
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] 建立商品 {0}", message);
                };

                channel.BasicConsume(queue: "product-create",
                                     autoAck: true, // 訊息確認(防止訊息重新消費)
                                     consumer: consumer);
            }
            #endregion

三、防止訊息丟失和訊息重複消費

3.1 防止訊息丟失

使用訊息確認機制

設定訊息持久化

channel.QueueDeclare(
queue: name,//訊息佇列名稱
durable: true,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
arguments: null//設定佇列的一些其它引數
);

3.2 防止訊息重複消費

autoAck設定為true,設定訊息消費應答機制,不會導致訊息堆積;
如果autoAck為false就會產生訊息堆積,導致重複消費
 channel.BasicConsume(queue: "product-create",
                                     autoAck: true, // 自動訊息確認
                                     consumer: consumer);*/