diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigurationHelpers.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigurationHelpers.cs new file mode 100644 index 000000000..3a45c6adf --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigurationHelpers.cs @@ -0,0 +1,10 @@ +using NServiceBus; +using NServiceBus.AcceptanceTests.EndpointTemplates; + +static class ConfigurationHelpers +{ + public static RabbitMQTransport ConfigureRabbitMQTransport(this EndpointConfiguration configuration) + { + return (RabbitMQTransport)configuration.ConfigureTransport(); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigureEndpointRabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigureEndpointRabbitMQTransport.cs index 5b02b4e4e..e342bdcf3 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigureEndpointRabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigureEndpointRabbitMQTransport.cs @@ -1,17 +1,18 @@ using System; +using System.Collections.Generic; using System.Data.Common; using System.Linq; using System.Threading.Tasks; using NServiceBus; using NServiceBus.AcceptanceTesting.Support; -using NServiceBus.Configuration.AdvancedExtensibility; using NServiceBus.Transport; using RabbitMQ.Client; class ConfigureEndpointRabbitMQTransport : IConfigureEndpointTestExecution { DbConnectionStringBuilder connectionStringBuilder; - QueueBindings queueBindings; + TestRabbitMQTransport transport; + public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata) { @@ -22,13 +23,11 @@ public Task Configure(string endpointName, EndpointConfiguration configuration, throw new Exception("The 'RabbitMQTransport_ConnectionString' environment variable is not set."); } + //For cleanup connectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString }; - var transport = configuration.UseTransport(); - transport.ConnectionString(connectionStringBuilder.ConnectionString); - transport.UseConventionalRoutingTopology(); - - queueBindings = configuration.GetSettings().Get(); + transport = new TestRabbitMQTransport(Topology.Conventional, connectionString); + configuration.UseTransport(transport); return Task.CompletedTask; } @@ -42,7 +41,7 @@ public Task Cleanup() void PurgeQueues() { - if (connectionStringBuilder == null) + if (connectionStringBuilder == null || transport == null) { return; } @@ -77,7 +76,7 @@ void PurgeQueues() throw new Exception("The connection string doesn't contain a value for 'host'."); } - var queues = queueBindings.ReceivingAddresses.Concat(queueBindings.SendingAddresses); + var queues = transport.QueuesToCleanup.Distinct().ToArray(); using (var connection = connectionFactory.CreateConnection("Test Queue Purger")) using (var channel = connection.CreateModel()) @@ -95,4 +94,20 @@ void PurgeQueues() } } } + + class TestRabbitMQTransport : RabbitMQTransport + { + public TestRabbitMQTransport(Topology topology, string connectionString) + : base(topology, connectionString) + { + } + + public override Task Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses) + { + QueuesToCleanup.AddRange(receivers.Select(x => x.ReceiveAddress).Concat(sendingAddresses).Distinct()); + return base.Initialize(hostSettings, receivers, sendingAddresses); + } + + public List QueuesToCleanup { get; } = new List(); + } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/DelayedDelivery/When_deferring_a_message_longer_than_allowed_maximum.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/DelayedDelivery/When_deferring_a_message_longer_than_allowed_maximum.cs index 6b155169e..0c3d2177f 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/DelayedDelivery/When_deferring_a_message_longer_than_allowed_maximum.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/DelayedDelivery/When_deferring_a_message_longer_than_allowed_maximum.cs @@ -28,7 +28,7 @@ public void Should_throw() .Run()); Assert.That(exception, Is.Not.Null); - Assert.IsTrue(exception.Message.StartsWith("Message cannot be sent with")); + StringAssert.StartsWith("Message cannot be delayed by", exception.Message); } public class Endpoint : EndpointConfigurationBuilder diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj index 371284d3e..b8b5e4d5c 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/TestSuiteConstraints.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/TestSuiteConstraints.cs index 4a392d4df..1a1c87c29 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/TestSuiteConstraints.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/TestSuiteConstraints.cs @@ -13,6 +13,7 @@ public partial class TestSuiteConstraints public bool SupportsDelayedDelivery => true; public bool SupportsOutbox => true; + public bool SupportsPurgeOnStartup => true; public IConfigureEndpointTestExecution CreateTransportConfiguration() => new ConfigureEndpointRabbitMQTransport(); diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_publishing_message_implementing_interface_in_direct_topology.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_publishing_message_implementing_interface_in_direct_topology.cs index c98ffa9be..1495536ba 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_publishing_message_implementing_interface_in_direct_topology.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_publishing_message_implementing_interface_in_direct_topology.cs @@ -36,8 +36,10 @@ public class Publisher : EndpointConfigurationBuilder { public Publisher() { - EndpointSetup(c => c.UseTransport() - .UseDirectRoutingTopology()); + EndpointSetup(c => + { + c.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true); + }); } } @@ -47,7 +49,7 @@ public Receiver() { EndpointSetup(builder => { - builder.UseTransport().UseDirectRoutingTopology(); + builder.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true); builder.DisableFeature(); }, metadata => metadata.RegisterPublisherFor(typeof(Publisher))); } diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_message.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_message.cs index 7b815424d..2a000a2b9 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_message.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_message.cs @@ -24,7 +24,7 @@ public class Receiver : EndpointConfigurationBuilder { public Receiver() { - EndpointSetup(c => c.UseTransport()); + EndpointSetup(); } class MyEventHandler : IHandleMessages diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_reply_that_contains_a_legacy_callback_header.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_reply_that_contains_a_legacy_callback_header.cs index 199726277..0f5daa65b 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_reply_that_contains_a_legacy_callback_header.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_receiving_a_reply_that_contains_a_legacy_callback_header.cs @@ -48,7 +48,7 @@ public OriginatingEndpoint() { EndpointSetup(config => { - config.ConfigureTransport().Routing().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint)); + config.ConfigureRouting().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint)); config.AuditProcessedMessagesTo(); }); } diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_requesting_non_persistent_delivery.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_requesting_non_persistent_delivery.cs index 9a0641990..03b39d94f 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_requesting_non_persistent_delivery.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_requesting_non_persistent_delivery.cs @@ -32,7 +32,7 @@ public class Receiver : EndpointConfigurationBuilder { public Receiver() { - EndpointSetup(c => c.UseTransport()); + EndpointSetup(); } class MyEventHandler : IHandleMessages diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs index 6df448578..a7ba7a380 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs @@ -71,7 +71,7 @@ public class ScaledOutClient : EndpointConfigurationBuilder public ScaledOutClient() { EndpointSetup(config => - config.ConfigureTransport().Routing().RouteToEndpoint(typeof(MyRequest), typeof(ServerThatRespondsToCallbacks))); + config.ConfigureRouting().RouteToEndpoint(typeof(MyRequest), typeof(ServerThatRespondsToCallbacks))); } class MyResponseHandler : IHandleMessages diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_broker_connection_is_lost.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_broker_connection_is_lost.cs index 8dcb2d7dc..35b8df32e 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_broker_connection_is_lost.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_broker_connection_is_lost.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using AcceptanceTesting; - using DeliveryConstraints; using Extensibility; using Features; using Microsoft.Extensions.DependencyInjection; @@ -51,7 +50,7 @@ protected override void Setup(FeatureConfigurationContext context) class ConnectionKiller : FeatureStartupTask { - public ConnectionKiller(IDispatchMessages sender, ReadOnlySettings settings, MyContext context) + public ConnectionKiller(IMessageDispatcher sender, ReadOnlySettings settings, MyContext context) { this.context = context; this.sender = sender; @@ -72,8 +71,13 @@ async Task BreakConnectionBySendingInvalidMessage() try { var outgoingMessage = new OutgoingMessage("Foo", new Dictionary(), new byte[0]); - var operation = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.EndpointName()), deliveryConstraints: new List { new DiscardIfNotReceivedBefore(TimeSpan.FromMilliseconds(-1)) }); - await sender.Dispatch(new TransportOperations(operation), new TransportTransaction(), new ContextBag()); + var props = new DispatchProperties + { + DiscardIfNotReceivedBefore = + new DiscardIfNotReceivedBefore(TimeSpan.FromMilliseconds(-1)) + }; + var operation = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.EndpointName()), props); + await sender.Dispatch(new TransportOperations(operation), new TransportTransaction()); } catch (Exception) { @@ -82,7 +86,7 @@ async Task BreakConnectionBySendingInvalidMessage() } readonly MyContext context; - readonly IDispatchMessages sender; + readonly IMessageDispatcher sender; readonly ReadOnlySettings settings; } } diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_message_contains_a_legacy_callback_header.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_message_contains_a_legacy_callback_header.cs index 5746f0617..8735f6a82 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_message_contains_a_legacy_callback_header.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_the_message_contains_a_legacy_callback_header.cs @@ -47,7 +47,7 @@ class OriginatingEndpoint : EndpointConfigurationBuilder public OriginatingEndpoint() { EndpointSetup(config => - config.ConfigureTransport().Routing().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint))); + config.ConfigureRouting().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint))); } class ReplyHandler : IHandleMessages diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy.cs index 1ea6cc922..49ede1806 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy.cs @@ -36,8 +36,7 @@ public Receiver() EndpointSetup(c => { c.EnableFeature(); - c.UseTransport() - .CustomMessageIdStrategy(m => customMessageId); + c.ConfigureRabbitMQTransport().MessageIdStrategy = m => customMessageId; }); } @@ -51,7 +50,7 @@ protected override void Setup(FeatureConfigurationContext context) class Starter : FeatureStartupTask { - public Starter(IDispatchMessages dispatchMessages, ReadOnlySettings settings) + public Starter(IMessageDispatcher dispatchMessages, ReadOnlySettings settings) { this.dispatchMessages = dispatchMessages; this.settings = settings; @@ -71,12 +70,12 @@ protected override Task OnStart(IMessageSession session) Encoding.UTF8.GetBytes(messageBody)); var transportOperation = new TransportOperation(message, new UnicastAddressTag(settings.EndpointName())); - return dispatchMessages.Dispatch(new TransportOperations(transportOperation), new TransportTransaction(), new ContextBag()); + return dispatchMessages.Dispatch(new TransportOperations(transportOperation), new TransportTransaction()); } protected override Task OnStop(IMessageSession session) => Task.CompletedTask; - readonly IDispatchMessages dispatchMessages; + readonly IMessageDispatcher dispatchMessages; readonly ReadOnlySettings settings; } } diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy_that_returns_an_invalid_message_id.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy_that_returns_an_invalid_message_id.cs index 006458f17..264ebc58e 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy_that_returns_an_invalid_message_id.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_a_custom_message_id_strategy_that_returns_an_invalid_message_id.cs @@ -25,8 +25,7 @@ public Receiver() { EndpointSetup(c => { - c.UseTransport() - .CustomMessageIdStrategy(m => ""); + c.ConfigureRabbitMQTransport().MessageIdStrategy = m => ""; }); } diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_direct_routing.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_direct_routing.cs index 564fc2f6a..3a479c1b4 100644 --- a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_direct_routing.cs +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_using_direct_routing.cs @@ -23,8 +23,10 @@ public class Receiver : EndpointConfigurationBuilder { public Receiver() { - EndpointSetup(c => c.UseTransport() - .UseDirectRoutingTopology()); + EndpointSetup(c => + { + c.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true); + }); } class MyEventHandler : IHandleMessages diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 21223bbf2..d71fd41ce 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -2,11 +2,29 @@ [assembly: System.Runtime.InteropServices.ComVisible(false)] namespace NServiceBus { + public delegate int PrefetchCountCalculation(int maximumConcurrency); public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition { - public RabbitMQTransport() { } - public override string ExampleConnectionStringForErrorMessage { get; } - public override NServiceBus.Transport.TransportInfrastructure Initialize(NServiceBus.Settings.SettingsHolder settings, string connectionString) { } + public RabbitMQTransport(NServiceBus.Topology topology, string connectionString) { } + public RabbitMQTransport(NServiceBus.Transport.RabbitMQ.IRoutingTopology topology, string connectionString) { } + public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } + public System.TimeSpan HeartbeatInterval { get; set; } + public string Host { get; set; } + public System.Func MessageIdStrategy { get; set; } + public System.TimeSpan NetworkRecoveryInterval { get; set; } + public string Password { get; set; } + public int? Port { get; set; } + public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; } + public NServiceBus.Transport.RabbitMQ.IRoutingTopology RoutingTopology { get; set; } + public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; } + public bool UseExternalAuthMechanism { get; set; } + public bool UseTLS { get; set; } + public string UserName { get; set; } + public string VHost { get; set; } + public bool ValidateRemoteCertificate { get; set; } + public override System.Collections.Generic.IReadOnlyCollection GetSupportedTransactionModes() { } + public override System.Threading.Tasks.Task Initialize(NServiceBus.Transport.HostSettings hostSettings, NServiceBus.Transport.ReceiveSettings[] receivers, string[] sendingAddresses) { } + public override string ToTransportAddress(NServiceBus.Transport.QueueAddress address) { } } public static class RabbitMQTransportOptionsExtensions { @@ -16,33 +34,91 @@ namespace NServiceBus } public static class RabbitMQTransportSettingsExtensions { + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.MessageIdStrategy` instead. The member currently throws a NotImplementedExcep" + + "tion. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } - [System.Obsolete("The timeout manager has been removed, so there are no delayed delivery configurat" + - "ion options now. The member currently throws a NotImplementedException. Will be " + - "removed in version 8.0.0.", true)] - public static NServiceBus.Transport.RabbitMQ.DelayedDeliverySettings DelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } + [System.Obsolete("The configuration has been moved to the topology implementations. The member curr" + + "ently throws a NotImplementedException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.ValidateRemoteCertificate` instead. The member currently throws a NotImplemen" + + "tedException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.PrefetchCountCalculation` instead. The member currently throws a NotImplement" + + "edException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.PrefetchCountCalculation` instead. The member currently throws a NotImplement" + + "edException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.ClientCertificate` instead. The member currently throws a NotImplementedExcep" + + "tion. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.ClientCertificate` instead. The member currently throws a NotImplementedExcep" + + "tion. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, string path, string password) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.HeartbeatInterval` instead. The member currently throws a NotImplementedExcep" + + "tion. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions SetHeartbeatInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan heartbeatInterval) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.NetworkRecoveryInterval` instead. The member currently throws a NotImplemente" + + "dException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions SetNetworkRecoveryInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan networkRecoveryInterval) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.TimeToWaitBeforeTriggeringCircuitBreaker` instead. The member currently throw" + + "s a NotImplementedException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.RoutingTopology` instead. The member currently throws a NotImplementedExcepti" + + "on. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions UseConventionalRoutingTopology(this NServiceBus.TransportExtensions transportExtensions) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.RoutingTopology` instead. The member currently throws a NotImplementedExcepti" + + "on. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions UseCustomRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, System.Func topologyFactory) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.RoutingTopology` instead. The member currently throws a NotImplementedExcepti" + + "on. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions UseDirectRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, System.Func routingKeyConvention = null, System.Func exchangeNameConvention = null) { } + [System.Obsolete("The configuration has been moved to RabbitMQTransport class. Use `RabbitMQTranspo" + + "rt.UseExternalAuthMechanism` instead. The member currently throws a NotImplement" + + "edException. Will be removed in version 8.0.0.", true)] public static NServiceBus.TransportExtensions UseExternalAuthMechanism(this NServiceBus.TransportExtensions transportExtensions) { } } + public enum Topology + { + Conventional = 0, + Direct = 1, + } } namespace NServiceBus.Transport.RabbitMQ { - [System.Obsolete("The timeout manager has been removed, so it is no longer possible to consume lega" + - "cy delayed messages from timeout storage. Will be removed in version 8.0.0.", true)] - public class DelayedDeliverySettings : NServiceBus.Configuration.AdvancedExtensibility.ExposeSettings + public class ConventionalRoutingTopology : NServiceBus.Transport.RabbitMQ.IRoutingTopology + { + public ConventionalRoutingTopology(bool useDurableExchanges) { } + public void BindToDelayInfrastructure(RabbitMQ.Client.IModel channel, string address, string deliveryExchange, string routingKey) { } + public void Initialize(RabbitMQ.Client.IModel channel, System.Collections.Generic.IEnumerable receivingAddresses, System.Collections.Generic.IEnumerable sendingAddresses) { } + public void Publish(RabbitMQ.Client.IModel channel, System.Type type, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties) { } + public void RawSendInCaseOfFailure(RabbitMQ.Client.IModel channel, string address, System.ReadOnlyMemory body, RabbitMQ.Client.IBasicProperties properties) { } + public void Send(RabbitMQ.Client.IModel channel, string address, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties) { } + public void SetupSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName) { } + public void TeardownSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName) { } + } + public class DirectRoutingTopology : NServiceBus.Transport.RabbitMQ.IRoutingTopology { - public NServiceBus.Transport.RabbitMQ.DelayedDeliverySettings EnableTimeoutManager() { } + public DirectRoutingTopology(bool useDurableExchanges, System.Func exchangeNameConvention = null, System.Func routingKeyConvention = null) { } + public void BindToDelayInfrastructure(RabbitMQ.Client.IModel channel, string address, string deliveryExchange, string routingKey) { } + public void Initialize(RabbitMQ.Client.IModel channel, System.Collections.Generic.IEnumerable receivingAddresses, System.Collections.Generic.IEnumerable sendingAddresses) { } + public void Publish(RabbitMQ.Client.IModel channel, System.Type type, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties) { } + public void RawSendInCaseOfFailure(RabbitMQ.Client.IModel channel, string address, System.ReadOnlyMemory body, RabbitMQ.Client.IBasicProperties properties) { } + public void Send(RabbitMQ.Client.IModel channel, string address, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties) { } + public void SetupSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName) { } + public void TeardownSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName) { } } public interface IRoutingTopology { @@ -51,7 +127,7 @@ namespace NServiceBus.Transport.RabbitMQ void Publish(RabbitMQ.Client.IModel channel, System.Type type, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties); void RawSendInCaseOfFailure(RabbitMQ.Client.IModel channel, string address, System.ReadOnlyMemory body, RabbitMQ.Client.IBasicProperties properties); void Send(RabbitMQ.Client.IModel channel, string address, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.IBasicProperties properties); - void SetupSubscription(RabbitMQ.Client.IModel channel, System.Type type, string subscriberName); - void TeardownSubscription(RabbitMQ.Client.IModel channel, System.Type type, string subscriberName); + void SetupSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName); + void TeardownSubscription(RabbitMQ.Client.IModel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName); } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationTests.cs index 55fa784f5..8df9751b3 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationTests.cs @@ -1,47 +1,46 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString { using System; + using System.IO; using NUnit.Framework; - using RabbitMQ; [TestFixture] public class ConnectionConfigurationTests { - const string connectionString = + static readonly string connectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;requestedHeartbeat=3;" + - "retryDelay=01:02:03;useTls=true;certPath=/path/to/client/keycert.p12;certPassPhrase=abc123"; + $"retryDelay=01:02:03;useTls=true;certPath=..{Path.DirectorySeparatorChar}..{Path.DirectorySeparatorChar}..{Path.DirectorySeparatorChar}myp12.p12;certPassPhrase=abc123"; - const string endpointName = "endpoint"; - - ConnectionConfiguration defaults = ConnectionConfiguration.Create("host=localhost", "endpoint"); + RabbitMQTransport CreateTransportDefinition(string connectionString) + { + return new RabbitMQTransport(Topology.Conventional, connectionString); + } [Test] public void Should_correctly_parse_full_connection_string() { - var connectionConfiguration = ConnectionConfiguration.Create(connectionString, endpointName); + var connectionConfiguration = CreateTransportDefinition(connectionString); Assert.AreEqual(connectionConfiguration.Host, "192.168.1.1"); Assert.AreEqual(connectionConfiguration.Port, 1234); - Assert.AreEqual(connectionConfiguration.VirtualHost, "Copa"); + Assert.AreEqual(connectionConfiguration.VHost, "Copa"); Assert.AreEqual(connectionConfiguration.UserName, "Copa"); Assert.AreEqual(connectionConfiguration.Password, "abc_xyz"); - Assert.AreEqual(connectionConfiguration.RequestedHeartbeat, TimeSpan.FromSeconds(3)); - Assert.AreEqual(connectionConfiguration.RetryDelay, new TimeSpan(1, 2, 3)); //01:02:03 - Assert.AreEqual(connectionConfiguration.UseTls, true); - Assert.AreEqual(connectionConfiguration.CertPath, "/path/to/client/keycert.p12"); - Assert.AreEqual(connectionConfiguration.CertPassphrase, "abc123"); + Assert.AreEqual(connectionConfiguration.HeartbeatInterval, TimeSpan.FromSeconds(3)); + Assert.AreEqual(connectionConfiguration.NetworkRecoveryInterval, new TimeSpan(1, 2, 3)); //01:02:03 + Assert.AreEqual("O=Particular, S=Some-State, C=PL", connectionConfiguration.ClientCertificate.Issuer); } [Test] public void Should_fail_if_host_is_not_present() { - Assert.Throws(() => ConnectionConfiguration.Create("virtualHost=Copa;username=Copa;password=abc_xyz;port=12345;requestedHeartbeat=3", endpointName)); + Assert.Throws(() => CreateTransportDefinition("virtualHost=Copa;username=Copa;password=abc_xyz;port=12345;requestedHeartbeat=3")); } [Test] public void Should_parse_host() { - var connectionConfiguration = ConnectionConfiguration.Create("host=host.one:1001;port=1002", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=host.one:1001;port=1002"); Assert.AreEqual(connectionConfiguration.Host, "host.one"); Assert.AreEqual(connectionConfiguration.Port, 1001); @@ -50,7 +49,7 @@ public void Should_parse_host() [Test] public void Should_parse_host_with_separate_port() { - var connectionConfiguration = ConnectionConfiguration.Create("host=my.host.com;port=1234", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=my.host.com;port=1234"); Assert.AreEqual(connectionConfiguration.Host, "my.host.com"); Assert.AreEqual(connectionConfiguration.Port, 1234); @@ -59,16 +58,15 @@ public void Should_parse_host_with_separate_port() [Test] public void Should_parse_host_without_port() { - var connectionConfiguration = ConnectionConfiguration.Create("host=my.host.com", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=my.host.com"); Assert.AreEqual(connectionConfiguration.Host, "my.host.com"); - Assert.AreEqual(connectionConfiguration.Port, 5672); } [Test] public void Should_parse_the_hostname() { - var connectionConfiguration = ConnectionConfiguration.Create("host=myHost", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=myHost"); Assert.AreEqual("myHost", connectionConfiguration.Host); } @@ -76,7 +74,7 @@ public void Should_parse_the_hostname() [Test] public void Should_parse_the_password() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;password=test", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;password=test"); Assert.AreEqual("test", connectionConfiguration.Password); } @@ -84,7 +82,7 @@ public void Should_parse_the_password() [Test] public void Should_parse_the_port() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;port=8181", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;port=8181"); Assert.AreEqual(8181, connectionConfiguration.Port); } @@ -92,23 +90,23 @@ public void Should_parse_the_port() [Test] public void Should_parse_the_requestedHeartbeat() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;requestedHeartbeat=5", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;requestedHeartbeat=5"); - Assert.AreEqual(TimeSpan.FromSeconds(5), connectionConfiguration.RequestedHeartbeat); + Assert.AreEqual(TimeSpan.FromSeconds(5), connectionConfiguration.HeartbeatInterval); } [Test] public void Should_parse_the_retry_delay() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;retryDelay=00:00:10", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;retryDelay=00:00:10"); - Assert.AreEqual(TimeSpan.FromSeconds(10), connectionConfiguration.RetryDelay); + Assert.AreEqual(TimeSpan.FromSeconds(10), connectionConfiguration.NetworkRecoveryInterval); } [Test] public void Should_parse_the_username() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;username=test", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;username=test"); Assert.AreEqual("test", connectionConfiguration.UserName); } @@ -116,125 +114,30 @@ public void Should_parse_the_username() [Test] public void Should_parse_the_virtual_hostname() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;virtualHost=myVirtualHost", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;virtualHost=myVirtualHost"); - Assert.AreEqual("myVirtualHost", connectionConfiguration.VirtualHost); + Assert.AreEqual("myVirtualHost", connectionConfiguration.VHost); } [Test] public void Should_parse_use_tls() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;useTls=true", endpointName); + var connectionConfiguration = CreateTransportDefinition("host=localhost;useTls=true"); - Assert.AreEqual(true, connectionConfiguration.UseTls); - Assert.AreEqual(5671, connectionConfiguration.Port); + Assert.AreEqual(true, connectionConfiguration.UseTLS); } - [Test] public void Should_parse_the_cert_path() { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;certPath=/path/keyfile.p12", endpointName); - - Assert.AreEqual("/path/keyfile.p12", connectionConfiguration.CertPath); - } - - [Test] - public void Should_parse_the_cert_passphrase() - { - var connectionConfiguration = ConnectionConfiguration.Create("host=localhost;certPassphrase=abc123", endpointName); + var connectionConfiguration = CreateTransportDefinition($"host=localhost;certPath=..{Path.DirectorySeparatorChar}..{Path.DirectorySeparatorChar}..{Path.DirectorySeparatorChar}myp12.p12;certPassphrase=abc123"); - Assert.AreEqual("abc123", connectionConfiguration.CertPassphrase); + Assert.AreEqual("O=Particular, S=Some-State, C=PL", connectionConfiguration.ClientCertificate.Issuer); } [Test] public void Should_throw_on_malformed_string() { - Assert.Throws(() => ConnectionConfiguration.Create("not a well formed name value pair;", endpointName)); - } - - [Test] - public void Should_list_all_invalid_options() - { - var connectionString = - "host=:notaport1,host=localhost2;" + - "port=notaport2;" + - "useTls=notusetls;" + - "requestedHeartbeat=notaheartbeat;" + - "retryDelay=notaretrydelay;" + - "usePublisherConfirms=true;" + - "prefetchcount=100;" + - "maxWaitTimeForConfirms=02:03:39;" + - "dequeuetimeout=1;"; - - var exception = Assert.Throws(() => - ConnectionConfiguration.Create(connectionString, endpointName)); - - Assert.That(exception.Message, Does.Contain("Multiple hosts are no longer supported")); - Assert.That(exception.Message, Does.Contain("consider using a load balancer")); - Assert.That(exception.Message, Does.Contain("Empty host name in 'host' connection string option.")); - Assert.That(exception.Message, Does.Contain("'notaport1' is not a valid Int32 value for the port in the 'host' connection string option.")); - Assert.That(exception.Message, Does.Contain("'notaport2' is not a valid Int32 value for the 'port' connection string option.")); - Assert.That(exception.Message, Does.Contain("'notusetls' is not a valid Boolean value for the 'useTls' connection string option.")); - Assert.That(exception.Message, Does.Contain("'notaheartbeat' is not a valid UInt16 value for the 'requestedHeartbeat' connection string option.")); - Assert.That(exception.Message, Does.Contain("'notaretrydelay' is not a valid TimeSpan value for the 'retryDelay' connection string option.")); - Assert.That(exception.Message, Does.Contain("The 'UsePublisherConfirms' connection string option has been removed")); - Assert.That(exception.Message, Does.Contain("The 'PrefetchCount' connection string option has been removed")); - Assert.That(exception.Message, Does.Contain("The 'MaxWaitTimeForConfirms' connection string option has been removed")); - Assert.That(exception.Message, Does.Contain("The 'DequeueTimeout' connection string option has been removed")); - } - - [Test] - public void Should_set_default_port() - { - Assert.AreEqual(defaults.Port, 5672); - } - - [Test] - public void Should_set_default_virtual_host() - { - Assert.AreEqual(defaults.VirtualHost, "/"); - } - - [Test] - public void Should_set_default_username() - { - Assert.AreEqual(defaults.UserName, "guest"); - } - - [Test] - public void Should_set_default_password() - { - Assert.AreEqual(defaults.Password, "guest"); - } - - [Test] - public void Should_set_default_requested_heartbeat() - { - Assert.AreEqual(defaults.RequestedHeartbeat, TimeSpan.FromSeconds(60)); - } - - [Test] - public void Should_set_default_retry_delay() - { - Assert.AreEqual(defaults.RetryDelay, TimeSpan.FromSeconds(10)); - } - - [Test] - public void Should_set_default_use_tls() - { - Assert.AreEqual(defaults.UseTls, false); - } - - [Test] - public void Should_set_default_cert_path() - { - Assert.AreEqual(defaults.CertPath, ""); - } - - [Test] - public void Should_set_default_retry_cert_passphrase() - { - Assert.AreEqual(defaults.CertPassphrase, null); + Assert.Throws(() => CreateTransportDefinition("not a well formed name value pair;")); } } } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationWithAmqpTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationWithAmqpTests.cs index 142bf5bf2..765ec0e12 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationWithAmqpTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ConnectionString/ConnectionConfigurationWithAmqpTests.cs @@ -2,25 +2,25 @@ { using System; using NUnit.Framework; - using RabbitMQ; [TestFixture] public class ConnectionConfigurationWithAmqpTests { - const string endpointName = "endpoint"; - - ConnectionConfiguration defaults = ConnectionConfiguration.Create("amqp://guest:guest@localhost:5672/", endpointName); + RabbitMQTransport CreateTransportDefinition(string connectionString) + { + return new RabbitMQTransport(Topology.Conventional, connectionString); + } [Test] public void Should_correctly_parse_full_connection_string() { const string connectionString = "amqp://Copa:abc_xyz@192.168.1.1:5672/Copa"; - var connectionConfiguration = ConnectionConfiguration.Create(connectionString, endpointName); + var connectionConfiguration = CreateTransportDefinition(connectionString); Assert.AreEqual(connectionConfiguration.Host, "192.168.1.1"); Assert.AreEqual(connectionConfiguration.Port, 5672); - Assert.AreEqual(connectionConfiguration.VirtualHost, "Copa"); + Assert.AreEqual(connectionConfiguration.VHost, "Copa"); Assert.AreEqual(connectionConfiguration.UserName, "Copa"); Assert.AreEqual(connectionConfiguration.Password, "abc_xyz"); } @@ -28,33 +28,31 @@ public void Should_correctly_parse_full_connection_string() [Test] public void Should_fail_if_host_is_not_present() { - Assert.Throws(() => ConnectionConfiguration.Create("amqp://:1234/", endpointName)); + Assert.Throws(() => CreateTransportDefinition("amqp://:1234/")); } [TestCase("amqp", 5672U, false)] [TestCase("amqps", 5671U, true)] public void Should_determine_if_tls_should_be_used_from_connection_string(string scheme, uint port, bool useTls) { - var connectionConfiguration = ConnectionConfiguration.Create($"{scheme}://guest:guest@localhost/", endpointName); + var connectionConfiguration = CreateTransportDefinition($"{scheme}://guest:guest@localhost/"); - Assert.AreEqual(connectionConfiguration.UseTls, useTls); - Assert.AreEqual(connectionConfiguration.Port, port); + Assert.AreEqual(connectionConfiguration.UseTLS, useTls); } [Test] public void Should_use_explicit_port_setting_over_scheme_default() { - var connectionConfiguration = ConnectionConfiguration.Create("amqp://localhost:1234/", endpointName); + var connectionConfiguration = CreateTransportDefinition("amqp://localhost:1234/"); Assert.AreEqual(connectionConfiguration.Port, 1234); } [Test] public void Should_parse_host_without_port() { - var connectionConfiguration = ConnectionConfiguration.Create("amqp://my.host.com/", endpointName); + var connectionConfiguration = CreateTransportDefinition("amqp://my.host.com/"); Assert.AreEqual(connectionConfiguration.Host, "my.host.com"); - Assert.AreEqual(connectionConfiguration.Port, 5672); } [Test] @@ -62,63 +60,9 @@ public void Should_throw_on_invalid_port() { var connectionString = "amqp://localhost:notaport/"; - var exception = Assert.Throws(() => ConnectionConfiguration.Create(connectionString, endpointName)); + var exception = Assert.Throws(() => CreateTransportDefinition(connectionString)); Assert.That(exception.Message, Does.Contain("Invalid URI: Invalid port specified.")); } - - [Test] - public void Should_set_default_port() - { - Assert.AreEqual(defaults.Port, 5672); - } - - [Test] - public void Should_set_default_virtual_host() - { - Assert.AreEqual(defaults.VirtualHost, "/"); - } - - [Test] - public void Should_set_default_username() - { - Assert.AreEqual(defaults.UserName, "guest"); - } - - [Test] - public void Should_set_default_password() - { - Assert.AreEqual(defaults.Password, "guest"); - } - - [Test] - public void Should_set_default_requested_heartbeat() - { - Assert.AreEqual(defaults.RequestedHeartbeat, TimeSpan.FromSeconds(60)); - } - - [Test] - public void Should_set_default_retry_delay() - { - Assert.AreEqual(defaults.RetryDelay, TimeSpan.FromSeconds(10)); - } - - [Test] - public void Should_set_default_use_tls() - { - Assert.AreEqual(defaults.UseTls, false); - } - - [Test] - public void Should_set_default_cert_path() - { - Assert.AreEqual(defaults.CertPath, ""); - } - - [Test] - public void Should_set_default_retry_cert_passphrase() - { - Assert.AreEqual(defaults.CertPassphrase, null); - } } } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/MessageConverterTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/MessageConverterTests.cs index 75a5d76d4..cd0b677c8 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/MessageConverterTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/MessageConverterTests.cs @@ -159,7 +159,7 @@ public bool IsUserIdPresent() } } - MessageConverter converter = new MessageConverter(); + MessageConverter converter = new MessageConverter(MessageConverter.DefaultMessageIdStrategy); [Test] public void TestCanHandleNoInterestingProperties() diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj index ac72351a1..5fd22aea8 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/NServiceBus.Transport.RabbitMQ.Tests.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/OutgoingMessageBuilder.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/OutgoingMessageBuilder.cs index 96df3511d..d86a6ee48 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/OutgoingMessageBuilder.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/OutgoingMessageBuilder.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using DeliveryConstraints; using Performance.TimeToBeReceived; using Routing; @@ -24,12 +23,12 @@ public TransportOperations Build(int copies = 1) { if (eventType != null) { - transportOperations.Add(new TransportOperation(message, new MulticastAddressTag(eventType), dispatchConsistency, constraints)); + transportOperations.Add(new TransportOperation(message, new MulticastAddressTag(eventType), constraints, dispatchConsistency)); } if (!string.IsNullOrEmpty(destination)) { - transportOperations.Add(new TransportOperation(message, new UnicastAddressTag(destination), dispatchConsistency, constraints)); + transportOperations.Add(new TransportOperation(message, new UnicastAddressTag(destination), constraints, dispatchConsistency)); } } @@ -56,7 +55,7 @@ public OutgoingMessageBuilder WithHeader(string key, string value) public OutgoingMessageBuilder TimeToBeReceived(TimeSpan timeToBeReceived) { - constraints.Add(new DiscardIfNotReceivedBefore(timeToBeReceived)); + constraints.DiscardIfNotReceivedBefore = new DiscardIfNotReceivedBefore(timeToBeReceived); return this; } @@ -80,7 +79,7 @@ public OutgoingMessageBuilder WithIntent(MessageIntentEnum intent) string messageId = Guid.NewGuid().ToString(); byte[] body; Dictionary headers = new Dictionary(); - List constraints = new List(); + DispatchProperties constraints = new DispatchProperties(); DispatchConsistency dispatchConsistency = DispatchConsistency.Default; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs index 4629e97e6..ff2b8c005 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs @@ -6,16 +6,14 @@ using System.Linq; using System.Threading.Tasks; using NUnit.Framework; - using Support; class RabbitMqContext { public virtual int MaximumConcurrency => 1; [SetUp] - public void SetUp() + public async Task SetUp() { - routingTopology = new ConventionalRoutingTopology(true); receivedMessages = new BlockingCollection(); var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString"); @@ -25,41 +23,46 @@ public void SetUp() throw new Exception("The 'RabbitMQTransport_ConnectionString' environment variable is not set."); } - var config = ConnectionConfiguration.Create(connectionString, ReceiverQueue); + var transport = new RabbitMQTransport(Topology.Conventional, connectionString); - connectionFactory = new ConnectionFactory(ReceiverQueue, config, default, false, false, default, default); - channelProvider = new ChannelProvider(connectionFactory, config.RetryDelay, routingTopology); - channelProvider.CreateConnection(); + connectionFactory = new ConnectionFactory(ReceiverQueue, transport.Host, transport.Port ?? 5672, + transport.VHost, transport.UserName, transport.Password, false, null, false, + false, transport.HeartbeatInterval, transport.NetworkRecoveryInterval); - messageDispatcher = new MessageDispatcher(channelProvider); - - var purger = new QueuePurger(connectionFactory); - - messagePump = new MessagePump(connectionFactory, new MessageConverter(), "Unit test", channelProvider, purger, TimeSpan.FromMinutes(2), 3, 0); - - routingTopology.Reset(connectionFactory, new[] { ReceiverQueue }.Concat(AdditionalReceiverQueues), new[] { ErrorQueue }); - - subscriptionManager = new SubscriptionManager(connectionFactory, routingTopology, ReceiverQueue); - - messagePump.Init(messageContext => + infra = await transport.Initialize(new HostSettings(ReceiverQueue, ReceiverQueue, new StartupDiagnosticEntries(), + (msg, ex) => { }, true), new[] { - receivedMessages.Add(new IncomingMessage(messageContext.MessageId, messageContext.Headers, messageContext.Body)); - return Task.CompletedTask; - }, - ErrorContext => Task.FromResult(ErrorHandleResult.Handled), - new CriticalError(_ => Task.CompletedTask), - new PushSettings(ReceiverQueue, ErrorQueue, true, TransportTransactionMode.ReceiveOnly) - ).GetAwaiter().GetResult(); - - messagePump.Start(new PushRuntimeSettings(MaximumConcurrency)); + new ReceiveSettings(ReceiverQueue, ReceiverQueue, true, true, "error") + }, AdditionalReceiverQueues.Concat(new[] { ErrorQueue }).ToArray()); + + messageDispatcher = infra.Dispatcher; + messagePump = infra.Receivers[ReceiverQueue]; + subscriptionManager = messagePump.Subscriptions; + + await messagePump.Initialize(new PushRuntimeSettings(MaximumConcurrency), + messageContext => + { + receivedMessages.Add(new IncomingMessage(messageContext.MessageId, messageContext.Headers, + messageContext.Body)); + return Task.CompletedTask; + }, ErrorContext => Task.FromResult(ErrorHandleResult.Handled) + ); + + await messagePump.StartReceive(); } [TearDown] - public void TearDown() + public async Task TearDown() { - messagePump?.Stop().GetAwaiter().GetResult(); + if (messagePump != null) + { + await messagePump.StopReceive(); + } - channelProvider?.Dispose(); + if (infra != null) + { + await infra.Shutdown(); + } } protected bool TryWaitForMessageReceipt() => TryReceiveMessage(out var _, incomingMessageTimeout); @@ -81,15 +84,14 @@ bool TryReceiveMessage(out IncomingMessage message, TimeSpan timeout) => protected const string ReceiverQueue = "testreceiver"; protected const string ErrorQueue = "error"; - protected MessageDispatcher messageDispatcher; protected ConnectionFactory connectionFactory; - protected MessagePump messagePump; - protected SubscriptionManager subscriptionManager; + protected IMessageDispatcher messageDispatcher; + protected IMessageReceiver messagePump; + protected ISubscriptionManager subscriptionManager; - ChannelProvider channelProvider; BlockingCollection receivedMessages; - ConventionalRoutingTopology routingTopology; static readonly TimeSpan incomingMessageTimeout = TimeSpan.FromSeconds(1); + TransportInfrastructure infra; } } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/When_consuming_messages.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/When_consuming_messages.cs index e15f6d32a..05c1c387f 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/When_consuming_messages.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/When_consuming_messages.cs @@ -16,7 +16,7 @@ public async Task Should_block_until_a_message_is_available() var message = new OutgoingMessage(Guid.NewGuid().ToString(), new Dictionary(), new byte[0]); var transportOperations = new TransportOperations(new TransportOperation(message, new UnicastAddressTag(ReceiverQueue))); - await messageDispatcher.Dispatch(transportOperations, new TransportTransaction(), new ContextBag()); + await messageDispatcher.Dispatch(transportOperations, new TransportTransaction()); var receivedMessage = ReceiveMessage(); diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/When_sending_a_message_over_rabbitmq.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/When_sending_a_message_over_rabbitmq.cs index 6d2a25175..8318c66e0 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/When_sending_a_message_over_rabbitmq.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/When_sending_a_message_over_rabbitmq.cs @@ -99,13 +99,13 @@ async Task Verify(OutgoingMessageBuilder builder, Action(); - await subscriptionManager.Unsubscribe(typeof(MyEvent), new ContextBag()); + await subscriptionManager.Unsubscribe(new MessageMetadata(typeof(MyEvent)), new ContextBag()); //publish a event that that this publisher isn't subscribed to await Publish(); @@ -134,14 +136,31 @@ public async Task Should_not_receive_events_after_unsubscribing() AssertNoEventReceived(); } - Task Subscribe() => subscriptionManager.Subscribe(typeof(T), new ContextBag()); + [SetUp] + public async Task Prepare() + { + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + await Unsubscribe(); + } + + Task Subscribe() => subscriptionManager.SubscribeAll(new[] { new MessageMetadata(typeof(T)) }, new ContextBag()); + Task Unsubscribe() => subscriptionManager.Unsubscribe(new MessageMetadata(typeof(T)), new ContextBag()); Task Publish() { var type = typeof(T); var message = new OutgoingMessageBuilder().WithBody(new byte[0]).CorrelationId(type.FullName).PublishType(type).Build(); - return messageDispatcher.Dispatch(message, new TransportTransaction(), new ContextBag()); + return messageDispatcher.Dispatch(message, new TransportTransaction()); } void AssertReceived() diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/myp12.p12 b/src/NServiceBus.Transport.RabbitMQ.Tests/myp12.p12 new file mode 100644 index 000000000..6c10c3906 Binary files /dev/null and b/src/NServiceBus.Transport.RabbitMQ.Tests/myp12.p12 differ diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/ConfigureRabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ.TransportTests/ConfigureRabbitMQTransportInfrastructure.cs index eb88c81d5..b74d28fee 100644 --- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/ConfigureRabbitMQTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/ConfigureRabbitMQTransportInfrastructure.cs @@ -1,20 +1,15 @@ using System; using System.Data.Common; -using System.Linq; using System.Threading.Tasks; using NServiceBus; -using NServiceBus.Settings; using NServiceBus.Transport; using NServiceBus.TransportTests; using RabbitMQ.Client; class ConfigureRabbitMQTransportInfrastructure : IConfigureTransportInfrastructure { - public TransportConfigurationResult Configure(SettingsHolder settings, TransportTransactionMode transactionMode) + public TransportDefinition CreateTransportDefinition() { - var result = new TransportConfigurationResult(); - var transport = new RabbitMQTransport(); - var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString"); if (string.IsNullOrEmpty(connectionString)) @@ -24,32 +19,34 @@ public TransportConfigurationResult Configure(SettingsHolder settings, Transport connectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString }; - queueBindings = settings.Get(); + var transport = new RabbitMQTransport(Topology.Conventional, connectionString); + + return transport; + } - new TransportExtensions(settings).UseConventionalRoutingTopology(); - result.TransportInfrastructure = transport.Initialize(settings, connectionStringBuilder.ConnectionString); - isTransportInitialized = true; - result.PurgeInputQueueOnStartup = true; + public Task Configure(TransportDefinition transportDefinition, HostSettings hostSettings, string inputQueueName, + string errorQueueName) + { + queuesToCleanUp = new[] { inputQueueName, errorQueueName }; - transportTransactionMode = result.TransportInfrastructure.TransactionMode; - requestedTransactionMode = transactionMode; + var mainReceiverSettings = new ReceiveSettings( + "mainReceiver", + inputQueueName, + true, + true, errorQueueName); - return result; + return transportDefinition.Initialize(hostSettings, new[] { mainReceiverSettings }, new[] { errorQueueName }); } public Task Cleanup() { - if (isTransportInitialized && transportTransactionMode >= requestedTransactionMode) - { - PurgeQueues(connectionStringBuilder, queueBindings); - } - + PurgeQueues(connectionStringBuilder, queuesToCleanUp); return Task.FromResult(0); } - static void PurgeQueues(DbConnectionStringBuilder connectionStringBuilder, QueueBindings queueBindings) + static void PurgeQueues(DbConnectionStringBuilder connectionStringBuilder, string[] queues) { - if (connectionStringBuilder == null) + if (connectionStringBuilder == null || queues == null) { return; } @@ -84,8 +81,6 @@ static void PurgeQueues(DbConnectionStringBuilder connectionStringBuilder, Queue throw new Exception("The connection string doesn't contain a value for 'host'."); } - var queues = queueBindings.ReceivingAddresses.Concat(queueBindings.SendingAddresses); - using (var connection = connectionFactory.CreateConnection("Test Queue Purger")) using (var channel = connection.CreateModel()) { @@ -103,9 +98,6 @@ static void PurgeQueues(DbConnectionStringBuilder connectionStringBuilder, Queue } } + string[] queuesToCleanUp; DbConnectionStringBuilder connectionStringBuilder; - QueueBindings queueBindings; - TransportTransactionMode transportTransactionMode; - TransportTransactionMode requestedTransactionMode; - bool isTransportInitialized; } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj index 3c5a93d5a..a9786448b 100644 --- a/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.TransportTests/NServiceBus.Transport.RabbitMQ.TransportTests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs deleted file mode 100644 index 0b42423b0..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/QueueCreator.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace NServiceBus.Transport.RabbitMQ -{ - using System.Threading.Tasks; - - class QueueCreator : ICreateQueues - { - readonly ConnectionFactory connectionFactory; - readonly IRoutingTopology routingTopology; - - public QueueCreator(ConnectionFactory connectionFactory, IRoutingTopology routingTopology) - { - this.connectionFactory = connectionFactory; - this.routingTopology = routingTopology; - } - - public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) - { - using (var connection = connectionFactory.CreateAdministrationConnection()) - using (var channel = connection.CreateModel()) - { - DelayInfrastructure.Build(channel); - - routingTopology.Initialize(channel, queueBindings.ReceivingAddresses, queueBindings.SendingAddresses); - - foreach (var receivingAddress in queueBindings.ReceivingAddresses) - { - routingTopology.BindToDelayInfrastructure(channel, receivingAddress, DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(receivingAddress)); - } - } - - return Task.CompletedTask; - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs index 221528c93..b8ae8f946 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/SubscriptionManager.cs @@ -1,10 +1,11 @@ -namespace NServiceBus.Transport.RabbitMQ + +namespace NServiceBus.Transport.RabbitMQ { - using System; + using Unicast.Messages; using System.Threading.Tasks; using Extensibility; - class SubscriptionManager : IManageSubscriptions + class SubscriptionManager : ISubscriptionManager { readonly ConnectionFactory connectionFactory; readonly IRoutingTopology routingTopology; @@ -17,18 +18,20 @@ public SubscriptionManager(ConnectionFactory connectionFactory, IRoutingTopology this.localQueue = localQueue; } - public Task Subscribe(Type eventType, ContextBag context) + public Task SubscribeAll(MessageMetadata[] eventTypes, ContextBag context) { using (var connection = connectionFactory.CreateAdministrationConnection()) using (var channel = connection.CreateModel()) { - routingTopology.SetupSubscription(channel, eventType, localQueue); + foreach (var eventType in eventTypes) + { + routingTopology.SetupSubscription(channel, eventType, localQueue); + } } - return Task.CompletedTask; } - public Task Unsubscribe(Type eventType, ContextBag context) + public Task Unsubscribe(MessageMetadata eventType, ContextBag context) { using (var connection = connectionFactory.CreateAdministrationConnection()) using (var channel = connection.CreateModel()) diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/AmqpConnectionString.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/AmqpConnectionString.cs new file mode 100644 index 000000000..56306cf0e --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/AmqpConnectionString.cs @@ -0,0 +1,55 @@ + +namespace NServiceBus.Transport.RabbitMQ +{ + using System; + + class AmqpConnectionString + { + public static Action Parse(string connectionString) + { + return transport => + { + var uri = new Uri(connectionString); + + transport.Host = uri.Host; + + if (!uri.IsDefaultPort) + { + transport.Port = uri.Port; + } + + transport.UseTLS = uri.Scheme == "amqps"; + + if (!string.IsNullOrEmpty(uri.UserInfo)) + { + var userPass = uri.UserInfo.Split(':'); + if (userPass.Length > 2) + { + throw new Exception($"Invalid user information: {uri.UserInfo}. Expected user and password separated by a colon."); + } + + transport.UserName = UriDecode(userPass[0]); + if (userPass.Length == 2) + { + transport.Password = UriDecode(userPass[1]); + } + } + + if (uri.Segments.Length > 2) + { + throw new Exception($"Multiple segments are not allowed in the path of an AMQP URI: {string.Join(", ", uri.Segments)}"); + } + + if (uri.Segments.Length == 2) + { + transport.VHost = UriDecode(uri.Segments[1]); + } + }; + } + + static string UriDecode(string value) + { + return Uri.UnescapeDataString(value); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs deleted file mode 100644 index 9253dfe2e..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs +++ /dev/null @@ -1,284 +0,0 @@ -namespace NServiceBus.Transport.RabbitMQ -{ - using System; - using System.Collections.Generic; - using System.Data.Common; - using System.Diagnostics; - using System.IO; - using System.Linq; - using System.Text; - using Support; - - class ConnectionConfiguration - { - const bool defaultUseTls = false; - const int defaultPort = 5672; - const int defaultTlsPort = 5671; - const string defaultVirtualHost = "/"; - const string defaultUserName = "guest"; - const string defaultPassword = "guest"; - const ushort defaultRequestedHeartbeat = 60; - static readonly TimeSpan defaultRetryDelay = TimeSpan.FromSeconds(10); - const string defaultCertPath = ""; - const string defaultCertPassphrase = null; - - public string Host { get; } - - public int Port { get; } - - public string VirtualHost { get; } - - public string UserName { get; } - - public string Password { get; } - - public TimeSpan RequestedHeartbeat { get; } - - public TimeSpan RetryDelay { get; } - - public bool UseTls { get; } - - public string CertPath { get; } - - public string CertPassphrase { get; } - - public Dictionary ClientProperties { get; } - - ConnectionConfiguration( - string host, - int port, - string virtualHost, - string userName, - string password, - TimeSpan requestedHeartbeat, - TimeSpan retryDelay, - bool useTls, - string certPath, - string certPassphrase, - Dictionary clientProperties) - { - Host = host; - Port = port; - VirtualHost = virtualHost; - UserName = userName; - Password = password; - RequestedHeartbeat = requestedHeartbeat; - RetryDelay = retryDelay; - UseTls = useTls; - CertPath = certPath; - CertPassphrase = certPassphrase; - ClientProperties = clientProperties; - } - - public static ConnectionConfiguration Create(string connectionString, string endpointName) - { - Dictionary dictionary; - var invalidOptionsMessage = new StringBuilder(); - - if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase)) - { - dictionary = ParseAmqpConnectionString(connectionString, invalidOptionsMessage); - } - else - { - dictionary = ParseNServiceBusConnectionString(connectionString, invalidOptionsMessage); - } - - var host = GetValue(dictionary, "host", default); - var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage); - var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage); - var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost); - var userName = GetValue(dictionary, "userName", defaultUserName); - var password = GetValue(dictionary, "password", defaultPassword); - - var requestedHeartbeatSeconds = GetValue(dictionary, "requestedHeartbeat", ushort.TryParse, defaultRequestedHeartbeat, invalidOptionsMessage); - var requestedHeartbeat = TimeSpan.FromSeconds(requestedHeartbeatSeconds); - - var retryDelay = GetValue(dictionary, "retryDelay", TimeSpan.TryParse, defaultRetryDelay, invalidOptionsMessage); - var certPath = GetValue(dictionary, "certPath", defaultCertPath); - var certPassPhrase = GetValue(dictionary, "certPassphrase", defaultCertPassphrase); - - if (invalidOptionsMessage.Length > 0) - { - throw new NotSupportedException(invalidOptionsMessage.ToString().TrimEnd('\r', '\n')); - } - - var nsbVersion = FileVersionInfo.GetVersionInfo(typeof(Endpoint).Assembly.Location); - var nsbFileVersion = $"{nsbVersion.FileMajorPart}.{nsbVersion.FileMinorPart}.{nsbVersion.FileBuildPart}"; - - var rabbitMQVersion = FileVersionInfo.GetVersionInfo(typeof(ConnectionConfiguration).Assembly.Location); - var rabbitMQFileVersion = $"{rabbitMQVersion.FileMajorPart}.{rabbitMQVersion.FileMinorPart}.{rabbitMQVersion.FileBuildPart}"; - - var applicationNameAndPath = Environment.GetCommandLineArgs()[0]; - var applicationName = Path.GetFileName(applicationNameAndPath); - var applicationPath = Path.GetDirectoryName(applicationNameAndPath); - - var hostname = RuntimeEnvironment.MachineName; - - var clientProperties = new Dictionary - { - { "client_api", "NServiceBus" }, - { "nservicebus_version", nsbFileVersion }, - { "nservicebus.rabbitmq_version", rabbitMQFileVersion }, - { "application", applicationName }, - { "application_location", applicationPath }, - { "machine_name", hostname }, - { "user", userName }, - { "endpoint_name", endpointName }, - }; - - return new ConnectionConfiguration( - host, port, virtualHost, userName, password, requestedHeartbeat, retryDelay, useTls, certPath, certPassPhrase, clientProperties); - } - - static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage) - { - var dictionary = new Dictionary(); - var uri = new Uri(connectionString); - - var usingTls = string.Equals("amqps", uri.Scheme, StringComparison.OrdinalIgnoreCase) ? bool.TrueString : bool.FalseString; - dictionary.Add("useTls", usingTls); - - dictionary.Add("host", uri.Host); - - if (!uri.IsDefaultPort) - { - dictionary.Add("port", uri.Port.ToString()); - } - - if (!string.IsNullOrEmpty(uri.UserInfo)) - { - var userPass = uri.UserInfo.Split(':'); - - if (userPass.Length > 2) - { - invalidOptionsMessage.AppendLine($"Bad user info in AMQP URI: {uri.UserInfo}"); - } - else - { - dictionary.Add("userName", UriDecode(userPass[0])); - - if (userPass.Length == 2) - { - dictionary.Add("password", UriDecode(userPass[1])); - } - } - } - - if (uri.Segments.Length > 2) - { - invalidOptionsMessage.AppendLine($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}"); - } - else if (uri.Segments.Length == 2) - { - dictionary.Add("virtualHost", UriDecode(uri.Segments[1])); - } - - return dictionary; - } - - static Dictionary ParseNServiceBusConnectionString(string connectionString, StringBuilder invalidOptionsMessage) - { - var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString } - .OfType>() - .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase); - - RegisterDeprecatedSettingsAsInvalidOptions(dictionary, invalidOptionsMessage); - - if (dictionary.TryGetValue("port", out var portValue) && !int.TryParse(portValue, out var port)) - { - invalidOptionsMessage.AppendLine($"'{portValue}' is not a valid Int32 value for the 'port' connection string option."); - } - - if (dictionary.TryGetValue("host", out var value)) - { - var firstHostAndPort = value.Split(',')[0]; - var parts = firstHostAndPort.Split(':'); - var host = parts.ElementAt(0); - - if (host.Length == 0) - { - invalidOptionsMessage.AppendLine("Empty host name in 'host' connection string option."); - } - - dictionary["host"] = host; - - if (parts.Length > 1) - { - if (!int.TryParse(parts[1], out port)) - { - invalidOptionsMessage.AppendLine($"'{parts[1]}' is not a valid Int32 value for the port in the 'host' connection string option."); - } - else - { - dictionary["port"] = port.ToString(); - } - } - } - else - { - invalidOptionsMessage.AppendLine("Invalid connection string. 'host' value must be supplied. e.g: \"host=myServer\""); - } - - return dictionary; - } - - static void RegisterDeprecatedSettingsAsInvalidOptions(Dictionary dictionary, StringBuilder invalidOptionsMessage) - { - if (dictionary.TryGetValue("host", out var value)) - { - var hostsAndPorts = value.Split(','); - - if (hostsAndPorts.Length > 1) - { - invalidOptionsMessage.AppendLine("Multiple hosts are no longer supported. If using RabbitMQ in a cluster, consider using a load balancer to represent the nodes as a single host."); - } - } - - if (dictionary.ContainsKey("dequeuetimeout")) - { - invalidOptionsMessage.AppendLine("The 'DequeueTimeout' connection string option has been removed. Consult the documentation for further information."); - } - - if (dictionary.ContainsKey("maxwaittimeforconfirms")) - { - invalidOptionsMessage.AppendLine("The 'MaxWaitTimeForConfirms' connection string option has been removed. Consult the documentation for further information."); - } - - if (dictionary.ContainsKey("prefetchcount")) - { - invalidOptionsMessage.AppendLine("The 'PrefetchCount' connection string option has been removed. Use 'EndpointConfiguration.UseTransport().PrefetchCount' instead."); - } - - if (dictionary.ContainsKey("usepublisherconfirms")) - { - invalidOptionsMessage.AppendLine("The 'UsePublisherConfirms' connection string option has been removed. Consult the documentation for further information."); - } - } - - static string GetValue(Dictionary dictionary, string key, string defaultValue) - { - return dictionary.TryGetValue(key, out var value) ? value : defaultValue; - } - - static string UriDecode(string value) - { - return Uri.UnescapeDataString(value); - } - - static T GetValue(Dictionary dictionary, string key, Convert convert, T defaultValue, StringBuilder invalidOptionsMessage) - { - if (dictionary.TryGetValue(key, out var value)) - { - if (!convert(value, out defaultValue)) - { - invalidOptionsMessage.AppendLine($"'{value}' is not a valid {typeof(T).Name} value for the '{key}' connection string option."); - } - } - - return defaultValue; - } - - delegate bool Convert(string input, out T output); - } -} diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/NServiceBusConnectionString.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/NServiceBusConnectionString.cs new file mode 100644 index 000000000..0d79e3df0 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/NServiceBusConnectionString.cs @@ -0,0 +1,153 @@ +namespace NServiceBus.Transport.RabbitMQ +{ + using System; + using System.Collections.Generic; + using System.Data.Common; + using System.Linq; + using System.Security.Cryptography.X509Certificates; + + static class NServiceBusConnectionString + { + public static Action Parse(string connectionString) + { + var dictionary1 = new DbConnectionStringBuilder { ConnectionString = connectionString } + .OfType>() + .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase); + + CheckDeprecatedSettings(dictionary1); + var dictionary = dictionary1; + + return transport => + { + if (dictionary.TryGetValue("port", out var portString)) + { + if (!int.TryParse(portString, out var port)) + { + throw new Exception($"'{portString}' is not a valid value for the 'port' connection string option."); + } + + transport.Port = port; + } + + if (dictionary.TryGetValue("host", out var host)) + { + if (host.Contains(":")) + { + var hostAndPort = host.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries); + if (hostAndPort.Length > 2) + { + throw new Exception($"'{host}' is not a valid value for the 'host' connection string option."); + } + + transport.Host = hostAndPort[0]; + if (!int.TryParse(hostAndPort[1], out var port)) + { + throw new Exception($"'{host}' is not a valid value for the 'host' connection string option."); + } + + transport.Port = port; + } + else + { + transport.Host = host; + } + } + else + { + throw new Exception("Missing required 'host' connection string option."); + } + + if (dictionary.TryGetValue("virtualHost", out var vhost)) + { + transport.VHost = vhost; + } + + if (dictionary.TryGetValue("userName", out var userName)) + { + transport.UserName = userName; + } + + if (dictionary.TryGetValue("password", out var password)) + { + transport.Password = password; + } + + if (dictionary.TryGetValue("retryDelay", out var retryDelayString)) + { + if (!TimeSpan.TryParse(retryDelayString, out var retryDelay)) + { + throw new Exception($"'{retryDelayString}' is not a valid value for the 'retryDelay' connection string option."); + } + + transport.NetworkRecoveryInterval = retryDelay; + } + + if (dictionary.TryGetValue("requestedHeartbeat", out var heartbeatString)) + { + if (int.TryParse(heartbeatString, out var heartbeatSeconds)) + { + transport.HeartbeatInterval = TimeSpan.FromSeconds(heartbeatSeconds); + } + else if (TimeSpan.TryParse(heartbeatString, out var heartbeatTimeSpan)) + { + transport.HeartbeatInterval = heartbeatTimeSpan; + } + else + { + throw new Exception($"'{heartbeatString}' is not a valid value for the 'requestedHeartbeat' connection string option."); + } + } + + if (dictionary.TryGetValue("certPath", out var certPath) + && dictionary.TryGetValue("certPassphrase", out var passPhrase)) + { + transport.ClientCertificate = new X509Certificate2(certPath, passPhrase); + } + + if (dictionary.TryGetValue("useTls", out var useTlsString)) + { + if (!bool.TryParse(useTlsString, out var useTls)) + { + throw new Exception($"'{useTlsString}' is not a valid value for the 'useTls' connection string option."); + } + + transport.UseTLS = useTls; + } + }; + } + + + static void CheckDeprecatedSettings(Dictionary dictionary) + { + if (dictionary.TryGetValue("host", out var value)) + { + var hostsAndPorts = value.Split(','); + + if (hostsAndPorts.Length > 1) + { + throw new Exception("Multiple hosts are no longer supported. If using RabbitMQ in a cluster, consider using a load balancer to represent the nodes as a single host."); + } + } + + if (dictionary.ContainsKey("dequeuetimeout")) + { + throw new Exception("The 'DequeueTimeout' connection string option has been removed. Consult the documentation for further information."); + } + + if (dictionary.ContainsKey("maxwaittimeforconfirms")) + { + throw new Exception("The 'MaxWaitTimeForConfirms' connection string option has been removed. Consult the documentation for further information."); + } + + if (dictionary.ContainsKey("prefetchcount")) + { + throw new Exception("The 'PrefetchCount' connection string option has been removed. Use 'EndpointConfiguration.UseTransport().PrefetchCount' instead."); + } + + if (dictionary.ContainsKey("usepublisherconfirms")) + { + throw new Exception("The 'UsePublisherConfirms' connection string option has been removed. Consult the documentation for further information."); + } + } + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs deleted file mode 100644 index 1b0b61734..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs +++ /dev/null @@ -1,224 +0,0 @@ -namespace NServiceBus -{ - using System; - using System.Security.Cryptography.X509Certificates; - using Configuration.AdvancedExtensibility; - using RabbitMQ.Client.Events; - using Transport.RabbitMQ; - - /// - /// Adds access to the RabbitMQ transport config to the global Transports object. - /// - public static partial class RabbitMQTransportSettingsExtensions - { - /// - /// Registers a custom routing topology. - /// - /// The transport configuration object - /// The function used to create the routing topology instance. The parameter of the function indicates whether exchanges and queues declared by the routing topology should be durable. - public static TransportExtensions UseCustomRoutingTopology(this TransportExtensions transportExtensions, Func topologyFactory) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNull(nameof(topologyFactory), topologyFactory); - - transportExtensions.GetSettings().Set(topologyFactory); - - return transportExtensions; - } - - /// - /// Uses the conventional routing topology. - /// - /// The transport configuration object - public static TransportExtensions UseConventionalRoutingTopology(this TransportExtensions transportExtensions) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - return transportExtensions.UseCustomRoutingTopology(durable => new ConventionalRoutingTopology(durable)); - } - - /// - /// Uses the direct routing topology with the specified conventions. - /// - /// The transport configuration object - /// The routing key convention. - /// The exchange name convention. - public static TransportExtensions UseDirectRoutingTopology(this TransportExtensions transportExtensions, Func routingKeyConvention = null, Func exchangeNameConvention = null) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - if (routingKeyConvention == null) - { - routingKeyConvention = DefaultRoutingKeyConvention.GenerateRoutingKey; - } - - if (exchangeNameConvention == null) - { - exchangeNameConvention = () => "amq.topic"; - } - - return transportExtensions.UseCustomRoutingTopology(durable => new DirectRoutingTopology(new DirectRoutingTopology.Conventions(exchangeNameConvention, routingKeyConvention), durable)); - } - - /// - /// Allows the user to control how the message ID is determined. Mostly useful when doing native integration with non-NSB endpoints. - /// - /// The transport configuration object - /// The user-defined strategy for giving the message a unique ID. - public static TransportExtensions CustomMessageIdStrategy(this TransportExtensions transportExtensions, Func customIdStrategy) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNull(nameof(customIdStrategy), customIdStrategy); - - transportExtensions.GetSettings().Set(SettingsKeys.CustomMessageIdStrategy, customIdStrategy); - - return transportExtensions; - } - - /// - /// Overrides the default time to wait before triggering a circuit breaker that initiates the endpoint shutdown procedure when the message pump's connection to the broker is lost and cannot be recovered. - /// - /// The transport configuration object - /// The time to wait before triggering the circuit breaker. - public static TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this TransportExtensions transportExtensions, TimeSpan waitTime) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNegativeAndZero(nameof(waitTime), waitTime); - - transportExtensions.GetSettings().Set(SettingsKeys.TimeToWaitBeforeTriggeringCircuitBreaker, waitTime); - - return transportExtensions; - } - - /// - /// Specifies the multiplier to apply to the maximum concurrency value to calculate the prefetch count. - /// - /// The transport configuration object - /// The multiplier value to use in the prefetch calculation. - public static TransportExtensions PrefetchMultiplier(this TransportExtensions transportExtensions, int prefetchMultiplier) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNegativeAndZero(nameof(prefetchMultiplier), prefetchMultiplier); - - transportExtensions.GetSettings().Set(SettingsKeys.PrefetchMultiplier, prefetchMultiplier); - - return transportExtensions; - } - - /// - /// Overrides the default prefetch count calculation with the specified value. - /// - /// The transport configuration object - /// The prefetch count to use. - public static TransportExtensions PrefetchCount(this TransportExtensions transportExtensions, ushort prefetchCount) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - transportExtensions.GetSettings().Set(SettingsKeys.PrefetchCount, prefetchCount); - - return transportExtensions; - } - - /// - /// Specifies the certificate to use for client authentication when connecting to the broker via TLS. - /// - /// The transport configuration object - /// The certificate to use for client authentication. - public static TransportExtensions SetClientCertificate(this TransportExtensions transportExtensions, X509Certificate2 clientCertificate) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNull(nameof(clientCertificate), clientCertificate); - - transportExtensions.GetSettings().Set(SettingsKeys.ClientCertificateCollection, new X509Certificate2Collection(clientCertificate)); - - return transportExtensions; - } - - /// - /// Specifies the certificate to use for client authentication when connecting to the broker via TLS. - /// - /// The transport configuration object - /// The path to the certificate file. - /// The password for the certificate specified in . - public static TransportExtensions SetClientCertificate(this TransportExtensions transportExtensions, string path, string password) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNullAndEmpty(nameof(path), path); - Guard.AgainstNullAndEmpty(nameof(password), password); - - transportExtensions.GetSettings().Set(SettingsKeys.ClientCertificateCollection, new X509Certificate2Collection(new X509Certificate2(path, password))); - - return transportExtensions; - } - - /// - /// Disables all remote certificate validation when connecting to the broker via TLS. - /// - /// The transport configuration object - public static TransportExtensions DisableRemoteCertificateValidation(this TransportExtensions transportExtensions) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - transportExtensions.GetSettings().Set(SettingsKeys.DisableRemoteCertificateValidation, true); - - return transportExtensions; - } - - /// - /// Specifies that an external authentication mechanism should be used for client authentication. - /// - /// The transport configuration object - public static TransportExtensions UseExternalAuthMechanism(this TransportExtensions transportExtensions) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - transportExtensions.GetSettings().Set(SettingsKeys.UseExternalAuthMechanism, true); - - return transportExtensions; - } - - - /// - /// Specifies that exchanges and queues should be declared as non-durable. - /// - /// The transport configuration object - public static TransportExtensions DisableDurableExchangesAndQueues(this TransportExtensions transportExtensions) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - - transportExtensions.GetSettings().Set(SettingsKeys.UseDurableExchangesAndQueues, false); - - return transportExtensions; - } - - /// - /// Sets the interval for heartbeats between the endpoint and the broker. - /// - /// The transport configuration object - /// The time interval to use. - public static TransportExtensions SetHeartbeatInterval(this TransportExtensions transportExtensions, TimeSpan heartbeatInterval) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNegativeAndZero(nameof(heartbeatInterval), heartbeatInterval); - - transportExtensions.GetSettings().Set(SettingsKeys.HeartbeatInterval, heartbeatInterval); - - return transportExtensions; - } - - /// - /// Sets the time to wait between attempts to reconnect to the broker if the connection is lost. - /// - /// The transport configuration object - /// The time interval to use. - public static TransportExtensions SetNetworkRecoveryInterval(this TransportExtensions transportExtensions, TimeSpan networkRecoveryInterval) - { - Guard.AgainstNull(nameof(transportExtensions), transportExtensions); - Guard.AgainstNegativeAndZero(nameof(networkRecoveryInterval), networkRecoveryInterval); - - transportExtensions.GetSettings().Set(SettingsKeys.NetworkRecoveryInterval, networkRecoveryInterval); - - return transportExtensions; - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/SettingsKeys.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/SettingsKeys.cs deleted file mode 100644 index 20fb06898..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/SettingsKeys.cs +++ /dev/null @@ -1,16 +0,0 @@ -namespace NServiceBus.Transport.RabbitMQ -{ - static class SettingsKeys - { - public const string CustomMessageIdStrategy = "RabbitMQ.CustomMessageIdStrategy"; - public const string TimeToWaitBeforeTriggeringCircuitBreaker = "RabbitMQ.TimeToWaitBeforeTriggeringCircuitBreaker"; - public const string PrefetchMultiplier = "RabbitMQ.PrefetchMultiplier"; - public const string PrefetchCount = "RabbitMQ.PrefetchCount"; - public const string ClientCertificateCollection = "RabbitMQ.ClientCertificateCollection"; - public const string DisableRemoteCertificateValidation = "RabbitMQ.DisableRemoteCertificateValidation"; - public const string UseExternalAuthMechanism = "RabbitMQ.UseExternalAuthMechanism"; - public const string UseDurableExchangesAndQueues = "RabbitMQ.UseDurableExchangesAndQueues"; - public const string HeartbeatInterval = "RabbitMQ.HeartbeatInterval"; - public const string NetworkRecoveryInterval = "RabbitMQ.NetworkRecoveryInterval"; - } -} diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs index 462f6cae2..f5c8b1e2e 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs @@ -1,11 +1,14 @@ namespace NServiceBus.Transport.RabbitMQ { using System; + using System.Diagnostics; + using System.IO; using System.Net.Security; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using global::RabbitMQ.Client; using Logging; + using Support; class ConnectionFactory { @@ -15,7 +18,7 @@ class ConnectionFactory readonly global::RabbitMQ.Client.ConnectionFactory connectionFactory; readonly object lockObject = new object(); - public ConnectionFactory(string endpointName, ConnectionConfiguration connectionConfiguration, X509Certificate2Collection clientCertificateCollection, bool disableRemoteCertificateValidation, bool useExternalAuthMechanism, TimeSpan? heartbeatInterval, TimeSpan? networkRecoveryInterval) + public ConnectionFactory(string endpointName, string host, int port, string vhost, string userName, string password, bool useTls, X509Certificate2Collection clientCertificateCollection, bool validateRemoteCertificate, bool useExternalAuthMechanism, TimeSpan heartbeatInterval, TimeSpan networkRecoveryInterval) { if (endpointName is null) { @@ -29,37 +32,25 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection this.endpointName = endpointName; - if (connectionConfiguration == null) - { - throw new ArgumentNullException(nameof(connectionConfiguration)); - } - - if (connectionConfiguration.Host == null) - { - throw new ArgumentException("The connectionConfiguration has a null Host.", nameof(connectionConfiguration)); - } - connectionFactory = new global::RabbitMQ.Client.ConnectionFactory { - HostName = connectionConfiguration.Host, - Port = connectionConfiguration.Port, - VirtualHost = connectionConfiguration.VirtualHost, - UserName = connectionConfiguration.UserName, - Password = connectionConfiguration.Password, - RequestedHeartbeat = heartbeatInterval ?? connectionConfiguration.RequestedHeartbeat, - NetworkRecoveryInterval = networkRecoveryInterval ?? connectionConfiguration.RetryDelay, + HostName = host, + Port = port, + VirtualHost = vhost, + UserName = userName, + Password = password, + RequestedHeartbeat = heartbeatInterval, + NetworkRecoveryInterval = networkRecoveryInterval, UseBackgroundThreadsForIO = true, DispatchConsumersAsync = true }; - connectionFactory.Ssl.ServerName = connectionConfiguration.Host; + connectionFactory.Ssl.ServerName = host; connectionFactory.Ssl.Certs = clientCertificateCollection; - connectionFactory.Ssl.CertPath = connectionConfiguration.CertPath; - connectionFactory.Ssl.CertPassphrase = connectionConfiguration.CertPassphrase; connectionFactory.Ssl.Version = SslProtocols.Tls12; - connectionFactory.Ssl.Enabled = connectionConfiguration.UseTls; + connectionFactory.Ssl.Enabled = useTls; - if (disableRemoteCertificateValidation) + if (!validateRemoteCertificate) { connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors | SslPolicyErrors.RemoteCertificateNameMismatch | @@ -71,12 +62,34 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection connectionFactory.AuthMechanisms = new[] { new ExternalMechanismFactory() }; } + SetClientProperties(endpointName, userName); + } + + void SetClientProperties(string endpointName, string userName) + { connectionFactory.ClientProperties.Clear(); - foreach (var item in connectionConfiguration.ClientProperties) - { - connectionFactory.ClientProperties.Add(item.Key, item.Value); - } + var nsbVersion = FileVersionInfo.GetVersionInfo(typeof(Endpoint).Assembly.Location); + var nsbFileVersion = $"{nsbVersion.FileMajorPart}.{nsbVersion.FileMinorPart}.{nsbVersion.FileBuildPart}"; + + var rabbitMQVersion = FileVersionInfo.GetVersionInfo(typeof(ConnectionFactory).Assembly.Location); + var rabbitMQFileVersion = + $"{rabbitMQVersion.FileMajorPart}.{rabbitMQVersion.FileMinorPart}.{rabbitMQVersion.FileBuildPart}"; + + var applicationNameAndPath = Environment.GetCommandLineArgs()[0]; + var applicationName = Path.GetFileName(applicationNameAndPath); + var applicationPath = Path.GetDirectoryName(applicationNameAndPath); + + var hostname = RuntimeEnvironment.MachineName; + + connectionFactory.ClientProperties.Add("client_api", "NServiceBus"); + connectionFactory.ClientProperties.Add("nservicebus_version", nsbFileVersion); + connectionFactory.ClientProperties.Add("nservicebus.rabbitmq_version", rabbitMQFileVersion); + connectionFactory.ClientProperties.Add("application", applicationName); + connectionFactory.ClientProperties.Add("application_location", applicationPath); + connectionFactory.ClientProperties.Add("machine_name", hostname); + connectionFactory.ClientProperties.Add("user", userName); + connectionFactory.ClientProperties.Add("endpoint_name", endpointName); } public IConnection CreatePublishConnection() => CreateConnection($"{endpointName} Publish", false); diff --git a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj index 8a02c9167..93d9f1173 100644 --- a/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj +++ b/src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/PrefetchCountCalculation.cs b/src/NServiceBus.Transport.RabbitMQ/PrefetchCountCalculation.cs new file mode 100644 index 000000000..7681e23c6 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/PrefetchCountCalculation.cs @@ -0,0 +1,9 @@ +namespace NServiceBus +{ + /// + /// Calculates the value for the prefetch count based on the endpoint's maximum concurrency setting. + /// + /// Maximum concurrency of the message receiver. + /// The prefetch count to use for the receiver. + public delegate int PrefetchCountCalculation(int maximumConcurrency); +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 74bd07415..039b2a409 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -1,25 +1,287 @@ namespace NServiceBus { - using Settings; - using Transport.RabbitMQ; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Security.Cryptography.X509Certificates; + using System.Text; + using System.Threading.Tasks; + using RabbitMQ.Client; + using RabbitMQ.Client.Events; using Transport; + using Transport.RabbitMQ; + using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory; /// - /// Transport definition for RabbitMQ. + /// Transport definition for RabbitMQ. /// public class RabbitMQTransport : TransportDefinition { + static readonly TransportTransactionMode[] SupportedTransactionModes = + { + TransportTransactionMode.None, TransportTransactionMode.ReceiveOnly + }; + + TimeSpan heartbeatInterval = TimeSpan.FromMinutes(1); + string host; + Func messageIdStrategy = MessageConverter.DefaultMessageIdStrategy; + TimeSpan networkRecoveryInterval = TimeSpan.FromSeconds(10); + PrefetchCountCalculation prefetchCountCalculation = maxConcurrency => 3 * maxConcurrency; + + TimeSpan timeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(2); + + /// + /// Creates new instance of the RabbitMQ transport. + /// + /// The built-in topology to use. + /// Connection string. + public RabbitMQTransport(Topology topology, string connectionString) + : this(GetBuiltInTopology(topology), connectionString) + { + } + + static IRoutingTopology GetBuiltInTopology(Topology topology) + { + return topology == Topology.Conventional + ? (IRoutingTopology)new ConventionalRoutingTopology(true) + : new DirectRoutingTopology(true); + } + + /// + /// Creates new instance of the RabbitMQ transport. + /// + /// The custom topology to use. + /// Connection string. + public RabbitMQTransport(IRoutingTopology topology, string connectionString) + : base(TransportTransactionMode.ReceiveOnly, true, true, true) + { + Guard.AgainstNull(nameof(topology), topology); + Guard.AgainstNull(nameof(connectionString), connectionString); + + RoutingTopology = topology; + if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase)) + { + AmqpConnectionString.Parse(connectionString)(this); + } + else + { + NServiceBusConnectionString.Parse(connectionString)(this); + } + } + + /// + /// The host to connect to. + /// + public string Host + { + get => host; + set + { + Guard.AgainstNullAndEmpty("value", value); + host = value; + } + } + + /// + /// The port to connect to. + /// If not specified, the default port will be used (5672 if not encrypted and 5671 if using TLS) + /// + public int? Port { get; set; } + + /// + /// The vhost to connect to. + /// + public string VHost { get; set; } = "/"; + + /// + /// The user name to pass to the broker for authentication. + /// + public string UserName { get; set; } = "guest"; + + /// + /// The password to pass to the broker for authentication. + /// + public string Password { get; set; } = "guest"; + + /// + /// The routing topology to use. If not set the conventional routing topology will be used + /// . + /// + public IRoutingTopology RoutingTopology { get; set; } + + /// + /// The strategy for deriving the message ID from the raw RabbitMQ message. Override in case of native integration when + /// the sender + /// of the message is not an NServiceBus endpoint. + /// + public Func MessageIdStrategy + { + get => messageIdStrategy; + set + { + Guard.AgainstNull("value", value); + messageIdStrategy = value; + } + } + + /// + /// Time to wait before triggering a circuit breaker that initiates the endpoint shutdown procedure when the + /// message pump's connection to the broker is lost and cannot be recovered. + /// + public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker + { + get => timeToWaitBeforeTriggeringCircuitBreaker; + set + { + Guard.AgainstNegativeAndZero("value", value); + timeToWaitBeforeTriggeringCircuitBreaker = value; + } + } + /// - /// Initializes all the factories and supported features for the transport. + /// The calculation method for prefetch count. By default 3 times the maximum concurrency value. + /// The argument for the callback is the maximum concurrency. The result needs to be a positive integer value. /// - /// An instance of the current settings. - /// The connection string. - /// The supported factories. - public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) => new RabbitMQTransportInfrastructure(settings, connectionString); + public PrefetchCountCalculation PrefetchCountCalculation + { + get => prefetchCountCalculation; + set + { + Guard.AgainstNull("value", value); + prefetchCountCalculation = value; + } + } + + /// + /// Configures if the client should use TLS-secured connection. + /// + public bool UseTLS { get; set; } + + /// + /// The certificate to use for client authentication when connecting to the broker via TLS. + /// + public X509Certificate2 ClientCertificate { get; set; } + + /// + /// Should the client validate the broker certificate when connecting via TLS. + /// + public bool ValidateRemoteCertificate { get; set; } = true; + + /// + /// Specifies if an external authentication mechanism should be used for client authentication. + /// + public bool UseExternalAuthMechanism { get; set; } = false; + + /// + /// The interval for heartbeats between the endpoint and the broker. + /// + public TimeSpan HeartbeatInterval + { + get => heartbeatInterval; + set + { + Guard.AgainstNegativeAndZero("value", value); + heartbeatInterval = value; + } + } + + /// + /// The time to wait between attempts to reconnect to the broker if the connection is lost. + /// + public TimeSpan NetworkRecoveryInterval + { + get => networkRecoveryInterval; + set + { + Guard.AgainstNegativeAndZero("value", value); + networkRecoveryInterval = value; + } + } + + int DefaultPort => UseTLS ? 5671 : 5672; + + /// + /// Initializes all the factories and supported features for the transport. This method is called right before all + /// features + /// are activated and the settings will be locked down. This means you can use the SettingsHolder both for providing + /// default capabilities as well as for initializing the transport's configuration based on those settings (the user + /// cannot + /// provide information anymore at this stage). + /// + public override async Task Initialize(HostSettings hostSettings, + ReceiveSettings[] receivers, string[] sendingAddresses) + { + X509Certificate2Collection certCollection = null; + if (ClientCertificate != null) + { + certCollection = new X509Certificate2Collection(ClientCertificate); + } + + var connectionFactory = new ConnectionFactory(hostSettings.Name, Host, Port ?? DefaultPort, + VHost, UserName, Password, UseTLS, certCollection, ValidateRemoteCertificate, + UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval); + + var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology); + channelProvider.CreateConnection(); + + var converter = new MessageConverter(MessageIdStrategy); + + if (hostSettings.SetupInfrastructure) + { + string[] receivingAddresses = receivers.Select(x => x.ReceiveAddress).ToArray(); + await SetupInfrastructure(receivingAddresses, sendingAddresses, connectionFactory) + .ConfigureAwait(false); + } + + return new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory, + RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker, + PrefetchCountCalculation); + } + + Task SetupInfrastructure(string[] receivingQueues, string[] sendingQueues, ConnectionFactory connectionFactory) + { + using (IConnection connection = connectionFactory.CreateAdministrationConnection()) + using (IModel channel = connection.CreateModel()) + { + DelayInfrastructure.Build(channel); + + RoutingTopology.Initialize(channel, receivingQueues, sendingQueues); + + foreach (string receivingAddress in receivingQueues) + { + RoutingTopology.BindToDelayInfrastructure(channel, receivingAddress, + DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(receivingAddress)); + } + } + + return Task.CompletedTask; + } + + + /// + /// Translates a object into a transport specific queue + /// address-string. + /// + public override string ToTransportAddress(QueueAddress address) + { + var queue = new StringBuilder(address.BaseAddress); + if (address.Discriminator != null) + { + queue.Append("-" + address.Discriminator); + } + + if (address.Qualifier != null) + { + queue.Append("." + address.Qualifier); + } + + return queue.ToString(); + } /// - /// Gets an example connection string to use when reporting the lack of a configured connection string to the user. + /// Returns a list of all supported transaction modes of this transport. /// - public override string ExampleConnectionStringForErrorMessage => "amqp://localhost"; + public override IReadOnlyCollection GetSupportedTransactionModes() => + SupportedTransactionModes; } -} +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs index 56d0e966d..2edadc126 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs @@ -1,155 +1,44 @@ namespace NServiceBus.Transport.RabbitMQ { + using System.Linq; using System; - using System.Collections.Generic; - using System.Security.Cryptography.X509Certificates; - using System.Text; using System.Threading.Tasks; - using DelayedDelivery; - using global::RabbitMQ.Client.Events; - using Performance.TimeToBeReceived; - using Routing; - using Settings; sealed class RabbitMQTransportInfrastructure : TransportInfrastructure { - const string coreHostInformationDisplayNameKey = "NServiceBus.HostInformation.DisplayName"; - - readonly SettingsHolder settings; readonly ConnectionFactory connectionFactory; readonly ChannelProvider channelProvider; IRoutingTopology routingTopology; - public RabbitMQTransportInfrastructure(SettingsHolder settings, string connectionString) + public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, ConnectionFactory connectionFactory, IRoutingTopology routingTopology, + ChannelProvider channelProvider, MessageConverter messageConverter, + TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation) { - this.settings = settings; - - var endpointName = settings.EndpointName(); - var connectionConfiguration = ConnectionConfiguration.Create(connectionString, endpointName); - - settings.TryGet(SettingsKeys.ClientCertificateCollection, out X509Certificate2Collection clientCertificateCollection); - settings.TryGet(SettingsKeys.DisableRemoteCertificateValidation, out bool disableRemoteCertificateValidation); - settings.TryGet(SettingsKeys.UseExternalAuthMechanism, out bool useExternalAuthMechanism); - settings.TryGet(SettingsKeys.HeartbeatInterval, out TimeSpan? heartbeatInterval); - settings.TryGet(SettingsKeys.NetworkRecoveryInterval, out TimeSpan? networkRecoveryInterval); - - connectionFactory = new ConnectionFactory(endpointName, connectionConfiguration, clientCertificateCollection, disableRemoteCertificateValidation, useExternalAuthMechanism, heartbeatInterval, networkRecoveryInterval); - - routingTopology = CreateRoutingTopology(); - - channelProvider = new ChannelProvider(connectionFactory, connectionConfiguration.RetryDelay, routingTopology); - } - - public override IEnumerable DeliveryConstraints => new List { typeof(DiscardIfNotReceivedBefore), typeof(DoNotDeliverBefore), typeof(DelayDeliveryWith) }; + this.connectionFactory = connectionFactory; + this.routingTopology = routingTopology; + this.channelProvider = channelProvider; - public override OutboundRoutingPolicy OutboundRoutingPolicy => new OutboundRoutingPolicy(OutboundRoutingType.Unicast, OutboundRoutingType.Multicast, OutboundRoutingType.Unicast); - - public override TransportTransactionMode TransactionMode => TransportTransactionMode.ReceiveOnly; - - public override EndpointInstance BindToLocalEndpoint(EndpointInstance instance) => instance; - - public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() - { - return new TransportReceiveInfrastructure( - () => CreateMessagePump(), - () => new QueueCreator(connectionFactory, routingTopology), - () => Task.FromResult(StartupCheckResult.Success)); - } - - public override TransportSendInfrastructure ConfigureSendInfrastructure() - { - return new TransportSendInfrastructure( - () => new MessageDispatcher(channelProvider), - () => Task.FromResult(StartupCheckResult.Success)); + Dispatcher = new MessageDispatcher(channelProvider); + Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation)) + .ToDictionary(x => x.Id, x => x); } - public override TransportSubscriptionInfrastructure ConfigureSubscriptionInfrastructure() + IMessageReceiver CreateMessagePump(HostSettings hostSettings, ReceiveSettings settings, MessageConverter messageConverter, + TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation) { - return new TransportSubscriptionInfrastructure(() => new SubscriptionManager(connectionFactory, routingTopology, settings.LocalAddress())); + var consumerTag = $"{hostSettings.HostDisplayName} - {hostSettings.Name}"; + return new MessagePump(connectionFactory, routingTopology, messageConverter, consumerTag, channelProvider, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation, settings, hostSettings.CriticalErrorAction); } - public override string ToTransportAddress(LogicalAddress logicalAddress) + public override Task Shutdown() { - var queue = new StringBuilder(logicalAddress.EndpointInstance.Endpoint); - - if (logicalAddress.EndpointInstance.Discriminator != null) - { - queue.Append("-" + logicalAddress.EndpointInstance.Discriminator); - } - - if (logicalAddress.Qualifier != null) + foreach (IMessageReceiver receiver in Receivers.Values) { - queue.Append("." + logicalAddress.Qualifier); + ((MessagePump)receiver).Dispose(); } - return queue.ToString(); - } - - public override Task Start() - { - channelProvider.CreateConnection(); - return base.Start(); - } - - public override Task Stop() - { channelProvider.Dispose(); - return base.Stop(); - } - - IRoutingTopology CreateRoutingTopology() - { - if (!settings.TryGet(out Func topologyFactory)) - { - throw new InvalidOperationException("A routing topology must be configured with one of the 'EndpointConfiguration.UseTransport().UseXXXXRoutingTopology()` methods."); - } - - if (!settings.TryGet(SettingsKeys.UseDurableExchangesAndQueues, out bool useDurableExchangesAndQueues)) - { - useDurableExchangesAndQueues = true; - } - - return topologyFactory(useDurableExchangesAndQueues); - } - - IPushMessages CreateMessagePump() - { - MessageConverter messageConverter; - - if (settings.HasSetting(SettingsKeys.CustomMessageIdStrategy)) - { - messageConverter = new MessageConverter(settings.Get>(SettingsKeys.CustomMessageIdStrategy)); - } - else - { - messageConverter = new MessageConverter(); - } - - if (!settings.TryGet(coreHostInformationDisplayNameKey, out string hostDisplayName)) - { - hostDisplayName = Support.RuntimeEnvironment.MachineName; - } - - var consumerTag = $"{hostDisplayName} - {settings.EndpointName()}"; - - var queuePurger = new QueuePurger(connectionFactory); - - if (!settings.TryGet(SettingsKeys.TimeToWaitBeforeTriggeringCircuitBreaker, out TimeSpan timeToWaitBeforeTriggeringCircuitBreaker)) - { - timeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(2); - } - - if (!settings.TryGet(SettingsKeys.PrefetchMultiplier, out int prefetchMultiplier)) - { - prefetchMultiplier = 3; - } - - if (!settings.TryGet(SettingsKeys.PrefetchCount, out ushort prefetchCount)) - { - prefetchCount = 0; - } - - return new MessagePump(connectionFactory, messageConverter, consumerTag, channelProvider, queuePurger, timeToWaitBeforeTriggeringCircuitBreaker, prefetchMultiplier, prefetchCount); + return Task.CompletedTask; } } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs index 7f9091c4d..1ea122b00 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessageConverter.cs @@ -4,14 +4,8 @@ using System.Collections.Generic; using System.Text; using global::RabbitMQ.Client.Events; - class MessageConverter { - public MessageConverter() - { - messageIdStrategy = DefaultMessageIdStrategy; - } - public MessageConverter(Func messageIdStrategy) { this.messageIdStrategy = messageIdStrategy; @@ -76,7 +70,7 @@ public Dictionary RetrieveHeaders(BasicDeliverEventArgs message) return deserializedHeaders; } - string DefaultMessageIdStrategy(BasicDeliverEventArgs message) + public static string DefaultMessageIdStrategy(BasicDeliverEventArgs message) { var properties = message.BasicProperties; diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs index ed05a67f8..f8fa15223 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs @@ -10,28 +10,24 @@ using global::RabbitMQ.Client.Exceptions; using Logging; - sealed class MessagePump : IPushMessages, IDisposable + sealed class MessagePump : IMessageReceiver, IDisposable { static readonly ILog Logger = LogManager.GetLogger(typeof(MessagePump)); - static readonly TransportTransaction transportTransaction = new TransportTransaction(); + static readonly TransportTransaction TransportTransaction = new TransportTransaction(); readonly ConnectionFactory connectionFactory; readonly MessageConverter messageConverter; readonly string consumerTag; readonly ChannelProvider channelProvider; readonly QueuePurger queuePurger; - readonly TimeSpan timeToWaitBeforeTriggeringCircuitBreaker; - readonly int prefetchMultiplier; - readonly ushort overriddenPrefetchCount; - - // Init - Func onMessage; - Func> onError; - PushSettings settings; - CriticalError criticalError; - MessagePumpConnectionFailedCircuitBreaker circuitBreaker; - - // Start + readonly PrefetchCountCalculation prefetchCountCalculation; + readonly ReceiveSettings settings; + readonly Action criticalErrorAction; + readonly MessagePumpConnectionFailedCircuitBreaker circuitBreaker; + + bool disposed; + OnMessage onMessage; + OnError onError; int maxConcurrency; long numberOfMessagesBeingProcessed; CancellationTokenSource messageProcessing; @@ -41,59 +37,60 @@ sealed class MessagePump : IPushMessages, IDisposable // Stop TaskCompletionSource connectionShutdownCompleted; - public MessagePump(ConnectionFactory connectionFactory, MessageConverter messageConverter, string consumerTag, ChannelProvider channelProvider, QueuePurger queuePurger, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, int prefetchMultiplier, ushort overriddenPrefetchCount) + public MessagePump(ConnectionFactory connectionFactory, IRoutingTopology routingTopology, MessageConverter messageConverter, string consumerTag, + ChannelProvider channelProvider, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, + PrefetchCountCalculation prefetchCountCalculation, ReceiveSettings settings, + Action criticalErrorAction) { this.connectionFactory = connectionFactory; this.messageConverter = messageConverter; this.consumerTag = consumerTag; this.channelProvider = channelProvider; - this.queuePurger = queuePurger; - this.timeToWaitBeforeTriggeringCircuitBreaker = timeToWaitBeforeTriggeringCircuitBreaker; - this.prefetchMultiplier = prefetchMultiplier; - this.overriddenPrefetchCount = overriddenPrefetchCount; + this.prefetchCountCalculation = prefetchCountCalculation; + this.settings = settings; + this.criticalErrorAction = criticalErrorAction; + + if (settings.UsePublishSubscribe) + { + Subscriptions = new SubscriptionManager(connectionFactory, routingTopology, settings.ReceiveAddress); + } + + queuePurger = new QueuePurger(connectionFactory); + circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.ReceiveAddress} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalErrorAction); } - public Task Init(Func onMessage, Func> onError, CriticalError criticalError, PushSettings settings) + public ISubscriptionManager Subscriptions { get; } + public string Id => settings.Id; + + public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError) { this.onMessage = onMessage; this.onError = onError; - this.settings = settings; - this.criticalError = criticalError; + maxConcurrency = limitations.MaxConcurrency; - circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.InputQueue} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalError); if (settings.PurgeOnStartup) { - queuePurger.Purge(settings.InputQueue); + queuePurger.Purge(settings.ReceiveAddress); } return Task.CompletedTask; } - public void Start(PushRuntimeSettings limitations) + public Task StartReceive() { - maxConcurrency = limitations.MaxConcurrency; messageProcessing = new CancellationTokenSource(); - connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump", consumerDispatchConcurrency: maxConcurrency); + connection = connectionFactory.CreateConnection($"{settings.ReceiveAddress} MessagePump", consumerDispatchConcurrency: maxConcurrency); var channel = connection.CreateModel(); - long prefetchCount; - - if (overriddenPrefetchCount > 0) - { - prefetchCount = overriddenPrefetchCount; + var prefetchCount = prefetchCountCalculation(maxConcurrency); - if (prefetchCount < maxConcurrency) - { - Logger.Warn($"The specified prefetch count '{prefetchCount}' is smaller than the specified maximum concurrency '{maxConcurrency}'. The maximum concurrency value will be used as the prefetch count instead."); - prefetchCount = maxConcurrency; - } - } - else + if (prefetchCount < maxConcurrency) { - prefetchCount = (long)maxConcurrency * prefetchMultiplier; + Logger.Warn($"The specified prefetch count '{prefetchCount}' is smaller than the specified maximum concurrency '{maxConcurrency}'. The maximum concurrency value will be used as the prefetch count instead."); + prefetchCount = maxConcurrency; } channel.BasicQos(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false); @@ -105,10 +102,12 @@ public void Start(PushRuntimeSettings limitations) consumer.Received += Consumer_Received; - channel.BasicConsume(settings.InputQueue, false, consumerTag, consumer); + channel.BasicConsume(settings.ReceiveAddress, false, consumerTag, consumer); + + return Task.CompletedTask; } - public async Task Stop() + public async Task StopReceive() { consumer.Received -= Consumer_Received; messageProcessing.Cancel(); @@ -219,7 +218,7 @@ async Task Process(BasicDeliverEventArgs message) var contextBag = new ContextBag(); contextBag.Set(message); - var messageContext = new MessageContext(messageId, headers, messageBody ?? Array.Empty(), transportTransaction, tokenSource, contextBag); + var messageContext = new MessageContext(messageId, headers, messageBody ?? Array.Empty(), TransportTransaction, contextBag); await onMessage(messageContext).ConfigureAwait(false); processed = true; @@ -231,7 +230,7 @@ async Task Process(BasicDeliverEventArgs message) var contextBag = new ContextBag(); contextBag.Set(message); - var errorContext = new ErrorContext(exception, headers, messageId, messageBody ?? Array.Empty(), transportTransaction, numberOfDeliveryAttempts, contextBag); + var errorContext = new ErrorContext(exception, headers, messageId, messageBody ?? Array.Empty(), TransportTransaction, numberOfDeliveryAttempts, contextBag); try { @@ -244,7 +243,7 @@ async Task Process(BasicDeliverEventArgs message) } catch (Exception ex) { - criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex); + criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex); await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false); return; @@ -305,9 +304,14 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue) public void Dispose() { + if (disposed) + { + return; + } circuitBreaker?.Dispose(); messageProcessing?.Dispose(); connection?.Dispose(); + disposed = true; } } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePumpConnectionFailedCircuitBreaker.cs b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePumpConnectionFailedCircuitBreaker.cs index 83243ca4a..02ed43249 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePumpConnectionFailedCircuitBreaker.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePumpConnectionFailedCircuitBreaker.cs @@ -6,10 +6,10 @@ namespace NServiceBus.Transport.RabbitMQ sealed class MessagePumpConnectionFailedCircuitBreaker : IDisposable { - public MessagePumpConnectionFailedCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, CriticalError criticalError) + public MessagePumpConnectionFailedCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action criticalErrorAction) { this.name = name; - this.criticalError = criticalError; + this.criticalErrorAction = criticalErrorAction; this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering; timer = new Timer(CircuitBreakerTriggered); @@ -50,7 +50,7 @@ void CircuitBreakerTriggered(object state) if (Interlocked.Read(ref failureCount) > 0) { Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name); - criticalError.Raise($"{name} connection to the broker has failed.", lastException); + criticalErrorAction($"{name} connection to the broker has failed.", lastException); } } @@ -59,7 +59,7 @@ void CircuitBreakerTriggered(object state) string name; TimeSpan timeToWaitBeforeTriggering; Timer timer; - CriticalError criticalError; + Action criticalErrorAction; long failureCount; Exception lastException; } diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs index d888ecba2..f308a2047 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/ConventionalRoutingTopology.cs @@ -1,10 +1,12 @@ -namespace NServiceBus.Transport.RabbitMQ + +namespace NServiceBus.Transport.RabbitMQ { using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using global::RabbitMQ.Client; + using Unicast.Messages; /// /// Implements the RabbitMQ routing topology as described at http://codebetter.com/drusellers/2011/05/08/brain-dump-conventional-routing-in-rabbitmq/ @@ -21,32 +23,46 @@ /// we generate an exchange for each queue so that we can do direct sends to the queue. it is bound as a fanout exchange /// /// - class ConventionalRoutingTopology : IRoutingTopology + public class ConventionalRoutingTopology : IRoutingTopology { - readonly bool useDurableExchanges; - + /// + /// Creates a new instance of conventional routing topology. + /// + /// Indicates whether exchanges and queues declared by the routing topology should be durable. public ConventionalRoutingTopology(bool useDurableExchanges) { this.useDurableExchanges = useDurableExchanges; + exchangeNameConvention = DefaultExchangeNameConvention; } - public void SetupSubscription(IModel channel, Type type, string subscriberName) + static string DefaultExchangeNameConvention(Type type) => type.Namespace + ":" + type.Name; + + /// + /// Sets up a subscription for the subscriber to the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type to subscribe to. + /// The name of the subscriber. + public void SetupSubscription(IModel channel, MessageMetadata type, string subscriberName) { - if (type == typeof(IEvent)) - { - // Make handlers for IEvent handle all events whether they extend IEvent or not - type = typeof(object); - } + // Make handlers for IEvent handle all events whether they extend IEvent or not + var typeToSubscribe = type.MessageType != typeof(IEvent) ? type.MessageType : typeof(object); - SetupTypeSubscriptions(channel, type); - channel.ExchangeBind(subscriberName, ExchangeName(type), string.Empty); + SetupTypeSubscriptions(channel, typeToSubscribe); + channel.ExchangeBind(subscriberName, exchangeNameConvention(typeToSubscribe), string.Empty); } - public void TeardownSubscription(IModel channel, Type type, string subscriberName) + /// + /// Removes a subscription for the subscriber to the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type to unsubscribe from. + /// The name of the subscriber. + public void TeardownSubscription(IModel channel, MessageMetadata type, string subscriberName) { try { - channel.ExchangeUnbind(subscriberName, ExchangeName(type), string.Empty, null); + channel.ExchangeUnbind(subscriberName, exchangeNameConvention(type.MessageType), string.Empty, null); } catch (Exception) { @@ -54,22 +70,53 @@ public void TeardownSubscription(IModel channel, Type type, string subscriberNam } } + /// + /// Publishes a message of the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type of the message to be published. + /// The message to publish. + /// The RabbitMQ properties of the message to publish. public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties) { SetupTypeSubscriptions(channel, type); - channel.BasicPublish(ExchangeName(type), string.Empty, false, properties, message.Body); + channel.BasicPublish(exchangeNameConvention(type), string.Empty, false, properties, message.Body); } + /// + /// Sends a message to the specified endpoint. + /// + /// The RabbitMQ channel to operate on. + /// The address of the destination endpoint. + /// The message to send. + /// The RabbitMQ properties of the message to send. public void Send(IModel channel, string address, OutgoingMessage message, IBasicProperties properties) { channel.BasicPublish(address, string.Empty, true, properties, message.Body); } + /// + /// Sends a raw message body to the specified endpoint. + /// + /// The RabbitMQ channel to operate on. + /// The address of the destination endpoint. + /// The raw message body to send. + /// The RabbitMQ properties of the message to send. public void RawSendInCaseOfFailure(IModel channel, string address, ReadOnlyMemory body, IBasicProperties properties) { channel.BasicPublish(address, string.Empty, true, properties, body); } + /// + /// Declares queues and performs any other initialization logic needed (e.g. creating exchanges and bindings). + /// + /// The RabbitMQ channel to operate on. + /// + /// The addresses of the queues to declare and perform initialization for, that this endpoint is receiving from. + /// + /// + /// The addresses of the queues to declare and perform initialization for, that this endpoint is sending to. + /// public void Initialize(IModel channel, IEnumerable receivingAddresses, IEnumerable sendingAddresses) { foreach (var address in receivingAddresses.Concat(sendingAddresses)) @@ -80,12 +127,18 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I } } + /// + /// Binds an address to the delay infrastructure's delivery exchange. + /// + /// The RabbitMQ channel to operate on. + /// The address that needs to be bound to the delivery exchange. + /// The name of the delivery exchange. + /// The routing key required for the binding. public void BindToDelayInfrastructure(IModel channel, string address, string deliveryExchange, string routingKey) { channel.ExchangeBind(address, deliveryExchange, routingKey); } - static string ExchangeName(Type type) => type.Namespace + ":" + type.Name; void SetupTypeSubscriptions(IModel channel, Type type) { @@ -95,23 +148,23 @@ void SetupTypeSubscriptions(IModel channel, Type type) } var typeToProcess = type; - CreateExchange(channel, ExchangeName(typeToProcess)); + CreateExchange(channel, exchangeNameConvention(typeToProcess)); var baseType = typeToProcess.BaseType; while (baseType != null) { - CreateExchange(channel, ExchangeName(baseType)); - channel.ExchangeBind(ExchangeName(baseType), ExchangeName(typeToProcess), string.Empty); + CreateExchange(channel, exchangeNameConvention(baseType)); + channel.ExchangeBind(exchangeNameConvention(baseType), exchangeNameConvention(typeToProcess), string.Empty); typeToProcess = baseType; baseType = typeToProcess.BaseType; } foreach (var interfaceType in type.GetInterfaces()) { - var exchangeName = ExchangeName(interfaceType); + var exchangeName = exchangeNameConvention(interfaceType); CreateExchange(channel, exchangeName); - channel.ExchangeBind(exchangeName, ExchangeName(type), string.Empty); + channel.ExchangeBind(exchangeName, exchangeNameConvention(type), string.Empty); } MarkTypeConfigured(type); @@ -136,6 +189,8 @@ void CreateExchange(IModel channel, string exchangeName) } } + readonly bool useDurableExchanges; readonly ConcurrentDictionary typeTopologyConfiguredSet = new ConcurrentDictionary(); + Func exchangeNameConvention; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs index 528e56a62..5a82072bd 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/DirectRoutingTopology.cs @@ -1,47 +1,103 @@ -namespace NServiceBus.Transport.RabbitMQ + +namespace NServiceBus.Transport.RabbitMQ { using System; using System.Collections.Generic; using System.Linq; using global::RabbitMQ.Client; + using Unicast.Messages; + /// /// Route using a static routing convention for routing messages from publishers to subscribers using routing keys. /// - class DirectRoutingTopology : IRoutingTopology + public class DirectRoutingTopology : IRoutingTopology { - public DirectRoutingTopology(Conventions conventions, bool useDurableExchanges) + /// + /// Creates a new instance of DirectRoutingTopology, + /// + /// Indicates whether exchanges and queues declared by the routing topology should be durable. + /// Exchange name convention. + /// Routing key convention. + public DirectRoutingTopology(bool useDurableExchanges, Func exchangeNameConvention = null, Func routingKeyConvention = null) { - this.conventions = conventions; + conventions = new Conventions( + exchangeNameConvention ?? DefaultExchangeNameConvention, + routingKeyConvention ?? DefaultRoutingKeyConvention.GenerateRoutingKey); this.useDurableExchanges = useDurableExchanges; } - public void SetupSubscription(IModel channel, Type type, string subscriberName) + string DefaultExchangeNameConvention() => "amq.topic"; + + /// + /// Sets up a subscription for the subscriber to the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type to subscribe to. + /// The name of the subscriber. + public void SetupSubscription(IModel channel, MessageMetadata type, string subscriberName) { CreateExchange(channel, ExchangeName()); - channel.QueueBind(subscriberName, ExchangeName(), GetRoutingKeyForBinding(type)); + channel.QueueBind(subscriberName, ExchangeName(), GetRoutingKeyForBinding(type.MessageType)); } - public void TeardownSubscription(IModel channel, Type type, string subscriberName) + /// + /// Removes a subscription for the subscriber to the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type to unsubscribe from. + /// The name of the subscriber. + public void TeardownSubscription(IModel channel, MessageMetadata type, string subscriberName) { - channel.QueueUnbind(subscriberName, ExchangeName(), GetRoutingKeyForBinding(type), null); + channel.QueueUnbind(subscriberName, ExchangeName(), GetRoutingKeyForBinding(type.MessageType), null); } + /// + /// Publishes a message of the specified type. + /// + /// The RabbitMQ channel to operate on. + /// The type of the message to be published. + /// The message to publish. + /// The RabbitMQ properties of the message to publish. public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties) { channel.BasicPublish(ExchangeName(), GetRoutingKeyForPublish(type), false, properties, message.Body); } + /// + /// Sends a message to the specified endpoint. + /// + /// The RabbitMQ channel to operate on. + /// The address of the destination endpoint. + /// The message to send. + /// The RabbitMQ properties of the message to send. public void Send(IModel channel, string address, OutgoingMessage message, IBasicProperties properties) { channel.BasicPublish(string.Empty, address, true, properties, message.Body); } + /// + /// Sends a raw message body to the specified endpoint. + /// + /// The RabbitMQ channel to operate on. + /// The address of the destination endpoint. + /// The raw message body to send. + /// The RabbitMQ properties of the message to send. public void RawSendInCaseOfFailure(IModel channel, string address, ReadOnlyMemory body, IBasicProperties properties) { channel.BasicPublish(string.Empty, address, true, properties, body); } + /// + /// Declares queues and performs any other initialization logic needed (e.g. creating exchanges and bindings). + /// + /// The RabbitMQ channel to operate on. + /// + /// The addresses of the queues to declare and perform initialization for, that this endpoint is receiving from. + /// + /// + /// The addresses of the queues to declare and perform initialization for, that this endpoint is sending to. + /// public void Initialize(IModel channel, IEnumerable receivingAddresses, IEnumerable sendingAddresses) { foreach (var address in receivingAddresses.Concat(sendingAddresses)) @@ -50,6 +106,13 @@ public void Initialize(IModel channel, IEnumerable receivingAddresses, I } } + /// + /// Binds an address to the delay infrastructure's delivery exchange. + /// + /// The RabbitMQ channel to operate on. + /// The address that needs to be bound to the delivery exchange. + /// The name of the delivery exchange. + /// The routing key required for the binding. public void BindToDelayInfrastructure(IModel channel, string address, string deliveryExchange, string routingKey) { channel.QueueBind(address, deliveryExchange, routingKey); @@ -91,7 +154,7 @@ string GetRoutingKeyForBinding(Type eventType) readonly Conventions conventions; readonly bool useDurableExchanges; - public class Conventions + class Conventions { public Conventions(Func exchangeName, Func routingKey) { diff --git a/src/NServiceBus.Transport.RabbitMQ/Routing/IRoutingTopology.cs b/src/NServiceBus.Transport.RabbitMQ/Routing/IRoutingTopology.cs index c37174425..9fe514c67 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Routing/IRoutingTopology.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Routing/IRoutingTopology.cs @@ -1,8 +1,11 @@ -namespace NServiceBus.Transport.RabbitMQ + +namespace NServiceBus.Transport.RabbitMQ { using System; using System.Collections.Generic; using global::RabbitMQ.Client; + using Unicast.Messages; + /// /// Topology for routing messages on the transport. @@ -15,7 +18,7 @@ public interface IRoutingTopology /// The RabbitMQ channel to operate on. /// The type to subscribe to. /// The name of the subscriber. - void SetupSubscription(IModel channel, Type type, string subscriberName); + void SetupSubscription(IModel channel, MessageMetadata type, string subscriberName); /// /// Removes a subscription for the subscriber to the specified type. @@ -23,7 +26,7 @@ public interface IRoutingTopology /// The RabbitMQ channel to operate on. /// The type to unsubscribe from. /// The name of the subscriber. - void TeardownSubscription(IModel channel, Type type, string subscriberName); + void TeardownSubscription(IModel channel, MessageMetadata type, string subscriberName); /// /// Publishes a message of the specified type. diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs index 4ec2f6223..503a7f64b 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/BasicPropertiesExtensions.cs @@ -5,14 +5,11 @@ using System.Globalization; using System.Linq; using System.Text; - using DelayedDelivery; - using DeliveryConstraints; using global::RabbitMQ.Client; - using Performance.TimeToBeReceived; static class BasicPropertiesExtensions { - public static void Fill(this IBasicProperties properties, OutgoingMessage message, List deliveryConstraints) + public static void Fill(this IBasicProperties properties, OutgoingMessage message, DispatchProperties dispatchProperties) { if (message.MessageId != null) { @@ -21,7 +18,7 @@ public static void Fill(this IBasicProperties properties, OutgoingMessage messag var messageHeaders = message.Headers ?? new Dictionary(); - var delayed = CalculateDelay(deliveryConstraints, out var delay); + var delayed = CalculateDelay(dispatchProperties, out var delay); properties.Persistent = !messageHeaders.Remove(UseNonPersistentDeliveryHeader); @@ -32,7 +29,7 @@ public static void Fill(this IBasicProperties properties, OutgoingMessage messag properties.Headers[DelayInfrastructure.DelayHeader] = Convert.ToInt32(delay); } - if (deliveryConstraints.TryGet(out DiscardIfNotReceivedBefore timeToBeReceived) && timeToBeReceived.MaxTime < TimeSpan.MaxValue) + if (dispatchProperties.DiscardIfNotReceivedBefore != null && dispatchProperties.DiscardIfNotReceivedBefore.MaxTime < TimeSpan.MaxValue) { // align with TimeoutManager behavior if (delayed) @@ -40,7 +37,7 @@ public static void Fill(this IBasicProperties properties, OutgoingMessage messag throw new Exception("Postponed delivery of messages with TimeToBeReceived set is not supported. Remove the TimeToBeReceived attribute to postpone messages of this type."); } - properties.Expiration = timeToBeReceived.MaxTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture); + properties.Expiration = dispatchProperties.DiscardIfNotReceivedBefore.MaxTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture); } if (messageHeaders.TryGetValue(NServiceBus.Headers.CorrelationId, out var correlationId) && correlationId != null) @@ -77,30 +74,30 @@ public static void Fill(this IBasicProperties properties, OutgoingMessage messag } } - static bool CalculateDelay(List deliveryConstraints, out long delay) + static bool CalculateDelay(DispatchProperties dispatchProperties, out long delay) { delay = 0; var delayed = false; - if (deliveryConstraints.TryGet(out DoNotDeliverBefore doNotDeliverBefore)) + if (dispatchProperties.DoNotDeliverBefore != null) { delayed = true; - delay = Convert.ToInt64(Math.Ceiling((doNotDeliverBefore.At - DateTimeOffset.UtcNow).TotalSeconds)); + delay = Convert.ToInt64(Math.Ceiling((dispatchProperties.DoNotDeliverBefore.At - DateTimeOffset.UtcNow).TotalSeconds)); if (delay > DelayInfrastructure.MaxDelayInSeconds) { - throw new Exception($"Message cannot be sent with {nameof(DoNotDeliverBefore)} value '{doNotDeliverBefore.At}' because it exceeds the maximum delay value '{TimeSpan.FromSeconds(DelayInfrastructure.MaxDelayInSeconds)}'."); + throw new Exception($"Message cannot set to be delivered at '{dispatchProperties.DoNotDeliverBefore.At}' because the delay specified via {nameof(DelayedDeliveryOptionExtensions.DoNotDeliverBefore)} exceeds the maximum delay value '{TimeSpan.FromSeconds(DelayInfrastructure.MaxDelayInSeconds)}'."); } } - else if (deliveryConstraints.TryGet(out DelayDeliveryWith delayDeliveryWith)) + else if (dispatchProperties.DelayDeliveryWith != null) { delayed = true; - delay = Convert.ToInt64(Math.Ceiling(delayDeliveryWith.Delay.TotalSeconds)); + delay = Convert.ToInt64(Math.Ceiling(dispatchProperties.DelayDeliveryWith.Delay.TotalSeconds)); if (delay > DelayInfrastructure.MaxDelayInSeconds) { - throw new Exception($"Message cannot be sent with {nameof(DelayDeliveryWith)} value '{delayDeliveryWith.Delay}' because it exceeds the maximum delay value '{TimeSpan.FromSeconds(DelayInfrastructure.MaxDelayInSeconds)}'."); + throw new Exception($"Message cannot be delayed by '{dispatchProperties.DelayDeliveryWith.Delay}' because the delay specified via {nameof(DelayedDeliveryOptionExtensions.DelayDeliveryWith)} exceeds the maximum delay value '{TimeSpan.FromSeconds(DelayInfrastructure.MaxDelayInSeconds)}'."); } } @@ -120,9 +117,6 @@ public static bool TryGetConfirmationId(this IBasicProperties properties, out ul ulong.TryParse(Encoding.UTF8.GetString(value as byte[] ?? Array.Empty()), out confirmationId); } - static bool TryGet(this List list, out T constraint) where T : DeliveryConstraint => - (constraint = list.OfType().FirstOrDefault()) != null; - public const string ConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId"; public const string UseNonPersistentDeliveryHeader = "NServiceBus.Transport.RabbitMQ.UseNonPersistentDelivery"; } diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs index a3b72bfd2..53330c6dc 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs @@ -2,9 +2,8 @@ { using System.Collections.Generic; using System.Threading.Tasks; - using Extensibility; - class MessageDispatcher : IDispatchMessages + class MessageDispatcher : IMessageDispatcher { readonly ChannelProvider channelProvider; @@ -13,7 +12,7 @@ public MessageDispatcher(ChannelProvider channelProvider) this.channelProvider = channelProvider; } - public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context) + public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction) { var channel = channelProvider.GetPublishChannel(); @@ -50,7 +49,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan var message = transportOperation.Message; var properties = channel.CreateBasicProperties(); - properties.Fill(message, transportOperation.DeliveryConstraints); + properties.Fill(message, transportOperation.Properties); return channel.SendMessage(transportOperation.Destination, message, properties); } @@ -60,9 +59,10 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar var message = transportOperation.Message; var properties = channel.CreateBasicProperties(); - properties.Fill(message, transportOperation.DeliveryConstraints); + properties.Fill(message, transportOperation.Properties); return channel.PublishMessage(transportOperation.MessageType, message, properties); } + } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Topology.cs b/src/NServiceBus.Transport.RabbitMQ/Topology.cs new file mode 100644 index 000000000..ab868931a --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/Topology.cs @@ -0,0 +1,21 @@ +namespace NServiceBus +{ + /// + /// Defines built-in topologies. + /// + public enum Topology + { + /// + /// The conventional routing topology. + /// + /// Uses an exchange cascade convention to route published messages. + /// + Conventional, + /// + /// The direct routing topology. + /// + /// Uses topic exchange to route published messages. + /// + Direct, + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/obsoletes-v7.cs b/src/NServiceBus.Transport.RabbitMQ/obsoletes-v7.cs deleted file mode 100644 index 3a9b30853..000000000 --- a/src/NServiceBus.Transport.RabbitMQ/obsoletes-v7.cs +++ /dev/null @@ -1,38 +0,0 @@ -#pragma warning disable 1591 - -namespace NServiceBus.Transport.RabbitMQ -{ - using System; - using Configuration.AdvancedExtensibility; - - [ObsoleteEx( - Message = "The timeout manager has been removed, so it is no longer possible to consume legacy delayed messages from timeout storage.", - TreatAsErrorFromVersion = "7", - RemoveInVersion = "8")] - public class DelayedDeliverySettings : ExposeSettings - { - DelayedDeliverySettings() : base(null) => throw new NotImplementedException(); - - public DelayedDeliverySettings EnableTimeoutManager() => throw new NotImplementedException(); - } -} - -namespace NServiceBus -{ - using System; - using Transport.RabbitMQ; - - partial class RabbitMQTransportSettingsExtensions - { - [ObsoleteEx( - Message = "The timeout manager has been removed, so there are no delayed delivery configuration options now.", - TreatAsErrorFromVersion = "7", - RemoveInVersion = "8")] - public static DelayedDeliverySettings DelayedDelivery(this TransportExtensions transportExtensions) - { - throw new NotImplementedException(); - } - } -} - -#pragma warning restore 1591 \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/obsoletes-v8.cs b/src/NServiceBus.Transport.RabbitMQ/obsoletes-v8.cs new file mode 100644 index 000000000..fe5cbdb74 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ/obsoletes-v8.cs @@ -0,0 +1,170 @@ +#pragma warning disable 1591 +#pragma warning disable 618 + +namespace NServiceBus +{ + using System; + + public static class RabbitMQTransportSettingsExtensions + { + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.MessageIdStrategy", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions CustomMessageIdStrategy( + this TransportExtensions transportExtensions, + Func customIdStrategy) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + Message = "The configuration has been moved to the topology implementations.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions DisableDurableExchangesAndQueues( + this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.ValidateRemoteCertificate", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions DisableRemoteCertificateValidation( + this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.PrefetchCountCalculation", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions PrefetchCount( + this TransportExtensions transportExtensions, ushort prefetchCount) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.PrefetchCountCalculation", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions PrefetchMultiplier( + this TransportExtensions transportExtensions, int prefetchMultiplier) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.ClientCertificate", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions SetClientCertificate( + this TransportExtensions transportExtensions, + System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.ClientCertificate", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions SetClientCertificate( + this TransportExtensions transportExtensions, string path, string password) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.HeartbeatInterval", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions SetHeartbeatInterval( + this TransportExtensions transportExtensions, TimeSpan heartbeatInterval) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.NetworkRecoveryInterval", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions SetNetworkRecoveryInterval( + this TransportExtensions transportExtensions, TimeSpan networkRecoveryInterval) + { + throw new NotImplementedException(); + } + + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.TimeToWaitBeforeTriggeringCircuitBreaker", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker( + this TransportExtensions transportExtensions, TimeSpan waitTime) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.RoutingTopology", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions UseConventionalRoutingTopology( + this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.RoutingTopology", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions UseCustomRoutingTopology( + this TransportExtensions transportExtensions, + Func topologyFactory) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.RoutingTopology", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions UseDirectRoutingTopology( + this TransportExtensions transportExtensions, + Func routingKeyConvention = null, Func exchangeNameConvention = null) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + ReplacementTypeOrMember = "RabbitMQTransport.UseExternalAuthMechanism", + Message = "The configuration has been moved to RabbitMQTransport class.", + TreatAsErrorFromVersion = "7", + RemoveInVersion = "8")] + public static TransportExtensions UseExternalAuthMechanism( + this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + } +} +#pragma warning restore 618 +#pragma warning restore 1591 \ No newline at end of file