From 10d9edcba861c9f7ddbc76677efa88c279dc7e7f Mon Sep 17 00:00:00 2001 From: Brandon Ording Date: Thu, 27 Apr 2023 17:48:17 -0400 Subject: [PATCH] Message loss in migrate-to-quorum command (#1233) * Add failing test * Decode UTF8 byte array to get message ID string * Clarify empty string usage by removing emptyRoutingKey * Add net7.0 TFM to CommandLine project --- .../MigrateQueue/QueueMigrateToQuorumTests.cs | 16 ++++++++++++++++ .../Commands/Queue/QueueMigrateCommand.cs | 19 +++++++++++-------- ...eBus.Transport.RabbitMQ.CommandLine.csproj | 2 +- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs index 6d4d5e013..1c7707072 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine.Tests/MigrateQueue/QueueMigrateToQuorumTests.cs @@ -77,6 +77,22 @@ public async Task Should_preserve_existing_messages() Assert.AreEqual(numExistingMessages, MessageCount(endpointName)); } + [Test] + public async Task Should_preserve_existing_messages_with_messageIds() + { + var endpointName = "EndpointWithExistingMessages"; + var numExistingMessages = 10; + + PrepareTestEndpoint(endpointName); + + AddMessages(endpointName, numExistingMessages, properties => properties.Headers = new Dictionary { { NServiceBus.Headers.MessageId, Guid.NewGuid().ToString() } }); + + await ExecuteMigration(endpointName); + + Assert.True(QueueIsQuorum(endpointName)); + Assert.AreEqual(numExistingMessages, MessageCount(endpointName)); + } + [Test] public async Task Should_preserve_existing_messages_in_holding_queue() { diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs index 6585a8676..48f94e956 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/Commands/Queue/QueueMigrateCommand.cs @@ -2,6 +2,7 @@ { using System; using System.CommandLine; + using System.Text; using global::RabbitMQ.Client; using global::RabbitMQ.Client.Exceptions; @@ -88,11 +89,11 @@ MigrationStage MoveMessagesToHoldingQueue(IConnection connection, CancellationTo // bind the holding queue to the exchange of the queue under migration // this will throw if the exchange for the queue doesn't exist - channel.QueueBind(holdingQueueName, queueName, emptyRoutingKey); + channel.QueueBind(holdingQueueName, queueName, string.Empty); console.WriteLine($"Bound '{holdingQueueName}' to exchange '{queueName}'"); // unbind the queue under migration to stop more messages from coming in - channel.QueueUnbind(queueName, queueName, emptyRoutingKey); + channel.QueueUnbind(queueName, queueName, string.Empty); console.WriteLine($"Unbound '{queueName}' from exchange '{queueName}' "); // move all existing messages to the holding queue @@ -103,7 +104,7 @@ MigrationStage MoveMessagesToHoldingQueue(IConnection connection, CancellationTo queueName, message => { - channel.BasicPublish(emptyRoutingKey, holdingQueueName, message.BasicProperties, message.Body); + channel.BasicPublish(string.Empty, holdingQueueName, message.BasicProperties, message.Body); channel.WaitForConfirmsOrDie(); }, cancellationToken); @@ -143,10 +144,10 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell { using var channel = connection.CreateModel(); - channel.QueueBind(queueName, queueName, emptyRoutingKey); + channel.QueueBind(queueName, queueName, string.Empty); console.WriteLine($"Re-bound '{queueName}' to exchange '{queueName}'"); - channel.QueueUnbind(holdingQueueName, queueName, emptyRoutingKey); + channel.QueueUnbind(holdingQueueName, queueName, string.Empty); console.WriteLine($"Unbound '{holdingQueueName}' from exchange '{queueName}'"); var messageIds = new Dictionary(); @@ -163,7 +164,10 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell if (message.BasicProperties.Headers.TryGetValue("NServiceBus.MessageId", out var messageId)) { - messageIdString = messageId?.ToString(); + if (messageId is byte[] bytes) + { + messageIdString = Encoding.UTF8.GetString(bytes); + } if (messageIdString != null && messageIds.ContainsKey(messageIdString)) { @@ -171,7 +175,7 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell } } - channel.BasicPublish(emptyRoutingKey, queueName, message.BasicProperties, message.Body); + channel.BasicPublish(string.Empty, queueName, message.BasicProperties, message.Body); channel.WaitForConfirmsOrDie(); if (messageIdString != null) @@ -231,7 +235,6 @@ uint ProcessMessages(IModel channel, string sourceQueue, Action readonly IConsole console; readonly MigrationState migrationState; - static string emptyRoutingKey = string.Empty; static Dictionary quorumQueueArguments = new Dictionary { { "x-queue-type", "quorum" } }; enum MigrationStage diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj index e2cb02329..a89f20997 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj @@ -1,7 +1,7 @@  - net6.0 + net6.0;net7.0 Exe rabbitmq-transport True