1. 程式人生 > 其它 >.Net Core 程式碼中使用Azure Service Bus 去實現訊息的傳送和消費

.Net Core 程式碼中使用Azure Service Bus 去實現訊息的傳送和消費

Service Bus

新增Azure.Messaging.ServiceBus的包

 PM> Install-Package Azure.Messaging.ServiceBus -Version 7.2.1

在appsettings.json中新增AzureServiceBus的配置項

 "ConnectionStrings": {
    "AzureServiceBus": "Endpoint=sb://sb-sne2-its-das-acrp-dev.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-shared-access-key>"
  },

建立一個client的類,我這邊命名為QueueService.cs (位於~/App_Utilities/ServiceBus資料夾下)

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Configuration;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace Dtt.DDP.App_Utilities.ServiceBus
{
    /// <summary>
    /// Sends JSON-serialized messages to an Azure Service Bus queue, creating the
    /// queue first when it does not exist yet.
    /// </summary>
    public class QueueService : IQueueService
    {
        private readonly IConfiguration config;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="config">
        /// Application configuration; the "AzureServiceBus" connection string is read from it.
        /// </param>
        public QueueService(IConfiguration config)
        {
            this.config = config;
        }

        /// <summary>
        /// Serializes <paramref name="serviceBusMessage"/> to JSON (UTF-8) and sends it
        /// to the queue named <paramref name="queueName"/>.
        /// </summary>
        /// <typeparam name="T">Type of the payload to serialize.</typeparam>
        /// <param name="serviceBusMessage">Payload that becomes the message body.</param>
        /// <param name="queueName">Target queue; created when it does not exist.</param>
        /// <returns>A task that completes once the message has been sent.</returns>
        /// <remarks>
        /// Exceptions are not caught here, so callers observe them with their original
        /// stack trace. The client and sender created for the call are always disposed.
        /// </remarks>
        public async Task SendMessagesAsync<T>(T serviceBusMessage, string queueName)
        {
            // Read the connection string once; both clients below use the same value.
            string connectionString = config.GetConnectionString("AzureServiceBus");

            ServiceBusClient client = new ServiceBusClient(connectionString);
            try
            {
                var options = new CreateQueueOptions(queueName)
                {
                    EnableBatchedOperations = false
                };
                var administrationClient = new ServiceBusAdministrationClient(connectionString);
                if (!(await administrationClient.QueueExistsAsync(queueName)))
                {
                    await administrationClient.CreateQueueAsync(options);
                }

                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);
                try
                {
                    var messageBody = JsonSerializer.Serialize(serviceBusMessage);
                    // create a message that we can send
                    ServiceBusMessage message = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageBody));
                    await sender.SendMessageAsync(message);
                }
                finally
                {
                    // Release the sender even when serialization or sending fails.
                    await sender.DisposeAsync();
                }
            }
            finally
            {
                // The client is created per call, so it must be released per call too.
                await client.DisposeAsync();
            }
        }
    }
}

在QueueService類名上按 Ctrl + Enter,由QueueService生成一個介面IQueueService(包含SendMessagesAsync方法的宣告)

Startup.cs檔案ConfigureServices方法中配置服務注入

    // Register QueueService as the singleton implementation of IQueueService.
    services.AddSingleton<IQueueService, QueueService>();

新增一個消費者的基類QueueListener(位於~/App_Utilities/ServiceBus資料夾下),需要實現IHostedService的介面

using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using Microsoft.Extensions.Configuration;
using System.Threading.Tasks;
using log4net;
using Azure.Messaging.ServiceBus;

namespace Dtt.DDP.App_Utilities.ServiceBus
{
    /// <summary>
    /// Base class for Service Bus queue consumers that run as a hosted service.
    /// Derived classes set <see cref="queueName"/> and override
    /// <see cref="ProcessMessagesAsync"/> with their own message handling.
    /// </summary>
    public class QueueListener : IHostedService
    {
        // Initialized once for the type instead of being reassigned by every constructor call.
        private static readonly ILog Logger = LogManager.GetLogger(typeof(QueueListener));

        /// <summary>Name of the queue to listen on; expected to be set by the derived class.</summary>
        public string queueName;
        protected ServiceBusClient serviceBusClient;
        protected ServiceBusProcessor processor;

        /// <summary>
        /// Creates the Service Bus client from the "AzureServiceBus" connection string.
        /// A failure is logged and leaves <see cref="serviceBusClient"/> null.
        /// </summary>
        /// <param name="configuration">Application configuration holding the connection string.</param>
        public QueueListener(IConfiguration configuration)
        {
            try
            {
                serviceBusClient = new ServiceBusClient(configuration.GetConnectionString("AzureServiceBus"));
            }
            catch (Exception ex)
            {
                Logger.Error($"Service Bus 初始化出錯, error info : [ {ex.Message} ] ", ex);
            }
        }

        /// <summary>
        /// Handles one received message. The base implementation always throws;
        /// derived classes must override it.
        /// </summary>
        /// <param name="arg1">Event arguments carrying the received message.</param>
        /// <exception cref="NotImplementedException">Always, in the base class.</exception>
        public virtual Task ProcessMessagesAsync(ProcessMessageEventArgs arg1)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Handles errors raised by the processor while receiving or processing messages.
        /// The error is logged so that failures are not silently dropped.
        /// </summary>
        /// <param name="args">Details of the error reported by the processor.</param>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Logger.Error(
                $"Service Bus processor error. Source: [ {args.ErrorSource} ], Entity: [ {args.EntityPath} ], Error: [ {args.Exception?.Message} ]",
                args.Exception);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Creates the processor for <see cref="queueName"/>, attaches the message and
        /// error handlers, and starts processing. Any failure is logged, not rethrown.
        /// </summary>
        public async Task Register()
        {
            try
            {
                processor = serviceBusClient.CreateProcessor(queueName, new ServiceBusProcessorOptions());
                processor.ProcessMessageAsync += ProcessMessagesAsync;
                processor.ProcessErrorAsync += ErrorHandler;
                await processor.StartProcessingAsync();
            }
            catch (Exception ex)
            {
                Logger.Error($"Service Bus 註冊消費者監聽出錯!Error: [ {ex.Message} ]", ex);
            }
        }

        /// <summary>Starts listening by registering the processor.</summary>
        /// <param name="cancellationToken">Start-up cancellation token supplied by the host.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Register();
        }

        /// <summary>
        /// Stops and closes the processor, then disposes the client.
        /// </summary>
        /// <param name="cancellationToken">Shutdown cancellation token supplied by the host.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            try
            {
                // processor is null when Register failed before creating it (or was never called).
                if (processor != null)
                {
                    await processor.StopProcessingAsync(cancellationToken);
                    await processor.CloseAsync(cancellationToken);
                }
            }
            finally
            {
                // serviceBusClient is null when the constructor failed to create it.
                if (serviceBusClient != null)
                {
                    await serviceBusClient.DisposeAsync();
                }
            }
        }
    }
}

新建一個類繼承QueueListener去重寫虛方法ProcessMessagesAsync實現不同業務邏輯的處理。我這裡是匯出報表

using Azure.Messaging.ServiceBus;
using Dtt.DDP.App_Utilities.ServiceBus;
using Dtt.DDP.Areas.DDP1000_DeclationMaintenance.BLL;
using Dtt.DDP.Areas.DDP1000_DeclationMaintenance.Models;
using Dtt.DDP.Areas.ZZZ0000_Common.Enums;
using Dtt.DDP.Areas.ZZZ0000_Common.Utils;
using Dtt.DDP.Models;
using Dtt.Framework.App_Utilities.DI;
using Microsoft.Extensions.DependencyInjection;
using Dtt.Framework.Repository;
using log4net;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Dtt.DDP.Areas.DDP1000_DeclationMaintenance.MQ
{
    /// <summary>
    /// Queue listener that consumes report-export messages: it marks the task as
    /// started, runs the export, and records the outcome on the task.
    /// </summary>
    public class DeclarationReportListener : QueueListener
    {
        private static ILog _logger;
        private readonly IServiceProvider _services;
        private DeclarationListBLL bll;
        private CommonRepository taskRepository;

        /// <summary>
        /// Sets the queue name from <c>ServiceBusQueueEnum.ExportDeclaraationReport</c>
        /// and resolves the repository and business-logic services.
        /// </summary>
        /// <param name="services">Service provider used to resolve dependencies.</param>
        /// <param name="configuration">Application configuration passed to the base listener.</param>
        public DeclarationReportListener(IServiceProvider services, IConfiguration configuration)
            : base(configuration)
        {
            _logger = LogManager.GetLogger(typeof(DeclarationReportListener));
            base.queueName = EnumUtil.GetEnumDes(ServiceBusQueueEnum.ExportDeclaraationReport);
            taskRepository = services.GetService<CommonRepository>();
            bll = services.GetService<DeclarationListBLL>();
        }

        /// <summary>
        /// Processes one export message. On success the task is marked succeeded with
        /// the produced file name and the message is completed. On failure the task is
        /// marked failed and the error is logged; the exception is not rethrown.
        /// </summary>
        /// <param name="args">Event arguments carrying the received message.</param>
        public override async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            // Stays 0 when the failure happens before the task id has been read from the message.
            decimal taskID = 0;

            try
            {
                _logger.Info($"{typeof(DeclarationReportListener).FullName} 開始消費, args: [ {args} ]");

                // Decode the message body into the MQMessageModel envelope.
                byte[] rawBody = args.Message.Body.ToArray();
                string payloadJson = Encoding.UTF8.GetString(rawBody);
                var envelope = JsonSerializer.Deserialize<MQMessageModel>(payloadJson);
                taskID = envelope.TaskID;

                // The business parameters are a second JSON document nested in the envelope.
                var searchCriteria = JsonSerializer.Deserialize<DeclarationReportSearchModel>(envelope.BizMessageBody);

                taskRepository.UpdateTaskToStart(taskID);

                // Run the export itself.
                string exportedFile = await bll.ExportDeclarationReport(searchCriteria, envelope.TaskCreatedBy);

                taskRepository.UpdateTaskToSucceeded(taskID, exportedFile);
                await args.CompleteMessageAsync(args.Message);
            }
            catch (Exception ex)
            {
                string failureDetail = string.Concat(
                    $"非同步匯出 Excel 出錯!Error: [ {ex.Message} ]",
                    ex.StackTrace,
                    ex.InnerException?.Message,
                    ex.InnerException?.StackTrace);

                taskRepository.UpdateTaskToFailed(taskID, failureDetail);
                _logger.Error($"非同步匯出 Excel 出錯!Error: [ {ex.Message} ]", ex);
                // The message has been received; handling ends here without rethrowing.
            }
        }
    }
}

EnumUtil.GetEnumDes(ServiceBusQueueEnum.ExportDeclaraationReport)這段程式碼是為了通過列舉去定義不同的業務場景,具體的QueueName可以按照自己的業務去實現。

在Startup.cs中的ConfigureServices方法中去啟動一個host服務

// Register DeclarationReportListener as a hosted service.
services.AddHostedService<DeclarationReportListener>();

到這裡,我們的Service bus基本就配置完成了,剩下就是在具體的業務中去呼叫發訊息的方法。