Skip to content

Commit

Permalink
Merge pull request #720 from Particular/upgrade-to-core-8
Browse files Browse the repository at this point in the history
Upgrade to core V8
  • Loading branch information
SzymonPobiega authored Feb 23, 2021
2 parents 93b38e1 + e3dc71b commit 241f98d
Show file tree
Hide file tree
Showing 52 changed files with 1,264 additions and 1,205 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using NServiceBus;
using NServiceBus.AcceptanceTests.EndpointTemplates;

static class ConfigurationHelpers
{
public static RabbitMQTransport ConfigureRabbitMQTransport(this EndpointConfiguration configuration)
{
return (RabbitMQTransport)configuration.ConfigureTransport();
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Transport;
using RabbitMQ.Client;

class ConfigureEndpointRabbitMQTransport : IConfigureEndpointTestExecution
{
DbConnectionStringBuilder connectionStringBuilder;
QueueBindings queueBindings;
TestRabbitMQTransport transport;


public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
Expand All @@ -22,13 +23,11 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
throw new Exception("The 'RabbitMQTransport_ConnectionString' environment variable is not set.");
}

//For cleanup
connectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString };

var transport = configuration.UseTransport<RabbitMQTransport>();
transport.ConnectionString(connectionStringBuilder.ConnectionString);
transport.UseConventionalRoutingTopology();

queueBindings = configuration.GetSettings().Get<QueueBindings>();
transport = new TestRabbitMQTransport(Topology.Conventional, connectionString);
configuration.UseTransport(transport);

return Task.CompletedTask;
}
Expand All @@ -42,7 +41,7 @@ public Task Cleanup()

void PurgeQueues()
{
if (connectionStringBuilder == null)
if (connectionStringBuilder == null || transport == null)
{
return;
}
Expand Down Expand Up @@ -77,7 +76,7 @@ void PurgeQueues()
throw new Exception("The connection string doesn't contain a value for 'host'.");
}

var queues = queueBindings.ReceivingAddresses.Concat(queueBindings.SendingAddresses);
var queues = transport.QueuesToCleanup.Distinct().ToArray();

using (var connection = connectionFactory.CreateConnection("Test Queue Purger"))
using (var channel = connection.CreateModel())
Expand All @@ -95,4 +94,20 @@ void PurgeQueues()
}
}
}

class TestRabbitMQTransport : RabbitMQTransport
{
public TestRabbitMQTransport(Topology topology, string connectionString)
: base(topology, connectionString)
{
}

public override Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses)
{
QueuesToCleanup.AddRange(receivers.Select(x => x.ReceiveAddress).Concat(sendingAddresses).Distinct());
return base.Initialize(hostSettings, receivers, sendingAddresses);
}

public List<string> QueuesToCleanup { get; } = new List<string>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void Should_throw()
.Run());

Assert.That(exception, Is.Not.Null);
Assert.IsTrue(exception.Message.StartsWith("Message cannot be sent with"));
StringAssert.StartsWith("Message cannot be delayed by", exception.Message);
}

public class Endpoint : EndpointConfigurationBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="8.0.0-alpha.644" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="8.0.0-alpha.842" />
<PackageReference Include="NUnit" Version="3.13.1" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public partial class TestSuiteConstraints
public bool SupportsDelayedDelivery => true;

public bool SupportsOutbox => true;
public bool SupportsPurgeOnStartup => true;

public IConfigureEndpointTestExecution CreateTransportConfiguration() => new ConfigureEndpointRabbitMQTransport();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public class Publisher : EndpointConfigurationBuilder
{
public Publisher()
{
EndpointSetup<DefaultServer>(c => c.UseTransport<RabbitMQTransport>()
.UseDirectRoutingTopology());
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true);
});
}
}

Expand All @@ -47,7 +49,7 @@ public Receiver()
{
EndpointSetup<DefaultServer>(builder =>
{
builder.UseTransport<RabbitMQTransport>().UseDirectRoutingTopology();
builder.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true);
builder.DisableFeature<AutoSubscribe>();
}, metadata => metadata.RegisterPublisherFor<IMyRequest>(typeof(Publisher)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(c => c.UseTransport<RabbitMQTransport>());
EndpointSetup<DefaultServer>();
}

class MyEventHandler : IHandleMessages<Message>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public OriginatingEndpoint()
{
EndpointSetup<DefaultServer>(config =>
{
config.ConfigureTransport().Routing().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint));
config.ConfigureRouting().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint));
config.AuditProcessedMessagesTo<AuditSpyEndpoint>();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(c => c.UseTransport<RabbitMQTransport>());
EndpointSetup<DefaultServer>();
}

class MyEventHandler : IHandleMessages<Message>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class ScaledOutClient : EndpointConfigurationBuilder
public ScaledOutClient()
{
EndpointSetup<DefaultServer>(config =>
config.ConfigureTransport().Routing().RouteToEndpoint(typeof(MyRequest), typeof(ServerThatRespondsToCallbacks)));
config.ConfigureRouting().RouteToEndpoint(typeof(MyRequest), typeof(ServerThatRespondsToCallbacks)));
}

class MyResponseHandler : IHandleMessages<MyResponse>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting;
using DeliveryConstraints;
using Extensibility;
using Features;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -51,7 +50,7 @@ protected override void Setup(FeatureConfigurationContext context)

class ConnectionKiller : FeatureStartupTask
{
public ConnectionKiller(IDispatchMessages sender, ReadOnlySettings settings, MyContext context)
public ConnectionKiller(IMessageDispatcher sender, ReadOnlySettings settings, MyContext context)
{
this.context = context;
this.sender = sender;
Expand All @@ -72,8 +71,13 @@ async Task BreakConnectionBySendingInvalidMessage()
try
{
var outgoingMessage = new OutgoingMessage("Foo", new Dictionary<string, string>(), new byte[0]);
var operation = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.EndpointName()), deliveryConstraints: new List<DeliveryConstraint> { new DiscardIfNotReceivedBefore(TimeSpan.FromMilliseconds(-1)) });
await sender.Dispatch(new TransportOperations(operation), new TransportTransaction(), new ContextBag());
var props = new DispatchProperties
{
DiscardIfNotReceivedBefore =
new DiscardIfNotReceivedBefore(TimeSpan.FromMilliseconds(-1))
};
var operation = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.EndpointName()), props);
await sender.Dispatch(new TransportOperations(operation), new TransportTransaction());
}
catch (Exception)
{
Expand All @@ -82,7 +86,7 @@ async Task BreakConnectionBySendingInvalidMessage()
}

readonly MyContext context;
readonly IDispatchMessages sender;
readonly IMessageDispatcher sender;
readonly ReadOnlySettings settings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class OriginatingEndpoint : EndpointConfigurationBuilder
public OriginatingEndpoint()
{
EndpointSetup<DefaultServer>(config =>
config.ConfigureTransport().Routing().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint)));
config.ConfigureRouting().RouteToEndpoint(typeof(Request), typeof(ReceivingEndpoint)));
}

class ReplyHandler : IHandleMessages<Reply>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public Receiver()
EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<StarterFeature>();
c.UseTransport<RabbitMQTransport>()
.CustomMessageIdStrategy(m => customMessageId);
c.ConfigureRabbitMQTransport().MessageIdStrategy = m => customMessageId;
});
}

Expand All @@ -51,7 +50,7 @@ protected override void Setup(FeatureConfigurationContext context)

class Starter : FeatureStartupTask
{
public Starter(IDispatchMessages dispatchMessages, ReadOnlySettings settings)
public Starter(IMessageDispatcher dispatchMessages, ReadOnlySettings settings)
{
this.dispatchMessages = dispatchMessages;
this.settings = settings;
Expand All @@ -71,12 +70,12 @@ protected override Task OnStart(IMessageSession session)
Encoding.UTF8.GetBytes(messageBody));

var transportOperation = new TransportOperation(message, new UnicastAddressTag(settings.EndpointName()));
return dispatchMessages.Dispatch(new TransportOperations(transportOperation), new TransportTransaction(), new ContextBag());
return dispatchMessages.Dispatch(new TransportOperations(transportOperation), new TransportTransaction());
}

protected override Task OnStop(IMessageSession session) => Task.CompletedTask;

readonly IDispatchMessages dispatchMessages;
readonly IMessageDispatcher dispatchMessages;
readonly ReadOnlySettings settings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public Receiver()
{
EndpointSetup<DefaultServer>(c =>
{
c.UseTransport<RabbitMQTransport>()
.CustomMessageIdStrategy(m => "");
c.ConfigureRabbitMQTransport().MessageIdStrategy = m => "";
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(c => c.UseTransport<RabbitMQTransport>()
.UseDirectRoutingTopology());
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureRabbitMQTransport().RoutingTopology = new DirectRoutingTopology(true);
});
}

class MyEventHandler : IHandleMessages<MyRequest>
Expand Down
Loading

0 comments on commit 241f98d

Please sign in to comment.