When using the automatic rate limiting feature with the RabbitMQ transport, the functionality might produce duplicate messages in some edge cases.
Expected behavior
When automatic rate limiting kicks in, message processing should be reduced to a single message at a time, with the configured delay between processing attempts, until message processing succeeds again.
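The expected behavior can be illustrated with a small toy model. This is only a sketch and not NServiceBus code: the class name `ConsecutiveFailuresBreaker`, its fields, and the values used (threshold 2, concurrency 4, delay 10 seconds) are all hypothetical and chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class ConsecutiveFailuresBreaker:
    """Toy model of the rate-limiting trigger (all names are hypothetical)."""

    threshold: int            # consecutive failures needed to arm the breaker
    normal_concurrency: int   # concurrency while processing is healthy
    delay_seconds: float      # pause between attempts while armed
    failures: int = 0
    armed: bool = False

    def on_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.armed = True

    def on_success(self) -> None:
        # A single success resets the counter and ends rate limiting.
        self.failures = 0
        self.armed = False

    @property
    def concurrency(self) -> int:
        # While armed, only one message is processed at a time.
        return 1 if self.armed else self.normal_concurrency

    @property
    def pause(self) -> float:
        return self.delay_seconds if self.armed else 0.0


breaker = ConsecutiveFailuresBreaker(threshold=2, normal_concurrency=4, delay_seconds=10.0)
breaker.on_failure()
print(breaker.concurrency, breaker.pause)  # below the threshold, still healthy
breaker.on_failure()
print(breaker.concurrency, breaker.pause)  # threshold reached, rate limiting active
breaker.on_success()
print(breaker.concurrency, breaker.pause)  # healthy again after one success
```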
Actual behavior
When the configured consecutive failure counter is reached and rate limiting kicks in, a message that should be handed over to delayed retries or to the error queue can be duplicated. Messages that are handled via immediate retries are not affected. The duplication also happens only at the moment the failure counter reaches the limit. Further failures while rate limiting is enabled do not create duplicates.
This is caused by the implementation, which tries to reduce the transport's concurrency setting once the configured failure counter is reached. The concurrency change is achieved by reconnecting to the broker with adjusted concurrency settings. However, the reconnect happens while the failed message is still being processed. After the connection is reset, the error handling pipeline sends a copy of the message for delayed delivery or to the error queue and expects to acknowledge the failed message afterward. Sending still works, but the message currently being processed can't be acknowledged via the new connection. Therefore, the message becomes available for processing again even though a copy has already been sent for delayed retries or to the error queue.
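The sequence described above can be reproduced with a toy in-memory model. This is a sketch under stated assumptions, not the transport's code: `ToyBroker`, `ToyChannel`, `fail_while_reconnecting`, and the message ID `msg-1` are hypothetical, and the only broker behavior modeled is that unacknowledged deliveries return to the queue when their channel closes.

```python
class ChannelClosed(Exception):
    """Raised when an ack is attempted on a closed channel."""


class ToyBroker:
    """Holds one queue; unacked deliveries return to it when a channel closes."""

    def __init__(self):
        self.queue = []      # messages ready for delivery
        self.delayed = []    # copies sent for delayed retry

    def open_channel(self):
        return ToyChannel(self)


class ToyChannel:
    def __init__(self, broker):
        self.broker = broker
        self.open = True
        self.unacked = {}    # delivery tag -> message
        self.next_tag = 1

    def receive(self):
        message = self.broker.queue.pop(0)
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = message
        return tag, message

    def ack(self, tag):
        if not self.open:
            raise ChannelClosed("channel was closed before the ack")
        del self.unacked[tag]

    def close(self):
        # Closing requeues everything that was not acknowledged.
        self.open = False
        self.broker.queue.extend(self.unacked.values())
        self.unacked.clear()


def fail_while_reconnecting(broker):
    receive_channel = broker.open_channel()
    tag, message = receive_channel.receive()

    # The handler failed and the failure threshold was reached: the receive
    # channel is replaced while the message is still in flight.
    receive_channel.close()
    new_receive_channel = broker.open_channel()

    # The error handling path sends a copy for delayed retry. Sending is
    # modeled as independent of the receive channel, so it succeeds.
    broker.delayed.append(message)

    # The ack targets the old, closed channel and fails.
    try:
        receive_channel.ack(tag)
    except ChannelClosed:
        pass
    return new_receive_channel


broker = ToyBroker()
broker.queue.append("msg-1")
fail_while_reconnecting(broker)
print("queue:", broker.queue)      # the original message is back in the queue
print("delayed:", broker.delayed)  # and a copy was sent for delayed retry
```

In this model the same message ends up both in the input queue and in the delayed retry list, which is the duplication reported here.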
Steps to reproduce
With automatic rate limiting enabled, send a message to a handler that throws an exception. See the following log example.
Relevant log output
2023-05-01 16:07:51.737 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:51.932 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.MutateIncomingMessageBehavior.InvokeIncomingMessageMutators(IIncomingLogicalMessageContext context, Func`2 next)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.MutateIncomingTransportMessageBehavior.InvokeIncomingTransportMessagesMutators(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:07:51:736450 Z
Handler failure time: 2023-05-01 14:07:51:767432 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:07:51.953 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:52.060 WARN The consecutive failures circuit breaker is now in the armed state
2023-05-01 16:07:52.062 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:07:52.063 INFO Calling a change concurrency and reconnecting with new value 1.
2023-05-01 16:07:52.076 INFO 'Samples.RabbitMQ.SimpleReceiver MessagePump': Attempting to reconnect in 10 seconds.
2023-05-01 16:08:02.080 WARN Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:07:51:953324 Z
Handler failure time: 2023-05-01 14:07:51:967382 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:02.103 INFO 'Samples.RabbitMQ.SimpleReceiver MessagePump': Connection to the broker reestablished successfully.
2023-05-01 16:08:02.106 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:08:02.214 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:02.217 WARN Failed to acknowledge message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because the channel was closed. The message was returned to the queue.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text='Goodbye', classId=0, methodId=0
at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory`1 body)
at RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple)
at NServiceBus.Transport.RabbitMQ.ModelExtensions.BasicAckSingle(IModel channel, UInt64 deliveryTag) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/ModelExtensions.cs:line 9
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 431
2023-05-01 16:08:12.227 WARN Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:02:106038 Z
Handler failure time: 2023-05-01 14:08:02:120234 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:12.237 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:12.416 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:22.423 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:12:237766 Z
Handler failure time: 2023-05-01 14:08:12:264706 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:22.426 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:22.593 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:32.608 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:22:426253 Z
Handler failure time: 2023-05-01 14:08:22:446579 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:32.610 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:32.769 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:42.777 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:32:610920 Z
Handler failure time: 2023-05-01 14:08:32:629076 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:42.794 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:42.959 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:52.974 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in\Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:42:794772 Z
Handler failure time: 2023-05-01 14:08:42:813593 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
Additional Information
Workarounds
Possible solutions
Ensure that reconnecting happens only after the remaining in-flight messages have been processed.
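One way to picture this suggestion is a receiver that records the requested concurrency change and applies it only once nothing is in flight. The following is a minimal sketch of that idea, not the transport's implementation. `ToyReceiver` and its methods are hypothetical names, and the "reconnect" is reduced to swapping a number.

```python
class ToyReceiver:
    """Defers a requested reconnect until no message is in flight (hypothetical)."""

    def __init__(self, concurrency):
        self.concurrency = concurrency
        self.in_flight = 0
        self.pending_concurrency = None
        self.reconnects = []   # (in_flight at reconnect time, new concurrency)

    def begin_processing(self):
        self.in_flight += 1

    def end_processing(self):
        # Called after the message has been settled (acked) on the old channel.
        self.in_flight -= 1
        self._reconnect_if_idle()

    def request_concurrency(self, value):
        # Remember the request instead of reconnecting immediately.
        self.pending_concurrency = value
        self._reconnect_if_idle()

    def _reconnect_if_idle(self):
        if self.pending_concurrency is not None and self.in_flight == 0:
            self.concurrency = self.pending_concurrency
            self.pending_concurrency = None
            self.reconnects.append((self.in_flight, self.concurrency))


receiver = ToyReceiver(concurrency=4)
receiver.begin_processing()
receiver.request_concurrency(1)   # requested while a message is in flight
print(receiver.concurrency)       # unchanged, the reconnect is deferred
receiver.end_processing()         # message settled on the old channel
print(receiver.concurrency)       # the reconnect has now been applied
```

A real implementation would presumably also have to stop accepting new deliveries while a reconnect is pending, otherwise the in-flight count might never reach zero on a busy queue. That part is not modeled here.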
Additional information
Sending delayed delivery or error messages doesn't fail because the connection to send messages is separate and is not being changed when changing the concurrency setting of the receive connection.