Skip to content

Commit

Permalink
zarusz#358 Modify ASB properties
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa committed Jan 29, 2025
1 parent 637197e commit 9679984
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 14 deletions.
47 changes: 42 additions & 5 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Validation of Topology](#validation-of-topology)
- [Trigger Topology Provisioning](#trigger-topology-provisioning)


## Configuration

Azure Service Bus provider requires a connection string:
Expand Down Expand Up @@ -193,12 +194,48 @@ This could be useful to extract the message's `CorrelationId` or `ApplicationPro
### Exception Handling for Consumers

In case the consumer was to throw an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt the message Azure SB moves the message to a dead letter queue (DLQ). More information [here](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).
In the case where the consumer throws an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially as an event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt, Azure SB will move the message to the [dead letter queue (DLQ)](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).

SMB will also add a user property, `SMB.Exception`, on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

For finer control, a custom error handler can be added by registering an instance of `IConsumerErrorHandler<T>` with the DI. The offending message and the raised exception can then be inspected to determine if the message should be retried (in proceess), failed, or considered as having executed successfully.

In addition to the standard `IConsumerErrorHandler<T>` return types, `ServiceBusConsumerErrorHandler<T>` provides additional, specialized responses for use with the Azure Service Bus transport.

#### DeadLetter: Application-Level Dead-Lettering
[Application-level dead-lettering](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#application-level-dead-lettering) is supported via `DeadLetter(string reason, string description)`. If neither a `reason` nor `description` are supplied, the raised exception type and message will be used as the `reason` and `description`.

```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
return Task.FromResult(DeadLetter("reason", "description"));
}
}
```

#### Failure: Modify Application Properties on Failure
An overload to `Failure(IReadOnlyDictionary<string, object)` is included to facilitate the modification of application properites on a failed message. This includes the `SMB.Exception` property should alternative detail be required.

If you need to send only selected messages to DLQ, wrap the body of your consumer method in a `try-catch` block and rethrow the exception for only the messages you want to be moved to DLQ (after the retry limit is reached).
```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
var properties = new Dictionary<string, object>
{
{ "Key", "value" },
{ "Attempts", attempts },
{ "SMB.Exception", exception.ToString().Substring(0, 1000) }
};

return Task.FromResult(Failure(properties));
}
}
```

SMB will also set a user property `SMB.Exception` on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

### Transport Specific Settings

Expand Down Expand Up @@ -480,4 +517,4 @@ However, in situations when the underlying ASB topology changes (queue / topic i
ITopologyControl ctrl = // injected
await ctrl.ProvisionTopology();
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ protected async Task ProcessMessageAsyncInternal(
Func<ServiceBusReceivedMessage, string, string, CancellationToken, Task> deadLetterMessage,
CancellationToken token)
{
const string smbException = "SMB.Exception";

// Process the message.
Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);

Expand All @@ -188,16 +190,33 @@ protected async Task ProcessMessageAsyncInternal(
await completeMessage(message, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.DeadLetterState:
case ServiceBusProcessResult.DeadLetterState deadLetterState:
Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false);

var reason = deadLetterState.Reason ?? r.Exception?.GetType().Name ?? string.Empty;
var descripiton = deadLetterState.Description ?? r.Exception?.GetType().Name ?? string.Empty;
await deadLetterMessage(message, reason, descripiton, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.FailureStateWithProperties withProperties:
var dict = new Dictionary<string, object>(withProperties.Properties.Count + 1);
foreach (var properties in withProperties.Properties)
{
dict.Add(properties.Key, properties.Value);
}

// Set the exception message if it has not been provided
dict.TryAdd(smbException, r.Exception.Message);

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await abandonMessage(message, dict, token).ConfigureAwait(false);
return;

case ProcessResult.FailureState:
var messageProperties = new Dictionary<string, object>();
{
// Set the exception message
messageProperties.Add("SMB.Exception", r.Exception.Message);
messageProperties.Add(smbException, r.Exception.Message);
}

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,19 @@ public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IServiceBusConsumerErrorHandler<T>
{
public ProcessResult DeadLetter() => ServiceBusProcessResult.DeadLetter;
public ProcessResult DeadLetter(string reason = null, string description = null)
{
return reason == null && description == null
? ServiceBusProcessResult.DeadLetter
: new ServiceBusProcessResult.DeadLetterState(reason, description);
}

public ProcessResult Failure(IReadOnlyDictionary<string, object> properties)
{
return properties != null && properties.Count > 0
? new ServiceBusProcessResult.FailureStateWithProperties(properties)
: Failure();
}
}

public record ServiceBusProcessResult : ProcessResult
Expand All @@ -14,5 +26,25 @@ public record ServiceBusProcessResult : ProcessResult
/// </summary>
public static readonly ProcessResult DeadLetter = new DeadLetterState();

public record DeadLetterState() : ProcessResult();
public record DeadLetterState : ProcessResult
{
public DeadLetterState(string reason = null, string description = null)
{
Reason = reason;
Description = description;
}

public string Reason { get; }
public string Description { get; }
}

public record FailureStateWithProperties : FailureState
{
public FailureStateWithProperties(IReadOnlyDictionary<string, object> properties)
{
Properties = properties;
}

public IReadOnlyDictionary<string, object> Properties { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,88 @@ public async Task BasicPubSubOnQueue(bool bulkProduce)
}

[Fact]
public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue()
public async Task Failure_WithProperties_UpdatesServiceBusMessage()
{
// arrange
var queue = QueueName();

var expectedProperties = new Dictionary<string, object>
{
{ "key", "value" },
{ "SMB.Exception", "Do not overwrite" }
};

AddTestServices((services, configuration) =>
{
services.AddTransient(sp =>
{
var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
return ActivatorUtilities.CreateInstance<ServiceBusClient>(sp, connectionString);
});

services.AddTransient(sp =>
{
var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
return ActivatorUtilities.CreateInstance<ServiceBusAdministrationClient>(sp, connectionString);
});

services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>));
services.AddSingleton<IServiceBusConsumerErrorHandler<PingMessage>>(_ => new DelegatedConsumerErrorHandler<PingMessage>((handler, message, context, exception, attempts) => handler.Failure(expectedProperties)));
});

AddBusConfiguration(mbb =>
{
mbb
.Produce<PingMessage>(x => x.DefaultQueue(queue).WithModifier(MessageModifier))
.Consume<PingMessage>(x => x
.Queue(queue)
.CreateQueueOptions(z => z.MaxDeliveryCount = 1)
.WithConsumer<PingConsumer>()
.Instances(20));
});

var adminClient = ServiceProvider.GetRequiredService<ServiceBusAdministrationClient>();
var testMetric = ServiceProvider.GetRequiredService<TestMetric>();
var consumedMessages = ServiceProvider.GetRequiredService<TestEventCollector<TestEvent>>();
var client = ServiceProvider.GetRequiredService<ServiceBusClient>();
var deadLetterReceiver = client.CreateReceiver($"{queue}/$DeadLetterQueue");

// act
var messageBus = MessageBus;

var producedMessages = Enumerable
.Range(0, NumberOfMessages)
.Select(i => new PingMessage { Counter = i })
.ToList();

await messageBus.Publish(producedMessages);
await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5);

// assert
// ensure number of instances of consumers created matches
testMetric.CreatedConsumerCount.Should().Be(NumberOfMessages);
consumedMessages.Count.Should().Be(NumberOfMessages);

// all messages should be in the DLQ
var properties = await adminClient.GetQueueRuntimePropertiesAsync(queue);
properties.Value.ActiveMessageCount.Should().Be(0);
properties.Value.DeadLetterMessageCount.Should().Be(NumberOfMessages);

// all messages should have been sent directly to the DLQ
var messages = await deadLetterReceiver.PeekMessagesAsync(NumberOfMessages);
messages.Count.Should().Be(NumberOfMessages);
foreach (var message in messages)
{
message.DeliveryCount.Should().Be(1);
foreach (var property in expectedProperties)
{
message.ApplicationProperties[property.Key].Should().Be(property.Value);
}
}
}

[Fact]
public async Task DeadLetterMessageWithoutReasonOrDescription_IsDeliveredTo_DeadLetterQueueWithExceptionDetails()
{
// arrange
var queue = QueueName();
Expand All @@ -136,7 +217,7 @@ public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue()
});

services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>));
services.AddScoped(typeof(IServiceBusConsumerErrorHandler<>), typeof(DeadLetterMessageConsumerErrorHandler<>));
services.AddSingleton<IServiceBusConsumerErrorHandler<PingMessage>>(_ => new DelegatedConsumerErrorHandler<PingMessage>((handler, message, context, exception, attempts) => handler.DeadLetter()));
});

AddBusConfiguration(mbb =>
Expand Down Expand Up @@ -186,6 +267,81 @@ public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue()
}
}

[Fact]
public async Task DeadLetterMessageWithReasonAndDescription_IsDeliveredTo_DeadLetterQueueWithReasonAndDescription()
{
// arrange
const string expectedReason = "Reason";
const string expectedDescription = "Description";

var queue = QueueName();

AddTestServices((services, configuration) =>
{
services.AddTransient(sp =>
{
var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
return ActivatorUtilities.CreateInstance<ServiceBusClient>(sp, connectionString);
});

services.AddTransient(sp =>
{
var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
return ActivatorUtilities.CreateInstance<ServiceBusAdministrationClient>(sp, connectionString);
});

services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>));
services.AddSingleton<IServiceBusConsumerErrorHandler<PingMessage>>(_ => new DelegatedConsumerErrorHandler<PingMessage>((handler, message, context, exception, attempts) => handler.DeadLetter(expectedReason, expectedDescription)));
});

AddBusConfiguration(mbb =>
{
mbb
.Produce<PingMessage>(x => x.DefaultQueue(queue).WithModifier(MessageModifier))
.Consume<PingMessage>(x => x
.Queue(queue)
.WithConsumer<PingConsumer>()
.Instances(20));
});

var adminClient = ServiceProvider.GetRequiredService<ServiceBusAdministrationClient>();
var testMetric = ServiceProvider.GetRequiredService<TestMetric>();
var consumedMessages = ServiceProvider.GetRequiredService<TestEventCollector<TestEvent>>();
var client = ServiceProvider.GetRequiredService<ServiceBusClient>();
var deadLetterReceiver = client.CreateReceiver($"{queue}/$DeadLetterQueue");

// act
var messageBus = MessageBus;

var producedMessages = Enumerable
.Range(0, NumberOfMessages)
.Select(i => new PingMessage { Counter = i })
.ToList();

await messageBus.Publish(producedMessages);
await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5);

// assert
// ensure number of instances of consumers created matches
testMetric.CreatedConsumerCount.Should().Be(NumberOfMessages);
consumedMessages.Count.Should().Be(NumberOfMessages);

// all messages should be in the DLQ
var properties = await adminClient.GetQueueRuntimePropertiesAsync(queue);
properties.Value.ActiveMessageCount.Should().Be(0);
properties.Value.DeadLetterMessageCount.Should().Be(NumberOfMessages);

// all messages should have been sent directly to the DLQ
var messages = await deadLetterReceiver.PeekMessagesAsync(NumberOfMessages);
messages.Count.Should().Be(NumberOfMessages);
foreach (var message in messages)
{
message.DeliveryCount.Should().Be(0);
message.ApplicationProperties["DeadLetterReason"].Should().Be(expectedReason);
message.ApplicationProperties["DeadLetterErrorDescription"].Should().Be(expectedDescription);
}
}

public class ThrowExceptionPingMessageInterceptor<T> : IConsumerInterceptor<T>
{
public async Task<object> OnHandle(T message, Func<Task<object>> next, IConsumerContext context)
Expand All @@ -196,12 +352,16 @@ public async Task<object> OnHandle(T message, Func<Task<object>> next, IConsumer
}
}

public class DeadLetterMessageConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
public class DelegatedConsumerErrorHandler<T>(DelegatedConsumerErrorHandler<T>.OnHandleErrorDelegate onHandleError) : ServiceBusConsumerErrorHandler<T>
{
private readonly OnHandleErrorDelegate _onHandleError = onHandleError;

public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
return Task.FromResult(DeadLetter());
return Task.FromResult(_onHandleError(this, message, consumerContext, exception, attempts));
}

public delegate ProcessResult OnHandleErrorDelegate(ServiceBusConsumerErrorHandler<T> handler, T message, IConsumerContext contrext, Exception exception, int attempts);
}

[Fact]
Expand Down

0 comments on commit 9679984

Please sign in to comment.