Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archiver Observability #3353

Merged
merged 20 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8461845
feat: add an archive test without observablity, to ensure test passes…
iancooper Oct 9, 2024
358d03b
chore: switch branch; commit to save wip
iancooper Oct 11, 2024
07eb800
chore: Merge branch 'master' into arch_obser
iancooper Oct 11, 2024
1f3dcca
chore: Merge branch 'master' into arch_obser
iancooper Oct 11, 2024
9b4acbc
chore: switch to another device
iancooper Oct 11, 2024
1fa1ee6
feat: we need a create span for the archive batch, if issued through …
iancooper Oct 15, 2024
c3e900b
fix: missing mark dispatched in clear otel checks
iancooper Oct 27, 2024
b327a98
feat: add tagobjects for archive batch
iancooper Oct 27, 2024
a72b352
feat: opentelemetry for the archive operation
iancooper Oct 27, 2024
9ddedf7
feat: add async archive operation opentelemetry
iancooper Oct 27, 2024
5a60cc8
feat: cleanup old archiver telemetry
iancooper Oct 27, 2024
4de9e22
feat: add archive async telemetry
iancooper Oct 27, 2024
5e5813b
Merge branch 'master' into arch_obser
iancooper Oct 27, 2024
68d77b9
feat: move archive functionality to OutboxArchiver.cs
iancooper Oct 28, 2024
b20e357
feat: rename of ExternalBusService.cs to OutboxProducerMediator.cs
iancooper Oct 28, 2024
c824258
fix: name the mediator correctly
iancooper Oct 28, 2024
513052d
fix: issue with PackageReference not PackageVersion
iancooper Oct 28, 2024
4a481b3
chore: Merge branch 'master' into arch_obser
iancooper Oct 28, 2024
8890035
fix: failing reflection based construction of OutBoxProducerMediator …
iancooper Oct 28, 2024
8477f51
Merge branch 'master' into arch_obser
iancooper Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/adr/0021-move-archive-from-esb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# 20. Move Archive Methods from External Service Bus to Outbox Archiver to Reduce Complexity

Date: 2024-10-28

## Status

Adopted

## Context

The `ExternalServiceBus` class is a mediator between producer and outbox. It suffers from complexity, see ADR 0020.

The `ExternalServiceBus` class has a number of methods that are related to archiving messages. These methods are:
- Archive
- ArchiveAsync

The OutboxArchiver and the TimedOuboxArchiver are the only callers of these ExternalBus Archive methods. The TimedOutboxArchiver provides both a timer that fires the archiver at a given periodic interval, and uses the global distributed lock to ensure only one archiver runs.

## Decision

Whilst the `Archive` methods were not called out by CodeScene analysis, they do add to the overall set of responsibilities of `ExternalServiceBus`. As they have different reasons to change to `ExternalServiceBus` they should be moved to a separate class.

We will move the implementation from `ExternalServiceBus` into `OutboxArchiver`. We will call `OutboxArchiver` from `TimedOutboxArchiver`.

## Consequences

One, we have moved these functions, it makes sense to rename the `ExternalServiceBus` class to 'OutboxProducerMediator' as this better describes its role within our codebase.





Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ THE SOFTWARE. */
using Paramore.Brighter.FeatureSwitch;
using Paramore.Brighter.Logging;
using System.Text.Json;
using System.Transactions;
using Paramore.Brighter.DynamoDb;
using Paramore.Brighter.Observability;
using Polly.Registry;
Expand Down Expand Up @@ -94,7 +93,7 @@ public static IBrighterBuilder BrighterHandlerBuilder(IServiceCollection service
var mapperRegistry = new ServiceCollectionMessageMapperRegistry(services, options.MapperLifetime);
services.TryAddSingleton(mapperRegistry);

services.TryAddSingleton<IAmARequestContextFactory>(options.RequestContextFactory);
services.TryAddSingleton(options.RequestContextFactory);

if (options.FeatureSwitchRegistry != null)
services.TryAddSingleton(options.FeatureSwitchRegistry);
Expand Down Expand Up @@ -136,9 +135,6 @@ public static IBrighterBuilder BrighterHandlerBuilder(IServiceCollection service
/// </summary>
/// <param name="brighterBuilder">The Brighter builder to add this option to</param>
/// <param name="configure">A callback that allows you to configure <see cref="ExternalBusConfiguration"/> options</param>
/// <param name="transactionProvider">The transaction provider for the outbox, can be null for in-memory default
/// of <see cref="CommittableTransactionProvider"/> which you must set the generic type to <see cref="CommittableTransaction"/> for
/// </param>
/// <param name="serviceLifetime">The lifetime of the transaction provider</param>
/// <returns>The Brighter builder to allow chaining of requests</returns>
public static IBrighterBuilder UseExternalBus(
Expand Down Expand Up @@ -222,8 +218,8 @@ public static IBrighterBuilder UseExternalBus(

brighterBuilder.Services.TryAddSingleton<IAmExternalBusConfiguration>(busConfiguration);

brighterBuilder.Services.TryAdd(new ServiceDescriptor(typeof(IAmAnExternalBusService),
(serviceProvider) => BuildExternalBus(
brighterBuilder.Services.TryAdd(new ServiceDescriptor(typeof(IAmAnOutboxProducerMediator),
(serviceProvider) => BuildOutBoxProducerMediator(
serviceProvider, transactionType, busConfiguration, brighterBuilder.PolicyRegistry, outbox
),
ServiceLifetime.Singleton));
Expand All @@ -237,7 +233,7 @@ private static INeedInstrumentation AddEventBus(
INeedMessaging messagingBuilder,
IUseRpc useRequestResponse)
{
var eventBus = provider.GetService<IAmAnExternalBusService>();
var eventBus = provider.GetService<IAmAnOutboxProducerMediator>();
var eventBusConfiguration = provider.GetService<IAmExternalBusConfiguration>();
var serviceActivatorOptions = provider.GetService<IServiceActivatorOptions>();

Expand Down Expand Up @@ -325,17 +321,17 @@ private static object BuildCommandProcessor(IServiceProvider provider)
return commandProcessor;
}

private static IAmAnExternalBusService BuildExternalBus(IServiceProvider serviceProvider,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ No longer an issue: Excess Number of Function Arguments
BuildExternalBus is no longer above the threshold for number of arguments

private static IAmAnOutboxProducerMediator BuildOutBoxProducerMediator(IServiceProvider serviceProvider,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Excess Number of Function Arguments
BuildOutBoxProducerMediator has 5 arguments, threshold = 4

Type transactionType,
ExternalBusConfiguration busConfiguration,
IPolicyRegistry<string> policyRegistry,
IAmAnOutbox outbox)
{
//Because the bus has specialized types as members, we need to create the bus type dynamically
//again to prevent someone configuring Brighter from having to pass generic types
var busType = typeof(ExternalBusService<,>).MakeGenericType(typeof(Message), transactionType);
var busType = typeof(OutboxProducerMediator<,>).MakeGenericType(typeof(Message), transactionType);

return (IAmAnExternalBusService)Activator.CreateInstance(
return (IAmAnOutboxProducerMediator)Activator.CreateInstance(
busType,
busConfiguration.ProducerRegistry,
policyRegistry,
Expand All @@ -344,13 +340,11 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service
TransformFactoryAsync(serviceProvider),
Tracer(serviceProvider),
outbox,
busConfiguration.ArchiveProvider,
RequestContextFactory(serviceProvider),
busConfiguration.OutboxTimeout,
busConfiguration.MaxOutStandingMessages,
busConfiguration.MaxOutStandingCheckInterval,
busConfiguration.OutBoxBag,
busConfiguration.ArchiveBatchSize,
TimeProvider.System,
busConfiguration.InstrumentationOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static IBrighterBuilder UseOutboxArchiver<TTransaction>(this IBrighterBui
brighterBuilder.Services.TryAddSingleton<TimedOutboxArchiverOptions>(options);
brighterBuilder.Services.AddSingleton<IAmAnArchiveProvider>(archiveProvider);

brighterBuilder.Services.AddHostedService<TimedOutboxArchiver>();
brighterBuilder.Services.AddHostedService<TimedOutboxArchiver<Message, TTransaction>>();

return brighterBuilder;
}
Expand Down
49 changes: 30 additions & 19 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,43 @@ THE SOFTWARE. */
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
// ReSharper disable StaticMemberInGenericType

namespace Paramore.Brighter.Extensions.Hosting
{

/// <summary>
/// The archiver will find messages in the outbox that are older than a certain age and archive them
/// </summary>
/// <param name="serviceScopeFactory">Needed to create a scope within which to create a <see cref="CommandProcessor"/></param>
/// <param name="distributedLock">Used to ensure that only one instance of the <see cref="TimedOutboxSweeper"/> is running</param>
/// <param name="options">The <see cref="TimedOutboxArchiverOptions"/> that control how the archiver runs, such as interval</param>
public class TimedOutboxArchiver(
IServiceScopeFactory serviceScopeFactory,
IDistributedLock distributedLock,
TimedOutboxArchiverOptions options
)
: IHostedService, IDisposable
public class TimedOutboxArchiver<TMessage, TTransaction> : IHostedService, IDisposable where TMessage : Message
{
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private Timer _timer;
private readonly OutboxArchiver<TMessage, TTransaction> _archiver;
private readonly TimeSpan _dispatchedSince;
private readonly IDistributedLock _distributedLock;
private readonly TimedOutboxArchiverOptions _options;

/// <summary>
/// The archiver will find messages in the outbox that are older than a certain age and archive them
/// </summary>
/// <param name="archiver">The archiver to use</param>
/// <param name="dispatchedSince">How old should a message be, in order to archive it?</param>
/// <param name="distributedLock">Used to ensure that only one instance of the <see cref="TimedOutboxSweeper"/> is running</param>
/// <param name="options">The <see cref="TimedOutboxArchiverOptions"/> that control how the archiver runs, such as interval</param>
public TimedOutboxArchiver(
OutboxArchiver<TMessage, TTransaction> archiver,
TimeSpan dispatchedSince,
IDistributedLock distributedLock,
TimedOutboxArchiverOptions options)
{
_archiver = archiver;
_dispatchedSince = dispatchedSince;
_distributedLock = distributedLock;
_options = options;
}

private const string LockingResourceName = "Archiver";

Expand All @@ -60,8 +74,8 @@ public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting");

_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero,
TimeSpan.FromSeconds(options.TimerInterval));
_timer = new Timer(async e => await Archive(e, cancellationToken), null, TimeSpan.Zero,
TimeSpan.FromSeconds(_options.TimerInterval));

return Task.CompletedTask;
}
Expand Down Expand Up @@ -90,24 +104,21 @@ public void Dispose()

private async Task Archive(object state, CancellationToken cancellationToken)
{
var lockId = await distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken);
var lockId = await _distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken);
if (lockId != null)
{
var scope = serviceScopeFactory.CreateScope();
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
try
{
IAmAnExternalBusService externalBusService = scope.ServiceProvider.GetService<IAmAnExternalBusService>();

await externalBusService.ArchiveAsync(options.MinimumAge, new RequestContext(), cancellationToken);
await _archiver.ArchiveAsync(_dispatchedSince, new RequestContext(), cancellationToken);
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox");
}
finally
{
await distributedLock.ReleaseLockAsync(LockingResourceName, lockId, cancellationToken);
await _distributedLock.ReleaseLockAsync(LockingResourceName, lockId, cancellationToken);
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Dispatcher Build(string hostName)

var outbox = new SinkOutboxSync();

var externalBus = new ExternalBusService<Message, CommittableTransaction>(
var mediator = new OutboxProducerMediator<Message, CommittableTransaction>(
producerRegistry: producerRegistry,
policyRegistry: new DefaultPolicy(),
mapperRegistry: outgoingMessageMapperRegistry,
Expand All @@ -175,7 +175,7 @@ public Dispatcher Build(string hostName)
commandProcessor = CommandProcessorBuilder.StartNew()
.Handlers(new HandlerConfiguration(subscriberRegistry, new ControlBusHandlerFactorySync(_dispatcher, () => commandProcessor)))
.Policies(policyRegistry)
.ExternalBus(ExternalBusType.FireAndForget, externalBus)
.ExternalBus(ExternalBusType.FireAndForget, mediator)
.ConfigureInstrumentation(null, InstrumentationOptions.None)
.RequestContextFactory(new InMemoryRequestContextFactory())
.Build();
Expand Down
Loading
Loading