1. 程式人生 > WINDOWS開發 >C#佇列學習筆記:RabbitMQ延遲佇列

C#佇列學習筆記:RabbitMQ延遲佇列

一、引言

日常生活中,很多的APP都有延遲佇列的影子。比如在手機淘寶上,經常遇到APP派發的限時消費紅包,一般有幾個小時或24小時不等。假如在紅包倒計時的過程中,沒有消費掉紅包的話,紅包會自動失效。假如上述行為使用RabbitMQ延時佇列來理解的話,就是在你收到限時消費紅包的時候,手機淘寶會自動發一條延時訊息到佇列中以供消費。在規定時間內,則可正常消費,否則依TTL自動失效。

在RabbitMQ中,有兩種方式來實現延時佇列:一種是基於佇列方式,另外一種是基於訊息方式。

技術分享圖片

二、示例

2.1、傳送端(生產端)

新建一個控制檯專案Send,並新增一個類RabbitMQConfig。

技術分享圖片
    class RabbitMQConfig
    {
        public static string Host { get; set; }

        public static string VirtualHost { get; set; }

        public static string UserName { get; set; }

        public static string Password { get; set; }

        public static int Port { get; set; }

        static
RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } }
RabbitMQConfig.cs 技術分享圖片
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine(
"C# RabbitMQ實現延遲佇列有以下兩種方式:"); Console.WriteLine("1、基於佇列方式實現延遲佇列,請按1開始生產。"); Console.WriteLine("2、基於訊息方式實現延遲佇列,請按2開始生產。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessagePublishByQueueExpires(); } else if (chooseChar == "2") { DelayMessagePublishByMessageTTL(); } Console.ReadLine(); } /// <summary> /// 基於佇列方式實現延遲佇列 /// 將佇列中所有訊息的TTL(Time To Live,即過期時間)設定為一樣 /// </summary> private static void DelayMessagePublishByQueueExpires() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; const int QuequeExpirySeconds = 1000 * 30; const int MessageExpirySeconds = 1000 * 10; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host,Port = RabbitMQConfig.Port,VirtualHost = RabbitMQConfig.VirtualHost,UserName = RabbitMQConfig.UserName,Password = RabbitMQConfig.Password,Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //當同時指定了queue和message的TTL值,則兩者中較小的那個才會起作用。 Dictionary<string,object> dict = new Dictionary<string,object> { { "x-expires",QuequeExpirySeconds },//佇列過期時間 { "x-message-ttl",MessageExpirySeconds },//訊息過期時間 { "x-dead-letter-exchange","dead exchange 1" },//過期訊息轉向路由 { "x-dead-letter-routing-key","dead routing key 1" }//過期訊息轉向路由的routing key }; //宣告佇列 channel.QueueDeclare(queue: "delay1",durable: true,exclusive: false,autoDelete: false,arguments: dict); //向該訊息佇列傳送訊息message for (int i = 0; i < PublishMessageCount; i++) { var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "delay1",basicProperties: null,body: body); Thread.Sleep(1000 * 2); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } /// <summary> /// 基於訊息方式實現延遲佇列 /// 對佇列中訊息進行單獨設定,每條訊息的TTL可以不同。 /// </summary> private static void DelayMessagePublishByMessageTTL() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; int MessageExpirySeconds = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host,Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Dictionary<string,object> { { "x-dead-letter-exchange","dead exchange 2" },"dead routing key 2" }//過期訊息轉向路由的routing key }; //宣告佇列 channel.QueueDeclare(queue: "delay2",arguments: dict); //向該訊息佇列傳送訊息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { MessageExpirySeconds = i * 1000; var properties = channel.CreateBasicProperties(); properties.Expiration = MessageExpirySeconds.ToString(); var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "delay2",basicProperties: properties,body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } }
Program.cs

2.2、接收端(消費端)

新建一個控制檯專案Receive,按住Alt鍵,將傳送端RabbitMQConfig類拖一個快捷方式到Receive專案中。

技術分享圖片
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("C# RabbitMQ實現延遲佇列有以下兩種方式:");
            Console.WriteLine("1、基於佇列方式實現延遲佇列,請按1開始消費。");
            Console.WriteLine("2、基於訊息方式實現延遲佇列,請按2開始消費。");

            string chooseChar = Console.ReadLine();
            if (chooseChar == "1")
            {
                DelayMessageConsumeByQueueExpires();
            }
            else if (chooseChar == "2")
            {
                DelayMessageConsumeByMessageTTL();
            }
            Console.ReadLine();
        }

        public static void DelayMessageConsumeByQueueExpires()
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "dead exchange 1",type: "direct");
                    string name = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: name,exchange: "dead exchange 1",routingKey: "dead routing key 1");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model,ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                    };
                    channel.BasicConsume(queue: name,noAck: true,consumer: consumer);
                    Console.ReadKey();
                }
            }
        }

        public static void DelayMessageConsumeByMessageTTL()
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "dead exchange 2",exchange: "dead exchange 2",routingKey: "dead routing key 2");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model,consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
Program.cs

2.3、執行結果

技術分享圖片

技術分享圖片

-----------------------------------------------------------------------------------------------------------

技術分享圖片

技術分享圖片