diff --git a/docs/adr/0021-move-archive-from-esb.md b/docs/adr/0021-move-archive-from-esb.md new file mode 100644 index 000000000..235f7e9df --- /dev/null +++ b/docs/adr/0021-move-archive-from-esb.md @@ -0,0 +1,32 @@ +# 20. Move Archive Methods from External Service Bus to Outbox Archiver to Reduce Complexity + +Date: 2024-10-28 + +## Status + +Adopted + +## Context + +The `ExternalServiceBus` class is a mediator between producer and outbox. It suffers from complexity, see ADR 0020. + +The `ExternalServiceBus` class has a number of methods that are related to archiving messages. These methods are: + - Archive + - ArchiveAsync + +The OutboxArchiver and the TimedOuboxArchiver are the only callers of these ExternalBus Archive methods. The TimedOutboxArchiver provides both a timer that fires the archiver at a given periodic interval, and uses the global distributed lock to ensure only one archiver runs. + +## Decision + +Whilst the `Archive` methods were not called out by CodeScene analysis, they do add to the overall set of responsibilities of `ExternalServiceBus`. As they have different reasons to change to `ExternalServiceBus` they should be moved to a separate class. + +We will move the implementation from `ExternalServiceBus` into `OutboxArchiver`. We will call `OutboxArchiver` from `TimedOutboxArchiver`. + +## Consequences + +One, we have moved these functions, it makes sense to rename the `ExternalServiceBus` class to 'OutboxProducerMediator' as this better describes its role within our codebase. + + + + + diff --git a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs index 3d47010db..30acbb9e2 100644 --- a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs @@ -30,7 +30,6 @@ THE SOFTWARE. */ using Paramore.Brighter.FeatureSwitch; using Paramore.Brighter.Logging; using System.Text.Json; -using System.Transactions; using Paramore.Brighter.DynamoDb; using Paramore.Brighter.Observability; using Polly.Registry; @@ -94,7 +93,7 @@ public static IBrighterBuilder BrighterHandlerBuilder(IServiceCollection service var mapperRegistry = new ServiceCollectionMessageMapperRegistry(services, options.MapperLifetime); services.TryAddSingleton(mapperRegistry); - services.TryAddSingleton(options.RequestContextFactory); + services.TryAddSingleton(options.RequestContextFactory); if (options.FeatureSwitchRegistry != null) services.TryAddSingleton(options.FeatureSwitchRegistry); @@ -136,9 +135,6 @@ public static IBrighterBuilder BrighterHandlerBuilder(IServiceCollection service /// /// The Brighter builder to add this option to /// A callback that allows you to configure options - /// The transaction provider for the outbox, can be null for in-memory default - /// of which you must set the generic type to for - /// /// The lifetime of the transaction provider /// The Brighter builder to allow chaining of requests public static IBrighterBuilder UseExternalBus( @@ -222,8 +218,8 @@ public static IBrighterBuilder UseExternalBus( brighterBuilder.Services.TryAddSingleton(busConfiguration); - brighterBuilder.Services.TryAdd(new ServiceDescriptor(typeof(IAmAnExternalBusService), - (serviceProvider) => BuildExternalBus( + brighterBuilder.Services.TryAdd(new ServiceDescriptor(typeof(IAmAnOutboxProducerMediator), + (serviceProvider) => BuildOutBoxProducerMediator( serviceProvider, transactionType, busConfiguration, brighterBuilder.PolicyRegistry, outbox ), ServiceLifetime.Singleton)); @@ -237,7 +233,7 @@ private static INeedInstrumentation AddEventBus( INeedMessaging messagingBuilder, IUseRpc useRequestResponse) { - var eventBus = provider.GetService(); + var eventBus = provider.GetService(); var eventBusConfiguration = provider.GetService(); var serviceActivatorOptions = provider.GetService(); @@ -325,7 +321,7 @@ private static object BuildCommandProcessor(IServiceProvider provider) return commandProcessor; } - private static IAmAnExternalBusService BuildExternalBus(IServiceProvider serviceProvider, + private static IAmAnOutboxProducerMediator BuildOutBoxProducerMediator(IServiceProvider serviceProvider, Type transactionType, ExternalBusConfiguration busConfiguration, IPolicyRegistry policyRegistry, @@ -333,9 +329,9 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service { //Because the bus has specialized types as members, we need to create the bus type dynamically //again to prevent someone configuring Brighter from having to pass generic types - var busType = typeof(ExternalBusService<,>).MakeGenericType(typeof(Message), transactionType); + var busType = typeof(OutboxProducerMediator<,>).MakeGenericType(typeof(Message), transactionType); - return (IAmAnExternalBusService)Activator.CreateInstance( + return (IAmAnOutboxProducerMediator)Activator.CreateInstance( busType, busConfiguration.ProducerRegistry, policyRegistry, @@ -344,13 +340,11 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service TransformFactoryAsync(serviceProvider), Tracer(serviceProvider), outbox, - busConfiguration.ArchiveProvider, RequestContextFactory(serviceProvider), busConfiguration.OutboxTimeout, busConfiguration.MaxOutStandingMessages, busConfiguration.MaxOutStandingCheckInterval, busConfiguration.OutBoxBag, - busConfiguration.ArchiveBatchSize, TimeProvider.System, busConfiguration.InstrumentationOptions); } diff --git a/src/Paramore.Brighter.Extensions.Hosting/HostedServiceCollectionExtensions.cs b/src/Paramore.Brighter.Extensions.Hosting/HostedServiceCollectionExtensions.cs index 9ee223203..6a4bb2147 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/HostedServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/HostedServiceCollectionExtensions.cs @@ -57,7 +57,7 @@ public static IBrighterBuilder UseOutboxArchiver(this IBrighterBui brighterBuilder.Services.TryAddSingleton(options); brighterBuilder.Services.AddSingleton(archiveProvider); - brighterBuilder.Services.AddHostedService(); + brighterBuilder.Services.AddHostedService>(); return brighterBuilder; } diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs index 713ba27b0..bb2b81911 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs @@ -25,29 +25,43 @@ THE SOFTWARE. */ using System; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Paramore.Brighter.Logging; +// ReSharper disable StaticMemberInGenericType namespace Paramore.Brighter.Extensions.Hosting { - /// /// The archiver will find messages in the outbox that are older than a certain age and archive them /// - /// Needed to create a scope within which to create a - /// Used to ensure that only one instance of the is running - /// The that control how the archiver runs, such as interval - public class TimedOutboxArchiver( - IServiceScopeFactory serviceScopeFactory, - IDistributedLock distributedLock, - TimedOutboxArchiverOptions options - ) - : IHostedService, IDisposable + public class TimedOutboxArchiver : IHostedService, IDisposable where TMessage : Message { private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); private Timer _timer; + private readonly OutboxArchiver _archiver; + private readonly TimeSpan _dispatchedSince; + private readonly IDistributedLock _distributedLock; + private readonly TimedOutboxArchiverOptions _options; + + /// + /// The archiver will find messages in the outbox that are older than a certain age and archive them + /// + /// The archiver to use + /// How old should a message be, in order to archive it? + /// Used to ensure that only one instance of the is running + /// The that control how the archiver runs, such as interval + public TimedOutboxArchiver( + OutboxArchiver archiver, + TimeSpan dispatchedSince, + IDistributedLock distributedLock, + TimedOutboxArchiverOptions options) + { + _archiver = archiver; + _dispatchedSince = dispatchedSince; + _distributedLock = distributedLock; + _options = options; + } private const string LockingResourceName = "Archiver"; @@ -60,8 +74,8 @@ public Task StartAsync(CancellationToken cancellationToken) { s_logger.LogInformation("Outbox Archiver Service is starting"); - _timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, - TimeSpan.FromSeconds(options.TimerInterval)); + _timer = new Timer(async e => await Archive(e, cancellationToken), null, TimeSpan.Zero, + TimeSpan.FromSeconds(_options.TimerInterval)); return Task.CompletedTask; } @@ -90,16 +104,13 @@ public void Dispose() private async Task Archive(object state, CancellationToken cancellationToken) { - var lockId = await distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken); + var lockId = await _distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken); if (lockId != null) { - var scope = serviceScopeFactory.CreateScope(); s_logger.LogInformation("Outbox Archiver looking for messages to Archive"); try { - IAmAnExternalBusService externalBusService = scope.ServiceProvider.GetService(); - - await externalBusService.ArchiveAsync(options.MinimumAge, new RequestContext(), cancellationToken); + await _archiver.ArchiveAsync(_dispatchedSince, new RequestContext(), cancellationToken); } catch (Exception e) { @@ -107,7 +118,7 @@ private async Task Archive(object state, CancellationToken cancellationToken) } finally { - await distributedLock.ReleaseLockAsync(LockingResourceName, lockId, cancellationToken); + await _distributedLock.ReleaseLockAsync(LockingResourceName, lockId, cancellationToken); } s_logger.LogInformation("Outbox Sweeper sleeping"); diff --git a/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs b/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs index 87d50ee78..b5d2fe371 100644 --- a/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs +++ b/src/Paramore.Brighter.ServiceActivator/ControlBus/ControlBusReceiverBuilder.cs @@ -158,7 +158,7 @@ public Dispatcher Build(string hostName) var outbox = new SinkOutboxSync(); - var externalBus = new ExternalBusService( + var mediator = new OutboxProducerMediator( producerRegistry: producerRegistry, policyRegistry: new DefaultPolicy(), mapperRegistry: outgoingMessageMapperRegistry, @@ -175,7 +175,7 @@ public Dispatcher Build(string hostName) commandProcessor = CommandProcessorBuilder.StartNew() .Handlers(new HandlerConfiguration(subscriberRegistry, new ControlBusHandlerFactorySync(_dispatcher, () => commandProcessor))) .Policies(policyRegistry) - .ExternalBus(ExternalBusType.FireAndForget, externalBus) + .ExternalBus(ExternalBusType.FireAndForget, mediator) .ConfigureInstrumentation(null, InstrumentationOptions.None) .RequestContextFactory(new InMemoryRequestContextFactory()) .Build(); diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index b0909e8da..b8225cb0e 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -101,9 +101,9 @@ public class CommandProcessor : IAmACommandProcessor /// Bus: We want to hold a reference to the bus; use double lock to let us pass parameters to the constructor from the first instance /// MethodCache: Used to reduce the cost of reflection for bulk calls /// - private static IAmAnExternalBusService? s_bus; + private static IAmAnOutboxProducerMediator? s_outboxProducerMediator; private static readonly object s_padlock = new(); - private static ConcurrentDictionary s_boundDepositCalls = new(); + private static readonly ConcurrentDictionary s_boundDepositCalls = new(); /// /// Initializes a new instance of the class @@ -167,7 +167,7 @@ public CommandProcessor( IAmAHandlerFactory handlerFactory, IAmARequestContextFactory requestContextFactory, IPolicyRegistry policyRegistry, - IAmAnExternalBusService bus, + IAmAnOutboxProducerMediator bus, IAmAFeatureSwitchRegistry? featureSwitchRegistry = null, InboxConfiguration? inboxConfiguration = null, IEnumerable? replySubscriptions = null, @@ -199,7 +199,7 @@ public CommandProcessor( public CommandProcessor( IAmARequestContextFactory requestContextFactory, IPolicyRegistry policyRegistry, - IAmAnExternalBusService bus, + IAmAnOutboxProducerMediator bus, IAmAFeatureSwitchRegistry? featureSwitchRegistry = null, InboxConfiguration? inboxConfiguration = null, IEnumerable? replySubscriptions = null, @@ -247,9 +247,9 @@ public void Send(T command, RequestContext? requestContext = null) where T : handlerChain.First().Handle(command); } - catch (Exception) + catch (Exception e) { - span?.SetStatus(ActivityStatusCode.Error); + _tracer?.AddExceptionToSpan(span, [e]); throw; } finally @@ -296,9 +296,9 @@ public async Task SendAsync( await handlerChain.First().HandleAsync(command, cancellationToken) .ConfigureAwait(continueOnCapturedContext); } - catch (Exception) + catch (Exception e) { - span?.SetStatus(ActivityStatusCode.Error); + _tracer?.AddExceptionToSpan(span, [e]); throw; } finally @@ -362,7 +362,7 @@ public void Publish(T @event, RequestContext? requestContext = null) where T if (exceptions.Any()) { - span?.SetStatus(ActivityStatusCode.Error); + _tracer?.AddExceptionToSpan(span, exceptions); throw new AggregateException( "Failed to publish to one more handlers successfully, see inner exceptions for details", exceptions); @@ -441,7 +441,7 @@ public async Task PublishAsync( _tracer?.LinkSpans(handlerSpans); if (exceptions.Any()) - span?.SetStatus(ActivityStatusCode.Error); + _tracer?.AddExceptionToSpan(span, exceptions); if (exceptions.Count > 0) { @@ -562,16 +562,21 @@ public string DepositPost( try { - Message message = s_bus!.CreateMessageFromRequest(request, context); + Message message = s_outboxProducerMediator!.CreateMessageFromRequest(request, context); - var bus = ((IAmAnExternalBusService)s_bus); + var bus = ((IAmAnOutboxProducerMediator)s_outboxProducerMediator); if (!bus.HasOutbox()) throw new InvalidOperationException("No outbox defined."); bus.AddToOutbox(message, context, transactionProvider, batchId); - return message.Id!; + return message.Id; + } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { @@ -627,9 +632,9 @@ public string[] DepositPost( { var successfullySentMessage = new List(); - var bus = (IAmAnExternalBusService)s_bus!; + var mediator = (IAmAnOutboxProducerMediator)s_outboxProducerMediator!; - var batchId = bus.StartBatchAddToOutbox(); + var batchId = mediator.StartBatchAddToOutbox(); foreach (var request in requests) { @@ -639,10 +644,15 @@ public string[] DepositPost( context.Span = createSpan; } - bus.EndBatchAddToOutbox(batchId, transactionProvider, context); + mediator.EndBatchAddToOutbox(batchId, transactionProvider, context); return successfullySentMessage.ToArray(); } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; + } finally { _tracer?.EndSpan(span); @@ -748,9 +758,9 @@ public async Task DepositPostAsync( try { - Message message = await s_bus!.CreateMessageFromRequestAsync(request, context, cancellationToken); + Message message = await s_outboxProducerMediator!.CreateMessageFromRequestAsync(request, context, cancellationToken); - var bus = ((IAmAnExternalBusService)s_bus); + var bus = ((IAmAnOutboxProducerMediator)s_outboxProducerMediator); if (!bus.HasAsyncOutbox()) throw new InvalidOperationException("No async outbox defined."); @@ -758,7 +768,12 @@ public async Task DepositPostAsync( await bus.AddToOutboxAsync(message, context, transactionProvider, continueOnCapturedContext, cancellationToken, batchId); - return message.Id!; + return message.Id; + } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { @@ -828,7 +843,7 @@ public async Task DepositPostAsync( { var successfullySentMessage = new List(); - var bus = (IAmAnExternalBusService)s_bus!; + var bus = (IAmAnOutboxProducerMediator)s_outboxProducerMediator!; var batchId = bus.StartBatchAddToOutbox(); @@ -845,6 +860,11 @@ public async Task DepositPostAsync( return successfullySentMessage.ToArray(); } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; + } finally { _tracer?.EndSpan(span); @@ -899,7 +919,12 @@ public void ClearOutbox(string[] ids, RequestContext? requestContext = null, Dic try { - s_bus!.ClearOutbox(ids, context, args); + s_outboxProducerMediator!.ClearOutbox(ids, context, args); + } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { @@ -928,7 +953,12 @@ public async Task ClearOutboxAsync( try { - await s_bus!.ClearOutboxAsync(posts, context, continueOnCapturedContext, args, cancellationToken); + await s_outboxProducerMediator!.ClearOutboxAsync(posts, context, continueOnCapturedContext, args, cancellationToken); + } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { @@ -960,7 +990,12 @@ public void ClearOutstandingFromOutbox( try { var minAge = minimumAge ?? TimeSpan.FromMilliseconds(5000); - s_bus!.ClearOutstandingFromOutbox(amountToClear, minAge, useBulk, context, args); + s_outboxProducerMediator!.ClearOutstandingFromOutbox(amountToClear, minAge, useBulk, context, args); + } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { @@ -993,10 +1028,10 @@ public void ClearOutstandingFromOutbox( var subscription = _replySubscriptions?.FirstOrDefault(s => s.DataType == typeof(TResponse)); if (subscription is null) - throw new ArgumentOutOfRangeException($"No Subscription registered fpr replies of type {typeof(T)}"); + throw new InvalidOperationException($"No Subscription registered fpr replies of type {typeof(T)}"); if (_responseChannelFactory is null) - throw new ArgumentOutOfRangeException("No ResponseChannelFactory registered"); + throw new InvalidOperationException("No ResponseChannelFactory registered"); //create a reply queue via creating a consumer - we use random identifiers as we will destroy var channelName = Guid.NewGuid(); @@ -1020,11 +1055,11 @@ public void ClearOutstandingFromOutbox( try { - var outMessage = s_bus!.CreateMessageFromRequest(request, context); + var outMessage = s_outboxProducerMediator!.CreateMessageFromRequest(request, context); //We don't store the message, if we continue to fail further retry is left to the sender s_logger.LogDebug("Sending request with routingkey {ChannelName}", channelName); - s_bus.CallViaExternalBus(outMessage, requestContext); + s_outboxProducerMediator.CallViaExternalBus(outMessage, requestContext); Message? responseMessage = null; @@ -1036,17 +1071,21 @@ public void ClearOutstandingFromOutbox( { s_logger.LogDebug("Reply received from {ChannelName}", channelName); //map to request is map to a response, but it is a request from consumer point of view. Confusing, but... - s_bus.CreateRequestFromMessage(responseMessage, context, out TResponse response); + s_outboxProducerMediator.CreateRequestFromMessage(responseMessage, context, out TResponse response); Send(response); return response; } - s_logger.LogInformation("Deleting queue for routingkey: {ChannelName}", channelName); return null; } + catch (Exception e) + { + _tracer?.AddExceptionToSpan(span, [e]); + throw; + } finally { _tracer?.EndSpan(span); @@ -1060,15 +1099,12 @@ public void ClearOutstandingFromOutbox( /// public static void ClearServiceBus() { - if (s_bus != null) + if (s_outboxProducerMediator != null) { lock (s_padlock) { - if (s_bus != null) - { - s_bus.Dispose(); - s_bus = null; - } + s_outboxProducerMediator.Dispose(); + s_outboxProducerMediator = null; } } s_boundDepositCalls.Clear(); @@ -1091,7 +1127,7 @@ private bool HandlerFactoryIsNotEitherIAmAHandlerFactorySyncOrAsync(IAmAHandlerF { // If we do not have a subscriber registry and we do not have a handler factory // then we're creating a control bus sender and we don't need them - if (_subscriberRegistry is null && handlerFactory is null) + if (_subscriberRegistry is null) return false; switch (handlerFactory) @@ -1104,16 +1140,16 @@ private bool HandlerFactoryIsNotEitherIAmAHandlerFactorySyncOrAsync(IAmAHandlerF } } - // Create an instance of the ExternalBusService if one not already set for this app. Note that we do not support reinitialization here, so once you have + // Create an instance of the OutboxProducerMediator if one not already set for this app. Note that we do not support reinitialization here, so once you have // set a command processor for the app, you can't call init again to set them - although the properties are not read-only so overwriting is possible // if needed as a "get out of gaol" card. - private static void InitExtServiceBus(IAmAnExternalBusService bus) + private static void InitExtServiceBus(IAmAnOutboxProducerMediator bus) { - if (s_bus == null) + if (s_outboxProducerMediator == null) { lock (s_padlock) { - s_bus ??= bus; + s_outboxProducerMediator ??= bus; } } } diff --git a/src/Paramore.Brighter/CommandProcessorBuilder.cs b/src/Paramore.Brighter/CommandProcessorBuilder.cs index 6dd91234f..10217804e 100644 --- a/src/Paramore.Brighter/CommandProcessorBuilder.cs +++ b/src/Paramore.Brighter/CommandProcessorBuilder.cs @@ -86,7 +86,7 @@ public class CommandProcessorBuilder : INeedAHandlers, INeedPolicy, INeedMessagi private IPolicyRegistry? _policyRegistry; private IAmAFeatureSwitchRegistry? _featureSwitchRegistry; - private IAmAnExternalBusService? _bus; + private IAmAnOutboxProducerMediator? _bus; private bool _useRequestReplyQueues; private IAmAChannelFactory? _responseChannelFactory; private IEnumerable? _replySubscriptions; @@ -174,7 +174,7 @@ public INeedMessaging DefaultPolicy() /// public INeedInstrumentation ExternalBus( ExternalBusType busType, - IAmAnExternalBusService bus, + IAmAnOutboxProducerMediator bus, IAmAChannelFactory? responseChannelFactory = null, IEnumerable? subscriptions = null, InboxConfiguration? inboxConfiguration = null @@ -372,7 +372,7 @@ public interface INeedMessaging /// INeedInstrumentation ExternalBus( ExternalBusType busType, - IAmAnExternalBusService bus, + IAmAnOutboxProducerMediator bus, IAmAChannelFactory? responseChannelFactory = null, IEnumerable? subscriptions = null, InboxConfiguration? inboxConfiguration = null diff --git a/src/Paramore.Brighter/ControlBusSenderFactory.cs b/src/Paramore.Brighter/ControlBusSenderFactory.cs index 936b0e281..20d9317f2 100644 --- a/src/Paramore.Brighter/ControlBusSenderFactory.cs +++ b/src/Paramore.Brighter/ControlBusSenderFactory.cs @@ -49,7 +49,7 @@ public IAmAControlBusSender Create(IAmAnOutbox outbox, IAmAProd null); mapper.Register(); - var bus = new ExternalBusService( + var mediator = new OutboxProducerMediator( producerRegistry: producerRegistry, policyRegistry: new DefaultPolicy(), mapperRegistry: mapper, @@ -62,7 +62,7 @@ public IAmAControlBusSender Create(IAmAnOutbox outbox, IAmAProd CommandProcessorBuilder.StartNew() .Handlers(new HandlerConfiguration()) .DefaultPolicy() - .ExternalBus(ExternalBusType.FireAndForget, bus) + .ExternalBus(ExternalBusType.FireAndForget, mediator) .ConfigureInstrumentation(null, InstrumentationOptions.None) .RequestContextFactory(new InMemoryRequestContextFactory()) .Build() diff --git a/src/Paramore.Brighter/IAmAnExternalBusService.cs b/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs similarity index 90% rename from src/Paramore.Brighter/IAmAnExternalBusService.cs rename to src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs index 056ce8c66..f754cc6fa 100644 --- a/src/Paramore.Brighter/IAmAnExternalBusService.cs +++ b/src/Paramore.Brighter/IAmAnOutboxProducerMediator.cs @@ -9,24 +9,8 @@ namespace Paramore.Brighter /// An external bus service allows us to send messages to external systems /// The interaction with the CommandProcessor is mostly via the Outbox and the Message Mapper /// - public interface IAmAnExternalBusService : IDisposable + public interface IAmAnOutboxProducerMediator : IDisposable { - - /// - /// Archive Message from the outbox to the outbox archive provider - /// - /// Minimum age - /// What is the context for this request; used to access the Span - void Archive(TimeSpan dispatchedSince, RequestContext requestContext); - - /// - /// Archive Message from the outbox to the outbox archive provider - /// - /// How stale is the message that we want to archive - /// The context for the request pipeline; gives us the OTel span for example - /// The Cancellation Token - Task ArchiveAsync(TimeSpan dispatchedSince, RequestContext requestContext, CancellationToken cancellationToken); - /// /// Used with RPC to call a remote service via the external bus /// @@ -119,7 +103,7 @@ void CreateRequestFromMessage(Message message, RequestContext? request /// An external bus service allows us to send messages to external systems /// The interaction with the CommandProcessor is mostly via the Outbox and the Message Mapper /// - public interface IAmAnExternalBusService : IDisposable + public interface IAmAnOutboxProducerMediator : IDisposable { /// /// Adds a message to the outbox diff --git a/src/Paramore.Brighter/InMemoryOutbox.cs b/src/Paramore.Brighter/InMemoryOutbox.cs index 294529661..6e54fa33f 100644 --- a/src/Paramore.Brighter/InMemoryOutbox.cs +++ b/src/Paramore.Brighter/InMemoryOutbox.cs @@ -214,6 +214,8 @@ public Task AddAsync( IAmABoxTransactionProvider? transactionProvider = null, CancellationToken cancellationToken = default) { + //NOTE: As we call Add, don't create telemetry here + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); if (cancellationToken.IsCancellationRequested) @@ -241,11 +243,11 @@ public void Delete(string[] messageIds, RequestContext? requestContext, Dictiona { foreach (string messageId in messageIds) { - Requests.TryRemove(messageId, out _); + Delete(messageId); } } - /// + /// /// Deletes the messages from the Outbox /// /// The ids of the messages to delete @@ -282,13 +284,26 @@ public IEnumerable DispatchedMessages( Dictionary? args = null) { ClearExpiredMessages(); + + var span = Tracer?.CreateDbSpan( + new OutboxSpanInfo(DbSystem.Brighter, InMemoryAttributes.DbName, OutboxDbOperation.DispatchedMessages, InMemoryAttributes.DbTable), + requestContext?.Span, + options: _instrumentationOptions + ); - var age = - _timeProvider.GetUtcNow() - dispatchedSince; - return Requests.Values - .Where(oe => (oe.TimeFlushed != DateTimeOffset.MinValue) && (oe.TimeFlushed <= age)) - .Take(pageSize) - .Select(oe => oe.Message).ToArray(); + try + { + var now = _timeProvider.GetUtcNow(); + var age = now - dispatchedSince; + return Requests.Values + .Where(oe => (oe.TimeFlushed != DateTimeOffset.MinValue) && (oe.TimeFlushed <= age)) + .Take(pageSize) + .Select(oe => oe.Message).ToArray(); + } + finally + { + Tracer?.EndSpan(span); + } } /// @@ -309,6 +324,8 @@ public Task> DispatchedMessagesAsync(TimeSpan dispatchedSin Dictionary? args = null, CancellationToken cancellationToken = default) { + //NOTE: As we call DispatchedMessages, don't create telemetry here + return Task.FromResult(DispatchedMessages(dispatchedSince, requestContext, pageSize, pageNumber, outboxTimeout, args)); @@ -391,6 +408,8 @@ public Task MarkDispatchedAsync( Dictionary? args = null, CancellationToken cancellationToken = default) { + //NOTE: We don't create a span here as we just call the sync method + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); MarkDispatched(id, requestContext, dispatchedAt); @@ -415,6 +434,8 @@ public Task MarkDispatchedAsync( Dictionary? args = null, CancellationToken cancellationToken = default) { + //NOTE: We don't create a span here as we just call the sync method + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ids.Each((id) => MarkDispatched(id, requestContext, dispatchedAt)); @@ -436,10 +457,23 @@ public void MarkDispatched(string id, RequestContext requestContext, DateTimeOff Dictionary? args = null) { ClearExpiredMessages(); + + var span = Tracer?.CreateDbSpan( + new OutboxSpanInfo(DbSystem.Brighter, InMemoryAttributes.DbName, OutboxDbOperation.MarkDispatched, InMemoryAttributes.DbTable), + requestContext?.Span, + options: _instrumentationOptions + ); - if (Requests.TryGetValue(id, out OutboxEntry? entry)) + try { - entry.TimeFlushed = dispatchedAt ?? _timeProvider.GetUtcNow().DateTime; + if (Requests.TryGetValue(id, out OutboxEntry? entry)) + { + entry.TimeFlushed = dispatchedAt ?? _timeProvider.GetUtcNow(); + } + } + finally + { + Tracer?.EndSpan(span); } } @@ -502,12 +536,31 @@ public Task> OutstandingMessagesAsync( Dictionary? args = null, CancellationToken cancellationToken = default) { - var tcs = new TaskCompletionSource>(TaskCreationOptions - .RunContinuationsAsynchronously); + //NOTE: We don't create a span here as we just call the sync method + + var tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); tcs.SetResult(OutstandingMessages(dispatchedSince, requestContext, pageSize, pageNumber, args)); return tcs.Task; } + + private void Delete(string messageId, RequestContext? requestContext = null) + { + var span = Tracer?.CreateDbSpan( + new OutboxSpanInfo(DbSystem.Brighter, InMemoryAttributes.DbName, OutboxDbOperation.Delete, InMemoryAttributes.DbTable), + requestContext?.Span, + options: _instrumentationOptions + ); + + try + { + Requests.TryRemove(messageId, out _); + } + finally + { + Tracer?.EndSpan(span); + } + } } } diff --git a/src/Paramore.Brighter/MessageMapperRegistry.cs b/src/Paramore.Brighter/MessageMapperRegistry.cs index 8fe0dd318..6e1ad7d3d 100644 --- a/src/Paramore.Brighter/MessageMapperRegistry.cs +++ b/src/Paramore.Brighter/MessageMapperRegistry.cs @@ -23,9 +23,7 @@ THE SOFTWARE. */ #endregion using System; -using System.Collections; using System.Collections.Generic; -using System.Linq; namespace Paramore.Brighter { @@ -38,7 +36,7 @@ namespace Paramore.Brighter /// public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMapperRegistryAsync { - private readonly IAmAMessageMapperFactory _messageMapperFactory; + private readonly IAmAMessageMapperFactory? _messageMapperFactory; private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync; private readonly Dictionary _messageMappers = new Dictionary(); private readonly Dictionary _asyncMessageMappers = new Dictionary(); @@ -48,7 +46,7 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe /// /// The message mapper factory. /// The async message mapper factory - public MessageMapperRegistry(IAmAMessageMapperFactory messageMapperFactory, IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync) + public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync) { _messageMapperFactory = messageMapperFactory; _messageMapperFactoryAsync = messageMapperFactoryAsync; @@ -64,7 +62,7 @@ public MessageMapperRegistry(IAmAMessageMapperFactory messageMapperFactory, IAmA /// IAmAMessageMapper<TRequest>. public IAmAMessageMapper? Get() where TRequest : class, IRequest { - if (_messageMappers.ContainsKey(typeof(TRequest))) + if ( _messageMapperFactory is not null && _messageMappers.ContainsKey(typeof(TRequest))) { var messageMapperType = _messageMappers[typeof(TRequest)]; return (IAmAMessageMapper)_messageMapperFactory.Create(messageMapperType); diff --git a/src/Paramore.Brighter/Observability/BrighterSemanticConventions.cs b/src/Paramore.Brighter/Observability/BrighterSemanticConventions.cs index fb158fb77..306e41d2e 100644 --- a/src/Paramore.Brighter/Observability/BrighterSemanticConventions.cs +++ b/src/Paramore.Brighter/Observability/BrighterSemanticConventions.cs @@ -29,6 +29,8 @@ namespace Paramore.Brighter.Observability; /// public static class BrighterSemanticConventions { + public const string ArchiveAge = "paramore.brighter.archive_age_in_milliseconds"; + public const string ArchiveMessages = "paramore.brighter.archive_messages"; public const string CeSource = "cloudevents.event_source"; public const string CeMessageId = "cloudevents.event_id"; public const string CeVersion = "cloudevents.event_spec_version"; diff --git a/src/Paramore.Brighter/Observability/BrighterSpanExtensions.cs b/src/Paramore.Brighter/Observability/BrighterSpanExtensions.cs index 2c858d3cf..9bc5e89a2 100644 --- a/src/Paramore.Brighter/Observability/BrighterSpanExtensions.cs +++ b/src/Paramore.Brighter/Observability/BrighterSpanExtensions.cs @@ -41,6 +41,7 @@ public static class BrighterSpanExtensions CommandProcessorSpanOperation.Publish => "publish", CommandProcessorSpanOperation.Send => "send", CommandProcessorSpanOperation.Clear => "clear", + CommandProcessorSpanOperation.Archive => "archive", _ => throw new ArgumentOutOfRangeException(nameof(operation), operation, null) }; diff --git a/src/Paramore.Brighter/Observability/BrighterTracer.cs b/src/Paramore.Brighter/Observability/BrighterTracer.cs index 5f923d566..d1f20e7f6 100644 --- a/src/Paramore.Brighter/Observability/BrighterTracer.cs +++ b/src/Paramore.Brighter/Observability/BrighterTracer.cs @@ -63,7 +63,32 @@ public BrighterTracer(TimeProvider? timeProvider = null) /// public void Dispose() { - ActivitySource?.Dispose(); + ActivitySource.Dispose(); + } + + /// + /// If an activity has an exception, then we should record it on the span + /// + /// + /// + public void AddExceptionToSpan(Activity? span, IEnumerable exceptions) + { + if (span == null ) return; + + var exceptionList = exceptions.ToArray(); + + if (exceptionList.Length == 0) return; + + if (exceptionList .Length == 1) + { + span.RecordException(exceptionList[0]); + span.SetStatus(ActivityStatusCode.Error, exceptionList[0].Message); + return; + } + + var exception = new AggregateException("Operation failed, see inner exceptions for details", exceptionList); + span.RecordException(exception); + span.SetStatus(ActivityStatusCode.Error, exception.Message); } /// @@ -286,6 +311,43 @@ public void Dispose() return activity; } + /// + /// Creates a span for an archive operation. Because a sweeper may not create an externa bus, but just use the archiver directly, we + /// check for this existing and then recreate directly in the archive provider if it does not exist + /// + /// A parent activity that called this one + /// The minimum age of a row to be archived + /// The for how deep should the instrumentation go? + /// + public Activity? CreateArchiveSpan( + Activity? parentActivity, + TimeSpan dispatchedSince, + InstrumentationOptions options = InstrumentationOptions.All) + { + var operation = CommandProcessorSpanOperation.Archive; + var spanName = $"{BrighterSemanticConventions.ArchiveMessages} {operation.ToSpanName()}"; + var kind = ActivityKind.Producer; + var parentId = parentActivity?.Id; + var now = _timeProvider.GetUtcNow(); + var tags = new ActivityTagsCollection() + + { + { BrighterSemanticConventions.Operation, operation.ToSpanName() }, + { BrighterSemanticConventions.ArchiveAge, dispatchedSince.TotalMilliseconds } + }; + + var activity = ActivitySource.StartActivity( + name: spanName, + kind: kind, + parentId: parentId, + tags: tags, + startTime: now); + + Activity.Current = activity; + + return activity; + } + /// /// Create a span for a batch of messages to be cleared /// @@ -539,7 +601,6 @@ InstrumentationOptions instrumentationOptions /// /// Create an event representing the external service bus calling the outbox /// This is generic and not specific details from a particular outbox and is thus mostly message properties - /// This is a batch version of /// NOTE: Events are static, as we only need the instance state to create an activity /// /// What are we performing on the group of messages diff --git a/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs b/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs index f0dabbcd9..19f29e8b6 100644 --- a/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs +++ b/src/Paramore.Brighter/Observability/CommandProcessorSpanOperation.cs @@ -33,5 +33,6 @@ public enum CommandProcessorSpanOperation Create = 1, // A batch operation, such as publishing an event or clearing a message Publish = 2, // Publish an event Deposit = 3, // Deposit a message in the outbox - Clear = 4 // Clear a message from the outbox + Clear = 4, // Clear a message from the outbox + Archive = 5 //Archive a message from the outbox } diff --git a/src/Paramore.Brighter/Observability/IAmABrighterTracer.cs b/src/Paramore.Brighter/Observability/IAmABrighterTracer.cs index df27a6a6f..2b45bb5e1 100644 --- a/src/Paramore.Brighter/Observability/IAmABrighterTracer.cs +++ b/src/Paramore.Brighter/Observability/IAmABrighterTracer.cs @@ -69,11 +69,23 @@ public interface IAmABrighterTracer : IDisposable InstrumentationOptions options = InstrumentationOptions.All ) where TRequest : class, IRequest; + /// + /// Creates a span for an archive operation. Because a sweeper may not create an externa bus, but just use the archiver directly, we + /// check for this existing and then recreate directly in the archive provider if it does not exist + /// + /// A parent activity that called this one + /// The minimum age of a row to be archived + /// The for how deep should the instrumentation go? + /// + Activity? CreateArchiveSpan( + Activity? parentActivity, + TimeSpan dispatchedSince, + InstrumentationOptions options = InstrumentationOptions.All + ); + /// /// Create a span for a request in CommandProcessor /// - /// What type of span are we creating - /// What is the request that we are tracking with this span /// The parent activity, if any, that we should assign to this span /// Are there links to other spans that we should add to this span /// How deep should the instrumentation go? @@ -83,6 +95,20 @@ public interface IAmABrighterTracer : IDisposable ActivityLink[]? links = null, InstrumentationOptions options = InstrumentationOptions.All ) where TRequest : class, IRequest; + + /// + /// The parent span for the message pump. This is the entry point for the message pump + /// + /// The . This should be Begin or End + /// The for this span + /// The that we are receiving from + /// The for how deep should the instrumentation go? + /// A span (or dotnet Activity) for the current request named request.name operation.name + Activity? CreateMessagePumpSpan( + MessagePumpSpanOperation operation, + RoutingKey topic, + MessagingSystem messagingSystem, + InstrumentationOptions options = InstrumentationOptions.All); /// /// The for this span @@ -161,16 +187,9 @@ public interface IAmABrighterTracer : IDisposable void LinkSpans(ConcurrentDictionary handlerSpans); /// - /// The parent span for the message pump. This is the entry point for the message pump + /// If an activity has an exception, then we should record it on the span /// - /// The . This should be Begin or End - /// The for this span - /// The that we are receiving from - /// The for how deep should the instrumentation go? - /// A span (or dotnet Activity) for the current request named request.name operation.name - Activity? CreateMessagePumpSpan( - MessagePumpSpanOperation operation, - RoutingKey topic, - MessagingSystem messagingSystem, - InstrumentationOptions instrumentationOptions = InstrumentationOptions.All); + /// + /// + void AddExceptionToSpan(Activity? span, IEnumerable exceptions); } diff --git a/src/Paramore.Brighter/OutboxArchiver.cs b/src/Paramore.Brighter/OutboxArchiver.cs index 05c7d30ae..cf7826486 100644 --- a/src/Paramore.Brighter/OutboxArchiver.cs +++ b/src/Paramore.Brighter/OutboxArchiver.cs @@ -23,12 +23,12 @@ THE SOFTWARE. */ #endregion using System; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Paramore.Brighter.Logging; +using Paramore.Brighter.Observability; namespace Paramore.Brighter { @@ -37,17 +37,43 @@ namespace Paramore.Brighter /// /// The type of message to archive /// The transaction type of the Db - public class OutboxArchiver( - IAmAnExternalBusService bus, - IAmARequestContextFactory? requestContextFactory = null) - where TMessage : Message + public class OutboxArchiver where TMessage : Message { - private const string ARCHIVE_OUTBOX = "Archive Outbox"; - - private readonly ILogger _logger = ApplicationLogging.CreateLogger>(); - private readonly IAmARequestContextFactory _requestContextFactory = requestContextFactory ?? new InMemoryRequestContextFactory(); + private static readonly ILogger s_logger = ApplicationLogging.CreateLogger>(); + private readonly IAmARequestContextFactory _requestContextFactory; + private readonly IAmAnOutboxSync? _outBox; + private readonly IAmAnOutboxAsync? _asyncOutbox; + private readonly IAmAnArchiveProvider _archiveProvider; + private readonly int _archiveBatchSize; + private readonly IAmABrighterTracer? _tracer; + private readonly InstrumentationOptions _instrumentationOptions; - private const string SUCCESS_MESSAGE = "Successfully archiver {NumberOfMessageArchived} out of {MessagesToArchive}, batch size : {BatchSize}"; + /// + /// Used to archive messages from an Outbox + /// + /// The type of message to archive + /// The transaction type of the Db + public OutboxArchiver( + IAmAnOutbox outbox, + IAmAnArchiveProvider archiveProvider, + IAmARequestContextFactory? requestContextFactory = null, + int archiveBatchSize = 100, + IAmABrighterTracer? tracer = null, + InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) + { + _archiveProvider = archiveProvider; + _archiveBatchSize = archiveBatchSize; + _tracer = tracer; + _instrumentationOptions = instrumentationOptions; + _requestContextFactory = requestContextFactory ?? new InMemoryRequestContextFactory(); + + if (outbox is IAmAnOutboxSync syncOutbox) _outBox = syncOutbox; + if (outbox is IAmAnOutboxAsync asyncOutbox) _asyncOutbox = asyncOutbox; + } + + private const string NoSyncOutboxError = "A sync Outbox must be defined."; + private const string NoArchiveProviderError = "An Archive Provider must be defined."; + private const string NoAsyncOutboxError = "An async Outbox must be defined."; /// /// Archive Message from the outbox to the outbox archive provider @@ -55,26 +81,52 @@ public class OutboxArchiver( /// that these are transient errors which can be retried /// /// How stale is the message that we want archive - public void Archive(TimeSpan dispatchedSince) + /// The context for the request pipeline; gives us the OTel span for example + public void Archive(TimeSpan dispatchedSince, RequestContext? requestContext = null) { -#pragma warning disable CS0618 // Type or member is obsolete - var activity = ApplicationTelemetry.ActivitySource.StartActivity(ARCHIVE_OUTBOX, ActivityKind.Server); -#pragma warning restore CS0618 // Type or member is obsolete - var requestContext = _requestContextFactory.Create(); - requestContext.Span = activity; + requestContext ??= _requestContextFactory.Create(); + //This is an archive span parent; we expect individual archiving operations for messages to have their own spans + var parentSpan = requestContext.Span; + var span = _tracer?.CreateArchiveSpan(requestContext.Span, dispatchedSince, options: _instrumentationOptions); + requestContext.Span = span; try { - bus.Archive(dispatchedSince, requestContext); + if (_outBox is null) throw new ArgumentException(NoSyncOutboxError); + if (_archiveProvider is null) throw new ArgumentException(NoArchiveProviderError); + var messages = _outBox + .DispatchedMessages(dispatchedSince, requestContext, _archiveBatchSize) + .ToArray(); + + s_logger.LogInformation( + "Found {NumberOfMessageArchived} message to archive, batch size : {BatchSize}", + messages.Count(), _archiveBatchSize + ); + + if (messages.Length <= 0) return; + + foreach (var message in messages) + { + _archiveProvider.ArchiveMessage(message); + } + + _outBox.Delete(messages.Select(e => e.Id).ToArray(), requestContext); + + s_logger.LogInformation( + "Successfully archived {NumberOfMessageArchived}, batch size : {BatchSize}", + messages.Count(), _archiveBatchSize + ); } catch (Exception e) { - activity?.SetStatus(ActivityStatusCode.Error, e.Message); + s_logger.LogError(e, "Error while archiving from the outbox"); + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { - if(activity?.DisplayName == ARCHIVE_OUTBOX) - activity.Dispose(); + _tracer?.EndSpan(span); + requestContext.Span = parentSpan; } } @@ -86,25 +138,48 @@ public void Archive(TimeSpan dispatchedSince) /// How stale is the message that /// The context for the request pipeline; gives us the OTel span for example /// The Cancellation Token - public async Task ArchiveAsync(TimeSpan dispatchedSince, RequestContext requestContext, CancellationToken cancellationToken) + public async Task ArchiveAsync(TimeSpan dispatchedSince, RequestContext? requestContext = null, CancellationToken cancellationToken = default) { -#pragma warning disable CS0618 // Type or member is obsolete - var activity = ApplicationTelemetry.ActivitySource.StartActivity(ARCHIVE_OUTBOX, ActivityKind.Server); -#pragma warning restore CS0618 // Type or member is obsolete - requestContext.Span = activity; + requestContext ??= _requestContextFactory.Create(); + //This is an archive span parent; we expect individual archiving operations for messages to have their own spans + var parentSpan = requestContext.Span; + var span = _tracer?.CreateArchiveSpan(requestContext.Span, dispatchedSince, options: _instrumentationOptions); + requestContext.Span = span; try { - await bus.ArchiveAsync(dispatchedSince, requestContext, cancellationToken); + if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); + if (_archiveProvider is null) throw new ArgumentException(NoArchiveProviderError); + var messages = (await _asyncOutbox.DispatchedMessagesAsync( + dispatchedSince, requestContext, pageSize: _archiveBatchSize, + cancellationToken: cancellationToken + )).ToArray(); + + if (messages.Length <= 0) + { + } + else + { + foreach (var message in messages) + { + await _archiveProvider.ArchiveMessageAsync(message, cancellationToken); + } + + await _asyncOutbox.DeleteAsync(messages.Select(e => e.Id).ToArray(), requestContext, + cancellationToken: cancellationToken + ); + } } catch (Exception e) { - activity?.SetStatus(ActivityStatusCode.Error, e.Message); + s_logger.LogError(e, "Error while archiving from the outbox"); + _tracer?.AddExceptionToSpan(span, [e]); + throw; } finally { - if(activity?.DisplayName == ARCHIVE_OUTBOX) - activity.Dispose(); + _tracer?.EndSpan(span); + requestContext.Span = parentSpan; } } } diff --git a/src/Paramore.Brighter/ExternalBusService.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs similarity index 90% rename from src/Paramore.Brighter/ExternalBusService.cs rename to src/Paramore.Brighter/OutboxProducerMediator.cs index f8b8dc9b5..a094d0770 100644 --- a/src/Paramore.Brighter/ExternalBusService.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -11,27 +11,28 @@ using Polly; using Polly.Registry; +// ReSharper disable StaticMemberInGenericType + namespace Paramore.Brighter { /// - /// Provide services to CommandProcessor that persist across the lifetime of the application. Allows separation from - /// elements that have a lifetime linked to the scope of a request, or are transient for DI purposes + /// Mediates the interaction between a producer and an outbox. As we want to write to the outbox, and then send from there + /// to the producer, we need to take control of produce operations to mediate between the two in a transaction. + /// NOTE: This class is singleton. The CommandProcessor by contrast, is transient or more typically scoped. /// - public class ExternalBusService : IAmAnExternalBusService, - IAmAnExternalBusService + public class OutboxProducerMediator : IAmAnOutboxProducerMediator, + IAmAnOutboxProducerMediator where TMessage : Message { private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); private readonly IPolicyRegistry _policyRegistry; - private readonly IAmAnArchiveProvider? _archiveProvider; private readonly TransformPipelineBuilder _transformPipelineBuilder; private readonly TransformPipelineBuilderAsync _transformPipelineBuilderAsync; private readonly IAmAnOutboxSync? _outBox; private readonly IAmAnOutboxAsync? _asyncOutbox; private readonly int _outboxTimeout; private readonly IAmAProducerRegistry _producerRegistry; - private readonly int _archiveBatchSize; private readonly InstrumentationOptions _instrumentationOptions; private readonly Dictionary> _outboxBatches = new(); @@ -47,7 +48,6 @@ public class ExternalBusService : IAmAnExternalBusServic private const string NoSyncOutboxError = "A sync Outbox must be defined."; private const string NoAsyncOutboxError = "An async Outbox must be defined."; - private const string NoArchiveProviderError = "An Archive Provider must be defined."; //Uses -1 to indicate no outbox and will thus force a throw on a failed publish private int _outStandingCount; @@ -59,7 +59,7 @@ public class ExternalBusService : IAmAnExternalBusServic private readonly TimeProvider _timeProvider; /// - /// Creates an instance of External Bus Services + /// Creates an instance of the Outbox Producer Mediator /// /// A registry of producers /// A registry for reliability policies @@ -68,29 +68,26 @@ public class ExternalBusService : IAmAnExternalBusServic /// The factory used to create a transformer pipeline for an async message mapper /// /// An outbox for transactional messaging, if none is provided, use an InMemoryOutbox - /// When archiving rows from the Outbox, abstracts to where we should send them /// /// How long to timeout for with an outbox /// How many messages can become outstanding in the Outbox before we throw an OutboxLimitReached exception /// How long before we check for maxOutStandingMessages /// An outbox may require additional arguments, such as a topic list to search - /// What batch size to use when archiving from the Outbox /// /// How verbose do we want our instrumentation to be - public ExternalBusService(IAmAProducerRegistry producerRegistry, + public OutboxProducerMediator( + IAmAProducerRegistry producerRegistry, IPolicyRegistry policyRegistry, IAmAMessageMapperRegistry mapperRegistry, IAmAMessageTransformerFactory messageTransformerFactory, IAmAMessageTransformerFactoryAsync messageTransformerFactoryAsync, IAmABrighterTracer tracer, IAmAnOutbox? outbox = null, - IAmAnArchiveProvider? archiveProvider = null, IAmARequestContextFactory? requestContextFactory = null, int outboxTimeout = 300, int maxOutStandingMessages = -1, TimeSpan? maxOutStandingCheckInterval = null, Dictionary? outBoxBag = null, - int archiveBatchSize = 100, TimeProvider? timeProvider = null, InstrumentationOptions instrumentationOptions = InstrumentationOptions.All) { @@ -98,7 +95,6 @@ public ExternalBusService(IAmAProducerRegistry producerRegistry, throw new ConfigurationException("Missing Producer Registry for External Bus Services"); _policyRegistry = policyRegistry ?? throw new ConfigurationException("Missing Policy Registry for External Bus Services"); - _archiveProvider = archiveProvider; requestContextFactory ??= new InMemoryRequestContextFactory(); @@ -119,7 +115,7 @@ public ExternalBusService(IAmAProducerRegistry producerRegistry, _transformPipelineBuilderAsync = new TransformPipelineBuilderAsync(mapperRegistryAsync, messageTransformerFactoryAsync); - //default to in-memory; expectation for a in memory box is Message and CommittableTransaction + //default to in-memory; expectation for an in memory box is Message and CommittableTransaction outbox ??= new InMemoryOutbox(TimeProvider.System); outbox.Tracer = tracer; @@ -130,7 +126,6 @@ public ExternalBusService(IAmAProducerRegistry producerRegistry, _maxOutStandingMessages = maxOutStandingMessages; _maxOutStandingCheckInterval = maxOutStandingCheckInterval ?? TimeSpan.FromMilliseconds(1000); _outBoxBag = outBoxBag ?? new Dictionary(); - _archiveBatchSize = archiveBatchSize; _instrumentationOptions = instrumentationOptions; _tracer = tracer; @@ -151,7 +146,7 @@ private void Dispose(bool disposing) if (_disposed) return; - if (disposing && _producerRegistry != null) + if (disposing) _producerRegistry.CloseAll(); _disposed = true; } @@ -242,86 +237,6 @@ public void AddToOutbox( throw new ChannelFailureException($"Could not write message {message.Id} to the outbox"); } - /// - /// Archive Message from the outbox to the outbox archive provider - /// Throws any archiving exception - /// - /// Minimum age - /// The request context for the pipeline - public void Archive(TimeSpan dispatchedSince, RequestContext requestContext) - { - try - { - if (_outBox is null) throw new ArgumentException(NoSyncOutboxError); - if (_archiveProvider is null) throw new ArgumentException(NoArchiveProviderError); - var messages = _outBox - .DispatchedMessages(dispatchedSince, requestContext, _archiveBatchSize) - .ToArray(); - - s_logger.LogInformation( - "Found {NumberOfMessageArchived} message to archive, batch size : {BatchSize}", - messages.Count(), _archiveBatchSize - ); - - if (messages.Length <= 0) return; - - foreach (var message in messages) - { - _archiveProvider.ArchiveMessage(message); - } - - _outBox.Delete(messages.Select(e => e.Id).ToArray(), requestContext); - - s_logger.LogInformation( - "Successfully archived {NumberOfMessageArchived}, batch size : {BatchSize}", - messages.Count(), - _archiveBatchSize - ); - } - catch (Exception e) - { - s_logger.LogError(e, "Error while archiving from the outbox"); - throw; - } - } - - /// - /// Archive Message from the outbox to the outbox archive provider - /// Throws any archiving exception - /// - /// - /// - /// The Cancellation Token - public async Task ArchiveAsync(TimeSpan dispatchedSince, RequestContext requestContext, - CancellationToken cancellationToken) - { - try - { - if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); - if (_archiveProvider is null) throw new ArgumentException(NoArchiveProviderError); - var messages = (await _asyncOutbox.DispatchedMessagesAsync( - dispatchedSince, requestContext, pageSize: _archiveBatchSize, - cancellationToken: cancellationToken - )).ToArray(); - - if (messages.Length <= 0) return; - - foreach (var message in messages) - { - await _archiveProvider.ArchiveMessageAsync(message, cancellationToken); - } - - await _asyncOutbox.DeleteAsync(messages.Select(e => e.Id).ToArray(), requestContext, - cancellationToken: cancellationToken - ); - } - catch (Exception e) - { - s_logger.LogError(e, "Error while archiving from the outbox"); - throw; - } - } - /// /// Used with RPC to call a remote service via the external bus /// @@ -555,7 +470,7 @@ public void CreateRequestFromMessage(Message message, RequestContext? /// /// Commence a batch of outbox messages to add /// - /// The Id of the new batch + /// The ID of the new batch public string StartBatchAddToOutbox() { var batchId = Guid.NewGuid().ToString(); @@ -568,7 +483,7 @@ public void EndBatchAddToOutbox(string batchId, IAmABoxTransactionProvider /// Flush the batch of Messages to the outbox. /// - /// The Id of the batch to be flushed + /// The ID of the batch to be flushed /// /// The context of the request; if null we will start one via a /// @@ -597,7 +512,7 @@ public async Task EndBatchAddToOutboxAsync(string batchId, { CheckOutboxOutstandingLimit(); - BrighterTracer.WriteOutboxEvent(OutboxDbOperation.Add, _outboxBatches[batchId], requestContext?.Span, + BrighterTracer.WriteOutboxEvent(OutboxDbOperation.Add, _outboxBatches[batchId], requestContext.Span, transactionProvider != null, true, _instrumentationOptions); if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); @@ -654,7 +569,7 @@ private Task BackgroundDispatchUsingSync( s_clearSemaphoreToken.Wait(); var parentSpan = requestContext.Span; - var span = _tracer?.CreateClearSpan(CommandProcessorSpanOperation.Clear, requestContext.Span, null, + var span = _tracer.CreateClearSpan(CommandProcessorSpanOperation.Clear, requestContext.Span, null, _instrumentationOptions); try @@ -687,7 +602,7 @@ private Task BackgroundDispatchUsingSync( } finally { - _tracer?.EndSpan(span); + _tracer.EndSpan(span); s_clearSemaphoreToken.Release(); s_backgroundClearSemaphoreToken.Release(); } @@ -722,7 +637,7 @@ private async Task BackgroundDispatchUsingAsync( await s_clearSemaphoreToken.WaitAsync(); var parentSpan = requestContext.Span; - var span = _tracer?.CreateClearSpan(CommandProcessorSpanOperation.Clear, requestContext.Span, null, + var span = _tracer.CreateClearSpan(CommandProcessorSpanOperation.Clear, requestContext.Span, null, _instrumentationOptions); try { @@ -760,7 +675,7 @@ private async Task BackgroundDispatchUsingAsync( } finally { - _tracer?.EndSpan(span); + _tracer.EndSpan(span); s_clearSemaphoreToken.Release(); s_backgroundClearSemaphoreToken.Release(); } diff --git a/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox.cs b/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox.cs index faf312617..685623b62 100644 --- a/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox.cs +++ b/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox.cs @@ -3,10 +3,7 @@ using System.Transactions; using FluentAssertions; using Microsoft.Extensions.Time.Testing; -using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; using Paramore.Brighter.Observability; -using Polly; -using Polly.Registry; using Xunit; namespace Paramore.Brighter.Core.Tests.Archiving; @@ -15,52 +12,23 @@ public class ServiceBusMessageStoreArchiverTests { private readonly InMemoryOutbox _outbox; private readonly InMemoryArchiveProvider _archiveProvider; - private readonly ExternalBusService _bus; private readonly FakeTimeProvider _timeProvider; private readonly RoutingKey _routingKey = new("MyTopic"); + private readonly OutboxArchiver _archiver; public ServiceBusMessageStoreArchiverTests() { _timeProvider = new FakeTimeProvider(); - var producer = new InMemoryProducer(new InternalBus(), _timeProvider){Publication = {Topic = _routingKey, RequestType = typeof(MyCommand)}}; - var messageMapperRegistry = new MessageMapperRegistry( - new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper()), - null); - - var retryPolicy = Policy - .Handle() - .Retry(); - - var circuitBreakerPolicy = Policy - .Handle() - .CircuitBreaker(1, TimeSpan.FromMilliseconds(1)); - - var producerRegistry = new ProducerRegistry(new Dictionary - { - { _routingKey, producer }, - }); - - var policyRegistry = new PolicyRegistry - { - { CommandProcessor.RETRYPOLICY, retryPolicy }, - { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } - }; - var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(_timeProvider){Tracer = tracer}; _archiveProvider = new InMemoryArchiveProvider(); - - _bus = new ExternalBusService( - producerRegistry, - policyRegistry, - messageMapperRegistry, - new EmptyMessageTransformerFactory(), - new EmptyMessageTransformerFactoryAsync(), - tracer, + + _archiver = new OutboxArchiver( _outbox, - _archiveProvider - ); + _archiveProvider + ); + } [Fact] @@ -86,7 +54,7 @@ public void When_Archiving_All_Messages_From_The_Outbox() _timeProvider.Advance(TimeSpan.FromMinutes(15)); - _bus.Archive(TimeSpan.FromMilliseconds(500), context); + _archiver.Archive(TimeSpan.FromMilliseconds(500), context); //assert _outbox.EntryCount.Should().Be(0); @@ -116,7 +84,7 @@ public void When_Archiving_Some_Messages_From_The_Outbox() _timeProvider.Advance(TimeSpan.FromSeconds(30)); - _bus.Archive(TimeSpan.FromSeconds(30), context); + _archiver.Archive(TimeSpan.FromSeconds(30), context); //assert _outbox.EntryCount.Should().Be(1); @@ -142,7 +110,7 @@ public void When_Archiving_No_Messages_From_The_Outbox() //act _outbox.EntryCount.Should().Be(3); - _bus.Archive(TimeSpan.FromMilliseconds(20000), context); + _archiver.Archive(TimeSpan.FromMilliseconds(20000), context); //assert _outbox.EntryCount.Should().Be(3); @@ -155,7 +123,7 @@ public void When_Archiving_No_Messages_From_The_Outbox() public void When_Archiving_An_Empty_The_Outbox() { var context = new RequestContext(); - _bus.Archive(TimeSpan.FromMilliseconds(20000), context); + _archiver.Archive(TimeSpan.FromMilliseconds(20000), context); //assert _outbox.EntryCount.Should().Be(0); diff --git a/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox_Async.cs b/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox_Async.cs index 339ce3d52..e148c48f7 100644 --- a/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Archiving/When_Archiving_Old_Messages_From_The_Outbox_Async.cs @@ -1,14 +1,10 @@ using System; using System.Collections.Generic; -using System.Threading; using System.Threading.Tasks; using System.Transactions; using FluentAssertions; using Microsoft.Extensions.Time.Testing; -using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; using Paramore.Brighter.Observability; -using Polly; -using Polly.Registry; using Xunit; namespace Paramore.Brighter.Core.Tests.Archiving; @@ -17,57 +13,19 @@ public class ServiceBusMessageStoreArchiverTestsAsync { private readonly InMemoryOutbox _outbox; private readonly InMemoryArchiveProvider _archiveProvider; - private readonly ExternalBusService _bus; private readonly FakeTimeProvider _timeProvider; private readonly RoutingKey _routingKey = new("MyTopic"); + private readonly OutboxArchiver _archiver; public ServiceBusMessageStoreArchiverTestsAsync() { - var producer = new InMemoryProducer(new InternalBus(), new FakeTimeProvider()) - { - Publication = {Topic = _routingKey, RequestType = typeof(MyCommand)} - }; - - var messageMapperRegistry = new MessageMapperRegistry( - null, - new SimpleMessageMapperFactoryAsync((_) => new MyCommandMessageMapperAsync()) - ); - - var retryPolicy = Policy - .Handle() - .RetryAsync(); - - var circuitBreakerPolicy = Policy - .Handle() - .CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(1)); - - var producerRegistry = new ProducerRegistry(new Dictionary - { - { _routingKey, producer }, - }); - - var policyRegistry = new PolicyRegistry - { - { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, - { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } - }; - _timeProvider = new FakeTimeProvider(); var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(_timeProvider){Tracer = tracer}; _archiveProvider = new InMemoryArchiveProvider(); - _bus = new ExternalBusService( - producerRegistry, - policyRegistry, - messageMapperRegistry, - new EmptyMessageTransformerFactory(), - new EmptyMessageTransformerFactoryAsync(), - tracer, - _outbox, - _archiveProvider - ); - + _archiver = new OutboxArchiver(_outbox, _archiveProvider); + } [Fact] @@ -92,7 +50,7 @@ public async Task When_Archiving_Old_Messages_From_The_Outbox() _timeProvider.Advance(TimeSpan.FromSeconds(30)); - await _bus.ArchiveAsync(TimeSpan.FromSeconds(15), context, new CancellationToken()); + await _archiver.ArchiveAsync(TimeSpan.FromSeconds(15), context); //assert _outbox.EntryCount.Should().Be(0); @@ -122,7 +80,7 @@ public async Task When_Archiving_Some_Messages_From_The_Outbox() _timeProvider.Advance(TimeSpan.FromSeconds(30)); - await _bus.ArchiveAsync(TimeSpan.FromSeconds(15), context, new CancellationToken()); + await _archiver.ArchiveAsync(TimeSpan.FromSeconds(15), context); //assert _outbox.EntryCount.Should().Be(1); @@ -148,7 +106,7 @@ public async Task When_Archiving_No_Messages_From_The_Outbox() //act _outbox.EntryCount.Should().Be(3); - await _bus.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context, new CancellationToken()); + await _archiver.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context); //assert _outbox.EntryCount.Should().Be(3); @@ -164,7 +122,7 @@ public async Task When_Archiving_An_Empty_Outbox() var context = new RequestContext(); //act - await _bus.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context, new CancellationToken()); + await _archiver.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context); //assert _outbox.EntryCount.Should().Be(0); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor.cs index 63ccb5aa1..3eb18c74e 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor.cs @@ -76,7 +76,7 @@ public CommandProcessorCallTests() }); var tracer = new BrighterTracer(); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, _messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs index f8c185103..2f9058f97 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs @@ -63,7 +63,7 @@ public CommandProcessorNoInMapperTests() var tracer = new BrighterTracer(); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, @@ -93,7 +93,7 @@ public void When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper() var exception = Catch.Exception(() => _commandProcessor.Call(_myRequest, new RequestContext(), TimeSpan.FromMilliseconds(500))); //should throw an exception as we require a mapper for the outgoing request - exception.Should().BeOfType(); + exception.Should().BeOfType(); } public void Dispose() diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs index d84e69cad..6879921d6 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs @@ -62,7 +62,7 @@ public CommandProcessorMissingOutMapperTests() }); var tracer = new BrighterTracer(timeProvider); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs index a73c6b88a..612c9f4f5 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Call/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs @@ -74,7 +74,7 @@ public CommandProcessorCallTestsNoTimeout() var timeProvider = fakeTimeProvider; var tracer = new BrighterTracer(timeProvider); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor_Async.cs index c868b34db..4da0172e6 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor_Async.cs @@ -101,7 +101,7 @@ public CommandProcessorPostBoxBulkClearAsyncTests() var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs index 14ab07101..3f0ee5c83 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs @@ -87,7 +87,7 @@ public CommandProcessorPostBoxClearAsyncTests() var tracer = new BrighterTracer(timeProvider); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor.cs index bcc3bafe7..28594579c 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Clearing_The_PostBox_On_The_Command_Processor.cs @@ -86,7 +86,7 @@ public CommandProcessorPostBoxClearTests() var tracer = new BrighterTracer(timeProvider); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs index 2efc30ef0..1f77823fb 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs @@ -95,7 +95,7 @@ public CommandProcessorPostBoxImplicitClearTests() var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(timeProvider){Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor_Async.cs index 80eb790ec..77706409f 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Clear/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor_Async.cs @@ -95,7 +95,7 @@ public CommandProcessorPostBoxImplicitClearAsyncTests() _outbox = new InMemoryOutbox(timeProvider); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store.cs index 1bacd42e4..ab976bc9b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store.cs @@ -67,7 +67,7 @@ public CommandProcessorDepositPostTests() var tracer = new BrighterTracer(); _fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync.cs index 600a0e303..74f3aa3dc 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync.cs @@ -69,7 +69,7 @@ public CommandProcessorDepositPostTestsAsync() var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync_Bulk.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync_Bulk.cs index 4f9ddc78a..3623202de 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync_Bulk.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_StoreAsync_Bulk.cs @@ -103,7 +103,7 @@ public CommandProcessorBulkDepositPostTestsAsync() var tracer = new BrighterTracer(new FakeTimeProvider()); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store_Bulk.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store_Bulk.cs index 51898c9a2..a72d82b9b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store_Bulk.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Deposit/When_Depositing_A_Message_In_The_Message_Store_Bulk.cs @@ -99,7 +99,7 @@ public CommandProcessorBulkDepositPostTests() var tracer = new BrighterTracer(); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs index 296126693..31b78bd5d 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs @@ -80,7 +80,7 @@ public CommandProcessorNoMessageMapperTests() var tracer = new BrighterTracer(timeProvider); var outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs index e5ba57ac3..0c90011a9 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs @@ -73,7 +73,7 @@ public CommandProcessorNoMessageMapperAsyncTests() var tracer = new BrighterTracer(timeProvider); var outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Producer.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Producer.cs index c258b08f7..122182f6c 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Producer.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Producer.cs @@ -75,7 +75,7 @@ public void When_Creating_A_Command_Processor_Without_Producer_Registry() { var policyRegistry = new PolicyRegistry { { CommandProcessor.RETRYPOLICY, _retryPolicy }, { CommandProcessor.CIRCUITBREAKER, _circuitBreakerPolicy } }; - _exception = Catch.Exception(() => new ExternalBusService( + _exception = Catch.Exception(() => new OutboxProducerMediator( null, policyRegistry, _messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer.cs index d78b30c16..6db6d1b8d 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer.cs @@ -98,7 +98,7 @@ public CommandProcessorPostMissingMessageTransformerTests() [Fact] public void When_Creating_A_Command_Processor_Without_Message_Transformer() { - _exception = Catch.Exception(() => new ExternalBusService( + _exception = Catch.Exception(() => new OutboxProducerMediator( _producerRegistry, _policyRegistry, _messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer_Async.cs index dd772c5eb..9be61b1b7 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_And_There_Is_No_Message_Transformer_Async.cs @@ -89,7 +89,7 @@ public CommandProcessorPostMissingMessageTransformerTestsAsync() [Fact] public void When_Creating_A_Command_Processor_Without_Message_Transformer_Async() { - _exception = Catch.Exception(() => new ExternalBusService( + _exception = Catch.Exception(() => new OutboxProducerMediator( _producerRegistry, _policyRegistry, _messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor.cs index 12ec870e7..9735e800b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor.cs @@ -83,7 +83,7 @@ public CommandProcessorPostCommandTests() var tracer = new BrighterTracer(timeProvider); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor_Async.cs index 22d35718d..07ffac48d 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Message_To_The_Command_Processor_Async.cs @@ -80,7 +80,7 @@ public CommandProcessorPostCommandAsyncTests() var tracer = new BrighterTracer(timeProvider); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Fails_Limit_Total_Writes_To_OutBox_In_Window.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Fails_Limit_Total_Writes_To_OutBox_In_Window.cs index 3859e1aa3..a81fd1d74 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Fails_Limit_Total_Writes_To_OutBox_In_Window.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Fails_Limit_Total_Writes_To_OutBox_In_Window.cs @@ -61,7 +61,7 @@ public PostFailureLimitCommandTests() var producerRegistry = new ProducerRegistry(new Dictionary { { routingKey, producer }, }); - var externalBus = new ExternalBusService( + var externalBus = new OutboxProducerMediator( producerRegistry: producerRegistry, policyRegistry: new DefaultPolicy(), mapperRegistry: messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender.cs index 55add6aa9..0b68ad0c5 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender.cs @@ -81,7 +81,7 @@ public ControlBusSenderPostMessageTests() var tracer = new BrighterTracer(_timeProvider); _outbox = new InMemoryOutbox(_timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender_Async.cs index 31911f393..270d24773 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_Via_A_Control_Bus_Sender_Async.cs @@ -78,7 +78,7 @@ public ControlBusSenderPostMessageAsyncTests() var producerRegistry = new ProducerRegistry(new Dictionary {{_routingKey, producer},}); var policyRegistry = new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_A_Default_Policy.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_A_Default_Policy.cs index ef101ab4f..766cddefa 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_A_Default_Policy.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_A_Default_Policy.cs @@ -71,7 +71,7 @@ public PostCommandTests() var producerRegistry = new ProducerRegistry(new Dictionary { { _routingKey, producer }, }); - var externalBus = new ExternalBusService( + var externalBus = new OutboxProducerMediator( producerRegistry: producerRegistry, policyRegistry: new DefaultPolicy(), mapperRegistry: messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store.cs index 7ec363969..dd4225253 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store.cs @@ -80,7 +80,7 @@ public CommandProcessorWithInMemoryOutboxTests() var tracer = new BrighterTracer(timeProvider); _outbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store_Async.cs index 74877fb72..a55dc080b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_With_An_In_Memory_Message_Store_Async.cs @@ -81,7 +81,7 @@ public CommandProcessorWithInMemoryOutboxAscyncTests() var policyRegistry = new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }; var producerRegistry = new ProducerRegistry(new Dictionary {{_routingKey, producer},}); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs index c1c6790be..ca3ec46ab 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs @@ -170,7 +170,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Deposit() var tracer = new BrighterTracer(timeProvider); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -220,7 +220,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Deposit_Async() var tracer = new BrighterTracer(timeProvider); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -270,7 +270,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Clear() var tracer = new BrighterTracer(timeProvider); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -324,7 +324,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Clear_Async() var tracer = new BrighterTracer(timeProvider); var fakeOutbox = new InMemoryOutbox(timeProvider); - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_No_Request_Context_Is_Provided.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_No_Request_Context_Is_Provided.cs index 670f298ac..9b8de7a37 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_No_Request_Context_Is_Provided.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_No_Request_Context_Is_Provided.cs @@ -141,7 +141,7 @@ public void When_No_Request_Context_Is_Provided_On_A_Deposit() var tracer = new BrighterTracer(); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -187,7 +187,7 @@ public async Task When_No_Request_Context_Is_Provided_On_A_Deposit_Async() var tracer = new BrighterTracer(); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -234,7 +234,7 @@ public void When_No_Request_Context_Is_Provided_On_A_Clear() var tracer = new BrighterTracer(); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, @@ -285,7 +285,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Clear_Async() var tracer = new BrighterTracer(); var fakeOutbox = new InMemoryOutbox(timeProvider) {Tracer = tracer}; - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, _policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox.cs b/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox.cs new file mode 100644 index 000000000..e704dc851 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox.cs @@ -0,0 +1,159 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Transactions; +using FluentAssertions; +using Microsoft.Extensions.Time.Testing; +using OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Polly; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.Observability.Archive; + +public class ExternalServiceBusArchiveObservabilityTests +{ + private readonly List _exportedActivities = new(); + private readonly OutboxProducerMediator _bus; + private readonly Publication _publication; + private readonly FakeTimeProvider _timeProvider; + private RoutingKey _routingKey = new("MyEvent"); + private readonly InMemoryOutbox _outbox; + private readonly TracerProvider _traceProvider; + private readonly OutboxArchiver _archiver; + private const double TOLERANCE = 0.000000001; + + public ExternalServiceBusArchiveObservabilityTests() + { + IAmABus internalBus = new InternalBus(); + _timeProvider = new FakeTimeProvider(); + var tracer = new BrighterTracer(_timeProvider); + + var builder = Sdk.CreateTracerProviderBuilder(); + + _traceProvider = builder + .AddSource("Paramore.Brighter.Tests", "Paramore.Brighter") + .ConfigureResource(r => r.AddService("in-memory-tracer")) + .AddInMemoryExporter(_exportedActivities) + .Build(); + + Brighter.CommandProcessor.ClearServiceBus(); + + _publication = new Publication + { + Source = new Uri("http://localhost"), + RequestType = typeof(MyEvent), + Topic = _routingKey, + Type = nameof(MyEvent), + }; + + var producer = new InMemoryProducer(internalBus, _timeProvider) + { + Publication = _publication + }; + + var producerRegistry = + new ProducerRegistry(new Dictionary { { _routingKey, producer } }); + + var retryPolicy = Policy + .Handle() + .Retry(); + + var policyRegistry = new PolicyRegistry { { Brighter.CommandProcessor.RETRYPOLICY, retryPolicy } }; + + var messageMapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory((_) => new MyEventMessageMapper()), + null); + messageMapperRegistry.Register(); + + _outbox = new InMemoryOutbox(_timeProvider) { Tracer = tracer }; + var archiveProvider = new InMemoryArchiveProvider(); + + _archiver = new OutboxArchiver(_outbox, archiveProvider, tracer: tracer); + + _bus = new OutboxProducerMediator( + producerRegistry, + policyRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + _outbox, + timeProvider:_timeProvider); + } + + [Fact] + public void When_archiving_from_the_outbox() + { + var parentActivity = new ActivitySource("Paramore.Brighter.Tests").StartActivity("BrighterTracerSpanTests"); + + var context = new RequestContext(); + context.Span = parentActivity; + + //add and clear message + var myEvent = new MyEvent(); + var myMessage = new MyEventMessageMapper().MapToMessage(myEvent, _publication); + _bus.AddToOutbox(myMessage, context); + _bus.ClearOutbox([myMessage.Id], context); + + //se should have an entry in the outbox + _outbox.EntryCount.Should().Be(1); + + //allow time to pass + _timeProvider.Advance(TimeSpan.FromSeconds(300)); + + //archive + var dispatchedSince = TimeSpan.FromSeconds(100); + _archiver.Archive(dispatchedSince, context); + + //should be no messages in the outbox + _outbox.EntryCount.Should().Be(0); + + parentActivity?.Stop(); + + _traceProvider.ForceFlush(); + + //We should have exported matching activities + _exportedActivities.Count.Should().Be(9); + + _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); + + //there should be a n archive create span for the batch + var createActivity = _exportedActivities.Single(a => a.DisplayName == $"{BrighterSemanticConventions.ArchiveMessages} {CommandProcessorSpanOperation.Archive.ToSpanName()}"); + createActivity.Should().NotBeNull(); + createActivity.ParentId.Should().Be(parentActivity?.Id); + + //check for outstanding messages span + var osCheckActivity = _exportedActivities.SingleOrDefault(a => + a.DisplayName == $"{OutboxDbOperation.DispatchedMessages.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + osCheckActivity.Should().NotBeNull(); + osCheckActivity?.ParentId.Should().Be(createActivity.Id); + + //check for delete messages span + var deleteActivity = _exportedActivities.SingleOrDefault(a => + a.DisplayName == $"{OutboxDbOperation.Delete.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + deleteActivity?.Should().NotBeNull(); + deleteActivity?.ParentId.Should().Be(createActivity.Id); + + //check the tags for the create span + createActivity.TagObjects.Should().Contain(t => t.Key == BrighterSemanticConventions.ArchiveAge && Math.Abs(Convert.ToDouble(t.Value) - dispatchedSince.TotalMilliseconds) < TOLERANCE); + + //check the tags for the outstanding messages span + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.DispatchedMessages.ToSpanName()).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); + + //check the tages for the delete messages span + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.Delete.ToSpanName()).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); + + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox_async.cs b/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox_async.cs new file mode 100644 index 000000000..a4caa315d --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/Observability/Archive/When_archiving_from_the_outbox_async.cs @@ -0,0 +1,161 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using System.Transactions; +using FluentAssertions; +using Microsoft.Extensions.Time.Testing; +using OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Polly; +using Polly.Registry; +using Xunit; +// ReSharper disable ExplicitCallerInfoArgument + +namespace Paramore.Brighter.Core.Tests.Observability.Archive; + +public class AsyncExternalServiceBusArchiveObservabilityTests +{ + private readonly List _exportedActivities = new(); + private readonly OutboxProducerMediator _bus; + private readonly Publication _publication; + private readonly FakeTimeProvider _timeProvider; + private readonly RoutingKey _routingKey = new("MyEvent"); + private readonly InMemoryOutbox _outbox; + private readonly TracerProvider _traceProvider; + private readonly OutboxArchiver _archiver; + private const double TOLERANCE = 0.000000001; + + public AsyncExternalServiceBusArchiveObservabilityTests() + { + IAmABus internalBus = new InternalBus(); + _timeProvider = new FakeTimeProvider(); + var tracer = new BrighterTracer(_timeProvider); + + var builder = Sdk.CreateTracerProviderBuilder(); + + _traceProvider = builder + .AddSource("Paramore.Brighter.Tests", "Paramore.Brighter") + .ConfigureResource(r => r.AddService("in-memory-tracer")) + .AddInMemoryExporter(_exportedActivities) + .Build(); + + Brighter.CommandProcessor.ClearServiceBus(); + + _publication = new Publication + { + Source = new Uri("http://localhost"), + RequestType = typeof(MyEvent), + Topic = _routingKey, + Type = nameof(MyEvent), + }; + + var producer = new InMemoryProducer(internalBus, _timeProvider) + { + Publication = _publication + }; + + var producerRegistry = + new ProducerRegistry(new Dictionary { { _routingKey, producer } }); + + var retryPolicy = Policy + .Handle() + .RetryAsync(); + + var policyRegistry = new PolicyRegistry { { Brighter.CommandProcessor.RETRYPOLICYASYNC, retryPolicy } }; + + var messageMapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory((_) => new MyEventMessageMapper()), + null); + messageMapperRegistry.Register(); + + _outbox = new InMemoryOutbox(_timeProvider) { Tracer = tracer }; + var archiveProvider = new InMemoryArchiveProvider(); + + _archiver = new OutboxArchiver(_outbox, archiveProvider, tracer: tracer); + + _bus = new OutboxProducerMediator( + producerRegistry, + policyRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + _outbox, + timeProvider:_timeProvider); + } + + [Fact] + public async Task When_archiving_from_the_outbox() + { + var parentActivity = new ActivitySource("Paramore.Brighter.Tests").StartActivity("BrighterTracerSpanTests"); + + var context = new RequestContext(); + context.Span = parentActivity; + + //add and clear message + var myEvent = new MyEvent(); + var myMessage = new MyEventMessageMapper().MapToMessage(myEvent, _publication); + await _bus.AddToOutboxAsync(myMessage, context); + await _bus.ClearOutboxAsync([myMessage.Id], context); + + //se should have an entry in the outbox + _outbox.EntryCount.Should().Be(1); + + //allow time to pass + _timeProvider.Advance(TimeSpan.FromSeconds(300)); + + //archive + var dispatchedSince = TimeSpan.FromSeconds(100); + await _archiver.ArchiveAsync(dispatchedSince, context); + + //should be no messages in the outbox + _outbox.EntryCount.Should().Be(0); + + parentActivity?.Stop(); + + _traceProvider.ForceFlush(); + + //We should have exported matching activities + _exportedActivities.Count.Should().Be(9); + + _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); + + //there should be a n archive create span for the batch + var createActivity = _exportedActivities.Single(a => a.DisplayName == $"{BrighterSemanticConventions.ArchiveMessages} {CommandProcessorSpanOperation.Archive.ToSpanName()}"); + createActivity.Should().NotBeNull(); + createActivity.ParentId.Should().Be(parentActivity?.Id); + + //check for outstanding messages span + var osCheckActivity = _exportedActivities.SingleOrDefault(a => + a.DisplayName == $"{OutboxDbOperation.DispatchedMessages.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + osCheckActivity.Should().NotBeNull(); + osCheckActivity?.ParentId.Should().Be(createActivity.Id); + + //check for delete messages span + var deleteActivity = _exportedActivities.SingleOrDefault(a => + a.DisplayName == $"{OutboxDbOperation.Delete.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + deleteActivity.Should().NotBeNull(); + deleteActivity?.ParentId.Should().Be(createActivity.Id); + + //check the tags for the create span + createActivity.TagObjects.Should().Contain(t => t.Key == BrighterSemanticConventions.ArchiveAge && Math.Abs(Convert.ToDouble(t.Value) - dispatchedSince.TotalMilliseconds) < TOLERANCE); + + //check the tags for the outstanding messages span + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.DispatchedMessages.ToSpanName()).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + osCheckActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); + + //check the tages for the delete messages span + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.Delete.ToSpanName()).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + deleteActivity?.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); + + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported.cs index b51f77260..3656ddae8 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported.cs @@ -77,7 +77,7 @@ public CommandProcessorClearObservabilityTests() {routingKey, _producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, @@ -121,7 +121,7 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() _traceProvider.ForceFlush(); //assert - _exportedActivities.Count.Should().Be(7); + _exportedActivities.Count.Should().Be(8); _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); //there should be a create span for the batch @@ -129,7 +129,6 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() createActivity.Should().NotBeNull(); createActivity.ParentId.Should().Be(parentActivity?.Id); createActivity.Tags.Any(t => t is { Key: BrighterSemanticConventions.Operation, Value: "clear" }).Should().BeTrue(); - //there should be a clear span for each message id var clearActivity = _exportedActivities.Single(a => a.DisplayName == $"{BrighterSemanticConventions.ClearMessages} {CommandProcessorSpanOperation.Clear.ToSpanName()}"); @@ -143,14 +142,14 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() var message = _internalBus.Stream(new RoutingKey("MyEvent")).Single(); var depositEvent = events.Single(e => e.Name == OutboxDbOperation.Get.ToSpanName()); depositEvent.Tags.Any(a => a.Value != null && a.Key == BrighterSemanticConventions.OutboxSharedTransaction && (bool)a.Value == false).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.OutboxType && (string)a.Value == "sync" ).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageId && (string)a.Value == message.Id ).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestination && (RoutingKey)a.Value == message.Header.Topic).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.OutboxType && a.Value as string == "sync" ).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageId && a.Value as string == message.Id ).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestination && a.Value?.ToString() == message.Header.Topic.Value).Should().BeTrue(); depositEvent.Tags.Any(a => a is { Value: not null, Key: BrighterSemanticConventions.MessageBodySize } && (int)a.Value == message.Body.Bytes.Length).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageBody && (string)a.Value == message.Body.Value).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageType && (string)a.Value == message.Header.MessageType.ToString()).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && (string)a.Value == message.Header.PartitionKey).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageHeaders && (string)a.Value == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageBody && a.Value as string == message.Body.Value).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageType && a.Value as string == message.Header.MessageType.ToString()).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && a.Value as string == message.Header.PartitionKey).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageHeaders && a.Value as string == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); //there should be a span in the Db for retrieving the message var outBoxActivity = _exportedActivities.Single(a => a.DisplayName == $"{OutboxDbOperation.Get.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); @@ -164,15 +163,15 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() producerActivity.ParentId.Should().Be(clearActivity.Id); producerActivity.Kind.Should().Be(ActivityKind.Producer); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && (string)t.Value == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageId && (string)t.Value == message.Id).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageType && (string)t.Value == message.Header.MessageType.ToString()).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && t.Value as string == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageId && t.Value as string == message.Id).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageType && t.Value as string == message.Header.MessageType.ToString()).Should().BeTrue(); producerActivity.TagObjects.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessagingDestination } && t.Value.ToString() == "MyEvent").Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && (string)t.Value == message.Header.PartitionKey).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && t.Value as string == message.Header.PartitionKey).Should().BeTrue(); producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageHeaders && (string)t.Value == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); producerActivity.TagObjects.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessageBodySize } && (int)t.Value == message.Body.Bytes.Length).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageBody && (string)t.Value == message.Body.Value).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.ConversationId && (string)t.Value == message.Header.CorrelationId).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageBody && t.Value as string == message.Body.Value).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.ConversationId && t.Value as string == message.Header.CorrelationId).Should().BeTrue(); producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeMessageId && (string)t.Value == message.Id).Should().BeTrue(); producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeSource && (Uri)t.Value == _producer.Publication.Source).Should().BeTrue(); @@ -199,6 +198,11 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeSubject && (string)t.Value == _producer.Publication.Subject).Should().BeTrue(); produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeType && (string)t.Value == _producer.Publication.Type).Should().BeTrue(); - //TODO: There should be a span event to mark as dispatched + //There should be a span event to mark as dispatched + var markAsDispatchedActivity = _exportedActivities.Single(a => a.DisplayName == $"{OutboxDbOperation.MarkDispatched.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.MarkDispatched.ToSpanName()).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); } } diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported_Async.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported_Async.cs index af5f758fd..fced18f4a 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_A_Message_A_Span_Is_Exported_Async.cs @@ -77,7 +77,7 @@ public AsyncCommandProcessorClearObservabilityTests() {_topic, _producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, @@ -121,7 +121,7 @@ public async Task When_Clearing_A_Message_A_Span_Is_Exported() _traceProvider.ForceFlush(); //assert - _exportedActivities.Count.Should().Be(7); + _exportedActivities.Count.Should().Be(8); _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); //there should be a create span for the batch @@ -143,14 +143,14 @@ public async Task When_Clearing_A_Message_A_Span_Is_Exported() var message = _internalBus.Stream(new RoutingKey(_topic)).Single(); var depositEvent = events.Single(e => e.Name == OutboxDbOperation.Get.ToSpanName()); depositEvent.Tags.Any(a => a.Value != null && a.Key == BrighterSemanticConventions.OutboxSharedTransaction && (bool)a.Value == false).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.OutboxType && (string)a.Value == "async" ).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageId && (string)a.Value == message.Id ).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestination && (RoutingKey)a.Value == message.Header.Topic).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.OutboxType && a.Value as string == "async" ).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageId && a.Value as string == message.Id ).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestination && a.Value?.ToString() == message.Header.Topic.ToString()).Should().BeTrue(); depositEvent.Tags.Any(a => a is { Value: not null, Key: BrighterSemanticConventions.MessageBodySize } && (int)a.Value == message.Body.Bytes.Length).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageBody && (string)a.Value == message.Body.Value).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageType && (string)a.Value == message.Header.MessageType.ToString()).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && (string)a.Value == message.Header.PartitionKey).Should().BeTrue(); - depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageHeaders && (string)a.Value == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageBody && a.Value as string == message.Body.Value).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageType && a.Value as string == message.Header.MessageType.ToString()).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && a.Value as string == message.Header.PartitionKey).Should().BeTrue(); + depositEvent.Tags.Any(a => a.Key == BrighterSemanticConventions.MessageHeaders && a.Value as string == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); //there should be a span in the Db for retrieving the message var outBoxActivity = _exportedActivities.Single(a => a.DisplayName == $"{OutboxDbOperation.Get.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); @@ -164,39 +164,47 @@ public async Task When_Clearing_A_Message_A_Span_Is_Exported() producerActivity.ParentId.Should().Be(clearActivity.Id); producerActivity.Kind.Should().Be(ActivityKind.Producer); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && (string)t.Value == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageId && (string)t.Value == message.Id).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageType && (string)t.Value == message.Header.MessageType.ToString()).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessagingDestination } && t.Value.ToString() == _topic).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && (string)t.Value == message.Header.PartitionKey).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageHeaders && (string)t.Value == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && t.Value as string == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageId && t.Value as string == message.Id).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageType && t.Value as string == message.Header.MessageType.ToString()).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessagingDestination } && t.Value.ToString() == _topic.Value.ToString()).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && t.Value as string == message.Header.PartitionKey).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageHeaders && t.Value as string == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); producerActivity.TagObjects.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessageBodySize } && (int)t.Value == message.Body.Bytes.Length).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageBody && (string)t.Value == message.Body.Value).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.ConversationId && (string)t.Value == message.Header.CorrelationId).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.MessageBody && t.Value as string == message.Body.Value).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.ConversationId && t.Value as string == message.Header.CorrelationId).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeMessageId && (string)t.Value == message.Id).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeSource && (Uri)t.Value == _producer.Publication.Source).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeVersion && (string)t.Value == "1.0").Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeSubject && (string)t.Value == _producer.Publication.Subject).Should().BeTrue(); - producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeType && (string)t.Value == _producer.Publication.Type).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeMessageId && t.Value as string == message.Id).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeSource && t.Value as Uri == _producer.Publication.Source).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeVersion && t.Value as string == "1.0").Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeSubject && t.Value as string == _producer.Publication.Subject).Should().BeTrue(); + producerActivity.TagObjects.Any(t => t.Key == BrighterSemanticConventions.CeType && t.Value as string == _producer.Publication.Type).Should().BeTrue(); //there should be an event in the producer for producing the message var produceEvent = producerActivity.Events.Single(e => e.Name ==$"{_topic} {CommandProcessorSpanOperation.Publish.ToSpanName()}"); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && (string)t.Value == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingSystem && (string)t.Value == MessagingSystem.InternalBus.ToMessagingSystemName()).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingDestination && (RoutingKey)t.Value == _topic).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && (string)t.Value == message.Header.PartitionKey).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageId && (string)t.Value == message.Id).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageType && (string)t.Value == message.Header.MessageType.ToString()).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageHeaders && (string)t.Value == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingOperationType && t.Value as string == CommandProcessorSpanOperation.Publish.ToSpanName()).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingSystem && t.Value as string == MessagingSystem.InternalBus.ToMessagingSystemName()).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingDestination && t.Value?.ToString() == _topic.Value.ToString()).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessagingDestinationPartitionId && t.Value as string == message.Header.PartitionKey).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageId && t.Value as string == message.Id).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageType && t.Value as string == message.Header.MessageType.ToString()).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageHeaders && t.Value as string == JsonSerializer.Serialize(message.Header)).Should().BeTrue(); produceEvent.Tags.Any(t => t is { Value: not null, Key: BrighterSemanticConventions.MessageBodySize } && (int)t.Value == message.Body.Bytes.Length).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageBody && (string)t.Value == message.Body.Value).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.ConversationId && (string)t.Value == message.Header.CorrelationId).Should().BeTrue(); - - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeMessageId && (string)t.Value == message.Id).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeSource && (Uri)t.Value == _producer.Publication.Source).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeVersion && (string)t.Value == "1.0").Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeSubject && (string)t.Value == _producer.Publication.Subject).Should().BeTrue(); - produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeType && (string)t.Value == _producer.Publication.Type).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.MessageBody && t.Value as string == message.Body.Value).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.ConversationId && t.Value as string == message.Header.CorrelationId).Should().BeTrue(); + + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeMessageId && t.Value as string == message.Id).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeSource && t.Value as Uri == _producer.Publication.Source).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeVersion && t.Value as string == "1.0").Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeSubject && t.Value as string == _producer.Publication.Subject).Should().BeTrue(); + produceEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.CeType && t.Value as string == _producer.Publication.Type).Should().BeTrue(); + + //There should be a span event to mark as dispatched + var markAsDispatchedActivity = _exportedActivities.Single(a => a.DisplayName == $"{OutboxDbOperation.MarkDispatched.ToSpanName()} {InMemoryAttributes.DbName} {InMemoryAttributes.DbTable}"); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbOperation && t.Value == OutboxDbOperation.MarkDispatched.ToSpanName()).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbTable && t.Value == InMemoryAttributes.DbTable).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbSystem && t.Value == DbSystem.Brighter.ToDbName()).Should().BeTrue(); + markAsDispatchedActivity.Tags.Any(t => t.Key == BrighterSemanticConventions.DbName && t.Value == InMemoryAttributes.DbName).Should().BeTrue(); + } } diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multipile_Messages_Spans_Are_Exported_Async.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multipile_Messages_Spans_Are_Exported_Async.cs index fcc33ca41..82f41ccef 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multipile_Messages_Spans_Are_Exported_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multipile_Messages_Spans_Are_Exported_Async.cs @@ -80,7 +80,7 @@ public AsyncCommandProcessorMultipleClearObservabilityTests() {routingKey, producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, @@ -126,7 +126,7 @@ public async Task When_Clearing_A_Message_A_Span_Is_Exported() _traceProvider.ForceFlush(); //assert - _exportedActivities.Count.Should().Be(18); + _exportedActivities.Count.Should().Be(21); _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); //there should be a create span for the batch diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multiple_Messages_Spans_Are_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multiple_Messages_Spans_Are_Exported.cs index d9d0c94d9..756c6316d 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multiple_Messages_Spans_Are_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Multiple_Messages_Spans_Are_Exported.cs @@ -78,7 +78,7 @@ public CommandProcessorMultipleClearObservabilityTests() {routingKey, producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, @@ -125,7 +125,7 @@ public void When_Clearing_A_Message_A_Span_Is_Exported() _traceProvider.ForceFlush(); //assert - _exportedActivities.Count.Should().Be(18); + _exportedActivities.Count.Should().Be(21); _exportedActivities.Any(a => a.Source.Name == "Paramore.Brighter").Should().BeTrue(); //there should be a create span for the batch diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported.cs index 062ecde4b..bae7b808b 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported.cs @@ -80,7 +80,7 @@ public CommandProcessorClearOutstandingObservabilityTests() {routingKey, producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported_Bulk.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported_Bulk.cs index 51dc6ffc8..3d36f9b05 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported_Bulk.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Clear/When_Clearing_Outstanding_Messages_Spans_Are_Exported_Bulk.cs @@ -79,7 +79,7 @@ public AsyncCommandProcessorBulkClearOutstandingObservabilityTests() {routingKey, producer} }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported.cs index 03f013d2b..1ad5671da 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported.cs @@ -71,7 +71,7 @@ public CommandProcessorDepositObservabilityTests() } }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported_Async.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported_Async.cs index 5205ef266..d501b91e8 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_A_Request_A_Span_Is_Exported_Async.cs @@ -73,7 +73,7 @@ public AsyncCommandProcessorDepositObservabilityTests() } }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported.cs index a610a16e2..4e09ac05b 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported.cs @@ -70,7 +70,7 @@ public CommandProcessorMultipleDepositObservabilityTests() } }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported_Async.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported_Async.cs index 9e9755679..84fec391b 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Deposit/When_Depositing_Multiple_Requests_Spans_Are_Exported_Async.cs @@ -71,7 +71,7 @@ public AsyncCommandProcessorMultipleDepositObservabilityTests() } }); - IAmAnExternalBusService bus = new ExternalBusService( + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( producerRegistry, policyRegistry, messageMapperRegistry, diff --git a/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs b/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs index dfb363659..9d684d168 100644 --- a/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs +++ b/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs @@ -52,7 +52,7 @@ public async Task When_outstanding_in_outbox_sweep_clears_them() mapperRegistry.Register(); - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, new DefaultPolicy(), mapperRegistry, @@ -128,7 +128,7 @@ public async Task When_outstanding_in_outbox_sweep_clears_them_async() mapperRegistry.Register(); - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, new DefaultPolicy(), mapperRegistry, @@ -203,7 +203,7 @@ public async Task When_too_new_to_sweep_leaves_them() mapperRegistry.Register(); - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, new DefaultPolicy(), mapperRegistry, @@ -287,7 +287,7 @@ public async Task When_too_new_to_sweep_leaves_them_async() mapperRegistry.Register(); - var bus = new ExternalBusService( + var bus = new OutboxProducerMediator( producerRegistry, new DefaultPolicy(), mapperRegistry,