diff --git a/docs/intro.md b/docs/intro.md index 93617bd1..9d4e76b4 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -1263,5 +1263,5 @@ This allows to recreate missing elements in the infrastructure without restartin ## Versions -- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/tree/release/v3). +- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/3.0.0). - The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0). \ No newline at end of file diff --git a/docs/provider_azure_servicebus.md b/docs/provider_azure_servicebus.md index 4f8d55ac..d2218220 100644 --- a/docs/provider_azure_servicebus.md +++ b/docs/provider_azure_servicebus.md @@ -10,6 +10,8 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Default Subscription Name](#default-subscription-name) - [Consumer context](#consumer-context) - [Exception Handling for Consumers](#exception-handling-for-consumers) + - [DeadLetter: Application-Level Dead-Lettering](#deadletter-application-level-dead-lettering) + - [Failure: Modify Application Properties on Failure](#failure-modify-application-properties-on-failure) - [Transport Specific Settings](#transport-specific-settings) - [Request-Response Configuration](#request-response-configuration) - [Produce Request Messages](#produce-request-messages) @@ -19,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Validation of Topology](#validation-of-topology) - [Trigger Topology Provisioning](#trigger-topology-provisioning) + ## Configuration Azure Service Bus provider requires a connection string: @@ -193,12 +196,52 @@ This could be useful to extract the message's `CorrelationId` or `ApplicationPro ### Exception Handling for Consumers -In case the consumer was to throw an exception while processing a message, SMB marks the message as abandoned. -This results in a message delivery retry performed by Azure SB (potentially event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt the message Azure SB moves the message to a dead letter queue (DLQ). More information [here](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues). +In the case where the consumer throws an exception while processing a message, SMB marks the message as abandoned. +This results in a message delivery retry performed by Azure SB (potentially as an event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt, Azure SB will move the message to the [dead letter queue (DLQ)](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues). + +SMB will also add a user property, `SMB.Exception`, on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ. + +For finer control, a custom error handler can be added by registering an instance of `IConsumerErrorHandler` with the DI. The offending message and the raised exception can then be inspected to determine if the message should be retried (in proceess), failed, or considered as having executed successfully. + +In addition to the standard `IConsumerErrorHandler` return types, `ServiceBusConsumerErrorHandler` provides additional, specialized responses for use with the Azure Service Bus transport. + +#### DeadLetter: Application-Level Dead-Lettering + +[Application-level dead-lettering](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#application-level-dead-lettering) is supported via `DeadLetter(string reason, string description)`. If neither a `reason` nor `description` are supplied, the raised exception type and message will be used as the `reason` and `description`. -If you need to send only selected messages to DLQ, wrap the body of your consumer method in a `try-catch` block and rethrow the exception for only the messages you want to be moved to DLQ (after the retry limit is reached). +```cs +public sealed class SampleConsumerErrorHandler : ServiceBusConsumerErrorHandler +{ + public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + { + return Task.FromResult(DeadLetter("reason", "description")); + } +} +``` + +#### Failure: Modify Application Properties on Failure + +An overload to `Failure(IReadOnlyDictionary : ServiceBusConsumerErrorHandler +{ + public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + { + var properties = new Dictionary + { + { "Key", "value" }, + { "Attempts", attempts }, + { "SMB.Exception", exception.ToString().Substring(0, 1000) } + }; + + return Task.FromResult(Failure(properties)); + } +} +``` + +> By using `IConsumerContext.Properties` (`IConsumerWithContext`) to pass state to the `IConsumerErrorHandler` instance, consumer state can be persisted with the message. This can then be retrieved from `IConsumerContext.Headers` in a subsequent execution to resume processing from a checkpoint, supporting idempotency, especially when distributed transactions are not possible. -SMB will also set a user property `SMB.Exception` on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ. ### Transport Specific Settings @@ -480,4 +523,4 @@ However, in situations when the underlying ASB topology changes (queue / topic i ITopologyControl ctrl = // injected await ctrl.ProvisionTopology(); -``` +``` \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs index 09a5db8f..14398e82 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs @@ -165,6 +165,8 @@ protected async Task ProcessMessageAsyncInternal( Func deadLetterMessage, CancellationToken token) { + const string smbException = "SMB.Exception"; + // Process the message. Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); @@ -188,16 +190,33 @@ protected async Task ProcessMessageAsyncInternal( await completeMessage(message, token).ConfigureAwait(false); return; - case ServiceBusProcessResult.DeadLetterState: + case ServiceBusProcessResult.DeadLetterState deadLetterState: Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); - await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false); + + var reason = deadLetterState.Reason ?? r.Exception?.GetType().Name ?? string.Empty; + var descripiton = deadLetterState.Description ?? r.Exception?.GetType().Name ?? string.Empty; + await deadLetterMessage(message, reason, descripiton, token).ConfigureAwait(false); + return; + + case ServiceBusProcessResult.FailureStateWithProperties withProperties: + var dict = new Dictionary(withProperties.Properties.Count + 1); + foreach (var properties in withProperties.Properties) + { + dict.Add(properties.Key, properties.Value); + } + + // Set the exception message if it has not been provided + dict.TryAdd(smbException, r.Exception.Message); + + Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); + await abandonMessage(message, dict, token).ConfigureAwait(false); return; case ProcessResult.FailureState: var messageProperties = new Dictionary(); { // Set the exception message - messageProperties.Add("SMB.Exception", r.Exception.Message); + messageProperties.Add(smbException, r.Exception.Message); } Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs index 21cfd89f..4c77e87a 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs @@ -4,7 +4,19 @@ public interface IServiceBusConsumerErrorHandler : IConsumerErrorHandler : ConsumerErrorHandler, IServiceBusConsumerErrorHandler { - public ProcessResult DeadLetter() => ServiceBusProcessResult.DeadLetter; + public ProcessResult DeadLetter(string reason = null, string description = null) + { + return reason == null && description == null + ? ServiceBusProcessResult.DeadLetter + : new ServiceBusProcessResult.DeadLetterState(reason, description); + } + + public ProcessResult Failure(IReadOnlyDictionary properties) + { + return properties != null && properties.Count > 0 + ? new ServiceBusProcessResult.FailureStateWithProperties(properties) + : Failure(); + } } public record ServiceBusProcessResult : ProcessResult @@ -14,5 +26,25 @@ public record ServiceBusProcessResult : ProcessResult /// public static readonly ProcessResult DeadLetter = new DeadLetterState(); - public record DeadLetterState() : ProcessResult(); + public record DeadLetterState : ProcessResult + { + public DeadLetterState(string reason = null, string description = null) + { + Reason = reason; + Description = description; + } + + public string Reason { get; } + public string Description { get; } + } + + public record FailureStateWithProperties : FailureState + { + public FailureStateWithProperties(IReadOnlyDictionary properties) + { + Properties = properties; + } + + public IReadOnlyDictionary Properties { get; } + } } \ No newline at end of file diff --git a/src/SlimMessageBus/IConsumerWithContext.cs b/src/SlimMessageBus/IConsumerWithContext.cs index b580d0fb..2ef2d0f1 100644 --- a/src/SlimMessageBus/IConsumerWithContext.cs +++ b/src/SlimMessageBus/IConsumerWithContext.cs @@ -1,7 +1,7 @@ namespace SlimMessageBus; /// -/// An extension point for to recieve provider specific (for current message subject to processing). +/// An extension point for to receive provider specific (for current message subject to processing). /// public interface IConsumerWithContext { diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index 22560fd2..537723c1 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -116,7 +116,88 @@ public async Task BasicPubSubOnQueue(bool bulkProduce) } [Fact] - public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue() + public async Task Failure_WithProperties_UpdatesServiceBusMessage() + { + // arrange + var queue = QueueName(); + + var expectedProperties = new Dictionary + { + { "key", "value" }, + { "SMB.Exception", "Do not overwrite" } + }; + + AddTestServices((services, configuration) => + { + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>)); + services.AddSingleton>(_ => new DelegatedConsumerErrorHandler((handler, message, context, exception, attempts) => handler.Failure(expectedProperties))); + }); + + AddBusConfiguration(mbb => + { + mbb + .Produce(x => x.DefaultQueue(queue).WithModifier(MessageModifier)) + .Consume(x => x + .Queue(queue) + .CreateQueueOptions(z => z.MaxDeliveryCount = 1) + .WithConsumer() + .Instances(20)); + }); + + var adminClient = ServiceProvider.GetRequiredService(); + var testMetric = ServiceProvider.GetRequiredService(); + var consumedMessages = ServiceProvider.GetRequiredService>(); + var client = ServiceProvider.GetRequiredService(); + var deadLetterReceiver = client.CreateReceiver($"{queue}/$DeadLetterQueue"); + + // act + var messageBus = MessageBus; + + var producedMessages = Enumerable + .Range(0, NumberOfMessages) + .Select(i => new PingMessage { Counter = i }) + .ToList(); + + await messageBus.Publish(producedMessages); + await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); + + // assert + // ensure number of instances of consumers created matches + testMetric.CreatedConsumerCount.Should().Be(NumberOfMessages); + consumedMessages.Count.Should().Be(NumberOfMessages); + + // all messages should be in the DLQ + var properties = await adminClient.GetQueueRuntimePropertiesAsync(queue); + properties.Value.ActiveMessageCount.Should().Be(0); + properties.Value.DeadLetterMessageCount.Should().Be(NumberOfMessages); + + // all messages should have been sent directly to the DLQ + var messages = await deadLetterReceiver.PeekMessagesAsync(NumberOfMessages); + messages.Count.Should().Be(NumberOfMessages); + foreach (var message in messages) + { + message.DeliveryCount.Should().Be(1); + foreach (var property in expectedProperties) + { + message.ApplicationProperties[property.Key].Should().Be(property.Value); + } + } + } + + [Fact] + public async Task DeadLetterMessageWithoutReasonOrDescription_IsDeliveredTo_DeadLetterQueueWithExceptionDetails() { // arrange var queue = QueueName(); @@ -136,7 +217,7 @@ public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue() }); services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>)); - services.AddScoped(typeof(IServiceBusConsumerErrorHandler<>), typeof(DeadLetterMessageConsumerErrorHandler<>)); + services.AddSingleton>(_ => new DelegatedConsumerErrorHandler((handler, message, context, exception, attempts) => handler.DeadLetter())); }); AddBusConfiguration(mbb => @@ -186,6 +267,81 @@ public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue() } } + [Fact] + public async Task DeadLetterMessageWithReasonAndDescription_IsDeliveredTo_DeadLetterQueueWithReasonAndDescription() + { + // arrange + const string expectedReason = "Reason"; + const string expectedDescription = "Description"; + + var queue = QueueName(); + + AddTestServices((services, configuration) => + { + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>)); + services.AddSingleton>(_ => new DelegatedConsumerErrorHandler((handler, message, context, exception, attempts) => handler.DeadLetter(expectedReason, expectedDescription))); + }); + + AddBusConfiguration(mbb => + { + mbb + .Produce(x => x.DefaultQueue(queue).WithModifier(MessageModifier)) + .Consume(x => x + .Queue(queue) + .WithConsumer() + .Instances(20)); + }); + + var adminClient = ServiceProvider.GetRequiredService(); + var testMetric = ServiceProvider.GetRequiredService(); + var consumedMessages = ServiceProvider.GetRequiredService>(); + var client = ServiceProvider.GetRequiredService(); + var deadLetterReceiver = client.CreateReceiver($"{queue}/$DeadLetterQueue"); + + // act + var messageBus = MessageBus; + + var producedMessages = Enumerable + .Range(0, NumberOfMessages) + .Select(i => new PingMessage { Counter = i }) + .ToList(); + + await messageBus.Publish(producedMessages); + await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); + + // assert + // ensure number of instances of consumers created matches + testMetric.CreatedConsumerCount.Should().Be(NumberOfMessages); + consumedMessages.Count.Should().Be(NumberOfMessages); + + // all messages should be in the DLQ + var properties = await adminClient.GetQueueRuntimePropertiesAsync(queue); + properties.Value.ActiveMessageCount.Should().Be(0); + properties.Value.DeadLetterMessageCount.Should().Be(NumberOfMessages); + + // all messages should have been sent directly to the DLQ + var messages = await deadLetterReceiver.PeekMessagesAsync(NumberOfMessages); + messages.Count.Should().Be(NumberOfMessages); + foreach (var message in messages) + { + message.DeliveryCount.Should().Be(0); + message.ApplicationProperties["DeadLetterReason"].Should().Be(expectedReason); + message.ApplicationProperties["DeadLetterErrorDescription"].Should().Be(expectedDescription); + } + } + public class ThrowExceptionPingMessageInterceptor : IConsumerInterceptor { public async Task OnHandle(T message, Func> next, IConsumerContext context) @@ -196,12 +352,16 @@ public async Task OnHandle(T message, Func> next, IConsumer } } - public class DeadLetterMessageConsumerErrorHandler : ServiceBusConsumerErrorHandler + public class DelegatedConsumerErrorHandler(DelegatedConsumerErrorHandler.OnHandleErrorDelegate onHandleError) : ServiceBusConsumerErrorHandler { + private readonly OnHandleErrorDelegate _onHandleError = onHandleError; + public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { - return Task.FromResult(DeadLetter()); + return Task.FromResult(_onHandleError(this, message, consumerContext, exception, attempts)); } + + public delegate ProcessResult OnHandleErrorDelegate(ServiceBusConsumerErrorHandler handler, T message, IConsumerContext contrext, Exception exception, int attempts); } [Fact]