.Net Core 程式碼中使用Azure Service Bus 去實現訊息的傳送和消費
阿新 • • 發佈:2021-07-21
Service Bus
新增Azure.Messaging.ServiceBus的包
PM> Install-Package Azure.Messaging.ServiceBus -Version 7.2.1
在appsettings.json中新增AzureServiceBus的配置項
"ConnectionStrings": { "AzureServiceBus": "Endpoint=sb://sb-sne2-its-das-acrp-dev.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=riczsDd4FmMul/6O2WesWTYrJ9gRD6uwOXJHOtV5Ulg=" },
建立一個client的類,我這邊命名為
QueueService.cs
(位於~/App_Utilities/ServiceBus資料夾下)
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Configuration;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace Dtt.DDP.App_Utilities.ServiceBus
{
    /// <summary>
    /// Sends JSON-serialized messages to Azure Service Bus queues, creating the
    /// target queue first when it does not exist yet.
    /// </summary>
    public class QueueService : IQueueService
    {
        private readonly IConfiguration config;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="config">
        /// Application configuration; the "AzureServiceBus" connection string is read from it.
        /// </param>
        public QueueService(IConfiguration config)
        {
            this.config = config;
        }

        /// <summary>
        /// Serializes <paramref name="serviceBusMessage"/> to JSON (UTF-8) and sends it
        /// to the queue named <paramref name="queueName"/>.
        /// </summary>
        /// <typeparam name="T">Type of the payload to serialize.</typeparam>
        /// <param name="serviceBusMessage">Payload to send.</param>
        /// <param name="queueName">Name of the destination queue.</param>
        /// <returns>A task that completes once the message has been sent.</returns>
        /// <remarks>
        /// Exceptions from the Service Bus SDK propagate to the caller with their
        /// original stack trace.
        /// </remarks>
        public async Task SendMessagesAsync<T>(T serviceBusMessage, string queueName)
        {
            string connectionString = config.GetConnectionString("AzureServiceBus");

            var administrationClient = new ServiceBusAdministrationClient(connectionString);
            if (!(await administrationClient.QueueExistsAsync(queueName)))
            {
                var options = new CreateQueueOptions(queueName) { EnableBatchedOperations = false };
                try
                {
                    await administrationClient.CreateQueueAsync(options);
                }
                catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
                {
                    // Another sender created the queue between the existence check
                    // and the create call; the queue is there, so carry on.
                }
            }

            // The client owns a network connection; release it (and the sender)
            // even when serialization or sending fails.
            ServiceBusClient client = new ServiceBusClient(connectionString);
            try
            {
                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);
                try
                {
                    var messageBody = JsonSerializer.Serialize(serviceBusMessage);

                    // create a message that we can send
                    ServiceBusMessage message = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageBody));
                    await sender.SendMessageAsync(message);
                }
                finally
                {
                    await sender.DisposeAsync();
                }
            }
            finally
            {
                await client.DisposeAsync();
            }
        }
    }
}
ctrl
+enter
為QueueService
生成一個介面IQueueService
Startup.cs檔案
ConfigureServices
方法中配置服務注入
services.AddSingleton<IQueueService, QueueService>();
新增一個消費者的基類
QueueListener
(位於~/App_Utilities/ServiceBus資料夾下),需要實現IHostedService
的介面
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using Microsoft.Extensions.Configuration;
using System.Threading.Tasks;
using log4net;
using Azure.Messaging.ServiceBus;

namespace Dtt.DDP.App_Utilities.ServiceBus
{
    /// <summary>
    /// Base class for hosted services that consume messages from a Service Bus queue.
    /// Derived classes set <see cref="queueName"/> and override
    /// <see cref="ProcessMessagesAsync"/> with their business logic.
    /// </summary>
    public class QueueListener : IHostedService
    {
        // Initialized once for the type instead of being reassigned by every constructor call.
        private static readonly ILog Logger = LogManager.GetLogger(typeof(QueueListener));

        /// <summary>Name of the queue to listen on; must be set before the host starts the service.</summary>
        public string queueName;

        /// <summary>Client created in the constructor; stays null when its creation failed.</summary>
        protected ServiceBusClient serviceBusClient;

        /// <summary>Processor created by <see cref="Register"/>; stays null when registration failed.</summary>
        protected ServiceBusProcessor processor;

        /// <summary>
        /// Creates the Service Bus client from the "AzureServiceBus" connection string.
        /// A failure is logged and not rethrown.
        /// </summary>
        /// <param name="configuration">Application configuration holding the connection string.</param>
        public QueueListener(IConfiguration configuration)
        {
            try
            {
                serviceBusClient = new ServiceBusClient(configuration.GetConnectionString("AzureServiceBus"));
            }
            catch (Exception ex)
            {
                Logger.Error($"Service Bus 初始化出錯, error info : [ {ex.Message} ] ", ex);
            }
        }

        /// <summary>
        /// Handles one received message. The base implementation always throws;
        /// derived classes are expected to override it.
        /// </summary>
        /// <param name="arg1">The received message and its settlement operations.</param>
        /// <exception cref="NotImplementedException">Always, in the base class.</exception>
        public virtual Task ProcessMessagesAsync(ProcessMessageEventArgs arg1)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Handles errors raised by the processor while receiving or processing messages.
        /// They are logged so that failures do not disappear silently.
        /// </summary>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Logger.Error($"Service Bus 消費訊息出錯!Source: [ {args.ErrorSource} ], Entity: [ {args.EntityPath} ], Error: [ {args.Exception?.Message} ]", args.Exception);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Creates the processor for <see cref="queueName"/>, attaches the message and
        /// error handlers and starts processing. Any failure is logged and swallowed.
        /// </summary>
        public async Task Register()
        {
            try
            {
                processor = serviceBusClient.CreateProcessor(queueName, new ServiceBusProcessorOptions());
                processor.ProcessMessageAsync += ProcessMessagesAsync;
                processor.ProcessErrorAsync += ErrorHandler;
                await processor.StartProcessingAsync();
            }
            catch (Exception ex)
            {
                Logger.Error($"Service Bus 註冊消費者監聽出錯!Error: [ {ex.Message} ]", ex);
            }
        }

        /// <summary>Called by the host at startup; registers the queue consumer.</summary>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Register();
        }

        /// <summary>
        /// Called by the host at shutdown; stops and closes the processor and disposes the client.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            try
            {
                // The processor is null when Register failed before creating it.
                if (processor != null)
                {
                    await processor.StopProcessingAsync(cancellationToken);
                    await processor.CloseAsync(cancellationToken);
                }
            }
            finally
            {
                // The client is null when the constructor failed to create it.
                if (serviceBusClient != null)
                {
                    await this.serviceBusClient.DisposeAsync();
                }
            }
        }
    }
}
新建一個類繼承
QueueListener
去重寫虛方法ProcessMessagesAsync
實現不同業務邏輯的處理。我這裡是匯出報表
using Azure.Messaging.ServiceBus;
using Dtt.DDP.App_Utilities.ServiceBus;
using Dtt.DDP.Areas.DDP1000_DeclationMaintenance.BLL;
using Dtt.DDP.Areas.DDP1000_DeclationMaintenance.Models;
using Dtt.DDP.Areas.ZZZ0000_Common.Enums;
using Dtt.DDP.Areas.ZZZ0000_Common.Utils;
using Dtt.DDP.Models;
using Dtt.Framework.App_Utilities.DI;
using Microsoft.Extensions.DependencyInjection;
using Dtt.Framework.Repository;
using log4net;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Dtt.DDP.Areas.DDP1000_DeclationMaintenance.MQ
{
    /// <summary>
    /// Queue consumer that runs the declaration report export for each received
    /// message and records the outcome on the corresponding task.
    /// </summary>
    public class DeclarationReportListener : QueueListener
    {
        // Initialized once for the type instead of being reassigned by every constructor call.
        private static readonly ILog _logger = LogManager.GetLogger(typeof(DeclarationReportListener));
        private readonly DeclarationListBLL bll;
        private readonly CommonRepository taskRepository;

        /// <summary>
        /// Sets the queue name for this listener and resolves its collaborators.
        /// </summary>
        /// <param name="services">Service provider used to resolve the repository and BLL.</param>
        /// <param name="configuration">Application configuration passed to the base listener.</param>
        public DeclarationReportListener(IServiceProvider services, IConfiguration configuration) : base(configuration)
        {
            base.queueName = EnumUtil.GetEnumDes(ServiceBusQueueEnum.ExportDeclaraationReport);

            // GetRequiredService fails at startup when a registration is missing, instead of
            // leaving a null field that only blows up when the first message arrives.
            // NOTE(review): hosted services are singletons; if these types are registered as
            // scoped they should be resolved from a scope per message - confirm the lifetimes.
            taskRepository = services.GetRequiredService<CommonRepository>();
            bll = services.GetRequiredService<DeclarationListBLL>();
        }

        /// <summary>
        /// Deserializes the message, marks the task as started, runs the export and marks
        /// the task as succeeded, then completes the message. On any failure the error is
        /// logged and the task is marked as failed; the exception is not rethrown.
        /// </summary>
        /// <param name="args">The received message and its settlement operations.</param>
        public override async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
        {
            decimal taskID = 0;
            try
            {
                // Log the message id; interpolating the event args object only prints its type name.
                _logger.Info($"{typeof(DeclarationReportListener).FullName} 開始消費, MessageId: [ {args.Message.MessageId} ]");

                // Decode the message body into the MQMessageModel envelope.
                var jsonStr = Encoding.UTF8.GetString(args.Message.Body.ToArray());
                MQMessageModel mQMessageModel = JsonSerializer.Deserialize<MQMessageModel>(jsonStr);
                if (mQMessageModel == null)
                {
                    // A JSON "null" body deserializes to null; report that explicitly.
                    throw new InvalidOperationException("Message body deserialized to null.");
                }

                taskID = mQMessageModel.TaskID;
                DeclarationReportSearchModel bizPara = JsonSerializer.Deserialize<DeclarationReportSearchModel>(mQMessageModel.BizMessageBody);
                taskRepository.UpdateTaskToStart(taskID);

                // do business
                string fileName = await bll.ExportDeclarationReport(bizPara, mQMessageModel.TaskCreatedBy);
                taskRepository.UpdateTaskToSucceeded(taskID, fileName);
                await args.CompleteMessageAsync(args.Message);
            }
            catch (Exception ex)
            {
                // Log first so the original error is recorded even if the status update fails.
                _logger.Error($"非同步匯出 Excel 出錯!Error: [ {ex.Message} ]", ex);
                try
                {
                    taskRepository.UpdateTaskToFailed(taskID, $"非同步匯出 Excel 出錯!Error: [ {ex.Message} ]" + ex.StackTrace + ex.InnerException?.Message + ex.InnerException?.StackTrace);
                }
                catch (Exception updateEx)
                {
                    _logger.Error($"非同步匯出 Excel 出錯!Error: [ {updateEx.Message} ]", updateEx);
                }

                // The message has been received and handling is finished; the exception is
                // deliberately not rethrown.
            }
        }
    }
}
EnumUtil.GetEnumDes(ServiceBusQueueEnum.ExportDeclaraationReport)
這段程式碼是為了通過列舉去定義不同的業務場景,具體的QueueName可以按照自己的業務去實現。
在Startup.cs中的
ConfigureServices
方法中去啟動一個host服務
services.AddHostedService<DeclarationReportListener>();
到這裡,我們的Service bus基本就配置完成了,剩下就是在具體的業務中去呼叫發訊息的方法。