Skip to content
This repository has been archived by the owner on Dec 11, 2024. It is now read-only.

Commit

Permalink
Feature/correlations (#100)
Browse files Browse the repository at this point in the history
* new Correlation type with unique id per scope
* correlation id gets automatically published in integration events
* optional correlation for events outside of scopes (tenant creation)
  • Loading branch information
marcwittke authored Jul 29, 2020
1 parent 6733a8e commit 3f600c5
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected BackendFxApplication(ICompositionRoot compositionRoot, ITenantIdServic
return _isBooted.Wait(timeoutMilliSeconds, cancellationToken);
}

public IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = null)
public IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = null, Guid? correlationId = null)
{
var scopeIndex = _scopeIndex++;
tenantId = tenantId ?? new TenantId(null);
Expand All @@ -69,6 +69,10 @@ public IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = nul
var scope = CompositionRoot.BeginScope();
CompositionRoot.GetInstance<ICurrentTHolder<TenantId>>().ReplaceCurrent(tenantId);
CompositionRoot.GetInstance<ICurrentTHolder<IIdentity>>().ReplaceCurrent(identity);
if (correlationId.HasValue)
{
CompositionRoot.GetInstance<ICurrentTHolder<Correlation>>().Current.Resume(correlationId.Value);
}

return new MultipleDisposable(scope, scopeDurationLogger);
}
Expand All @@ -87,9 +91,9 @@ public void Run<TJob>(TenantId tenantId) where TJob : class, IJob
Invoke(() => CompositionRoot.GetInstance<TJob>().Run(), new SystemIdentity(), tenantId);
}

public void Invoke(Action action, IIdentity identity, TenantId tenantId)
public void Invoke(Action action, IIdentity identity, TenantId tenantId, Guid? correlationId = null)
{
using (BeginScope(new SystemIdentity(), tenantId))
using (BeginScope(identity, tenantId, correlationId))
{
var unitOfWork = CompositionRoot.GetInstance<IUnitOfWork>();
try
Expand All @@ -110,9 +114,9 @@ public void Invoke(Action action, IIdentity identity, TenantId tenantId)
}
}

public async Task InvokeAsync(Func<Task> awaitableAsyncAction, IIdentity identity, TenantId tenantId)
public async Task InvokeAsync(Func<Task> awaitableAsyncAction, IIdentity identity, TenantId tenantId, Guid? correlationId = null)
{
using (BeginScope(new SystemIdentity(), tenantId))
using (BeginScope(identity, tenantId, correlationId))
{
var unitOfWork = CompositionRoot.GetInstance<IUnitOfWork>();
try
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using Backend.Fx.Logging;

namespace Backend.Fx.Patterns.DependencyInjection
{
public class Correlation
{
private static readonly ILogger Logger = LogManager.Create<Correlation>();

public Guid Id { get; private set; } = Guid.NewGuid();

public void Resume(Guid correlationId)
{
Logger.Info($"Resuming correlation {correlationId}");
Id = correlationId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Backend.Fx.Patterns.DependencyInjection
{
public class CurrentCorrelationHolder : CurrentTHolder<Correlation>
{
public override Correlation ProvideInstance()
{
return new Correlation();
}

protected override string Describe(Correlation instance)
{
if (instance == null)
{
return "<NULL>";
}

return $"Correlation: {instance.Id}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public interface IBackendFxApplication : IDisposable
/// <returns></returns>
Task Boot(CancellationToken cancellationToken = default(CancellationToken));

IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = null);
IDisposable BeginScope(IIdentity identity = null, TenantId tenantId = null, Guid? correlationId = null);

void Invoke(Action action, IIdentity identity, TenantId tenantId);
void Invoke(Action action, IIdentity identity, TenantId tenantId, Guid? correlationId = null);

Task InvokeAsync(Func<Task> awaitableAsyncAction, IIdentity identity, TenantId tenantId);
Task InvokeAsync(Func<Task> awaitableAsyncAction, IIdentity identity, TenantId tenantId, Guid? correlationId = null);

void Run<TJob>() where TJob : class, IJob;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,12 @@ public interface ICompositionRoot : IDisposable, IDomainEventHandlerProvider
IEnumerable<T> GetInstances<T>() where T : class;

IDisposable BeginScope();

/// <summary>
/// Gets the current correlation, when inside a scope, otherwise this method will return false
/// </summary>
/// <param name="correlation"></param>
/// <returns></returns>
bool TryGetCurrentCorrelation(out Correlation correlation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,18 @@ protected EventBus(IBackendFxApplication application)
}

public abstract void Connect();
public abstract Task Publish(IIntegrationEvent integrationEvent);

public Task Publish(IIntegrationEvent integrationEvent)
{
if (_application.CompositionRoot.TryGetCurrentCorrelation(out Correlation correlation))
{
((IntegrationEvent)integrationEvent).SetCorrelation(correlation);
}

return PublishOnEventBus(integrationEvent);
}

protected abstract Task PublishOnEventBus(IIntegrationEvent integrationEvent);


/// <inheritdoc />
Expand Down Expand Up @@ -128,7 +139,8 @@ protected virtual void Process(string eventName, EventProcessingContext context)
_application.Invoke(
() => subscription.Process(eventName, context),
new SystemIdentity(),
context.TenantId);
context.TenantId,
context.CorrelationId);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public abstract class EventProcessingContext
{
public abstract TenantId TenantId { get; }
public abstract dynamic DynamicEvent { get; }
public abstract Guid CorrelationId { get; }

public abstract IIntegrationEvent GetTypedEvent(Type eventType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public InMemoryEventBus(IBackendFxApplication application)
public override void Connect()
{ }

public override Task Publish(IIntegrationEvent integrationEvent)
protected override Task PublishOnEventBus(IIntegrationEvent integrationEvent)
{
Task.Run(() => Process(integrationEvent.GetType().FullName, new InMemoryProcessingContext(integrationEvent)));

Expand All @@ -42,6 +42,7 @@ public InMemoryProcessingContext(IIntegrationEvent integrationEvent)
public override TenantId TenantId => new TenantId(_integrationEvent.TenantId);

public override dynamic DynamicEvent => _integrationEvent;
public override Guid CorrelationId => _integrationEvent.CorrelationId;

public override IIntegrationEvent GetTypedEvent(Type eventType)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
namespace Backend.Fx.Patterns.EventAggregation.Integration
{
using System;
using System;
using Backend.Fx.Patterns.DependencyInjection;

namespace Backend.Fx.Patterns.EventAggregation.Integration
{
public interface IIntegrationEvent
{
Guid Id { get; }
DateTime CreationDate { get; }
int TenantId { get; }
Guid CorrelationId { get; }
}

/// <summary>
/// Events that should be handled in a separate context. Might be persisted as well using an external event bus.
/// See https://blogs.msdn.microsoft.com/cesardelatorre/2017/02/07/domain-events-vs-integration-events-in-domain-driven-design-and-microservices-architectures/
/// </summary>
public class IntegrationEvent : IIntegrationEvent
public abstract class IntegrationEvent : IIntegrationEvent
{
public Guid Id { get; } = Guid.NewGuid();

public DateTime CreationDate { get; } = DateTime.UtcNow;

public int TenantId { get; }

public Guid CorrelationId { get; private set; }

internal void SetCorrelation(Correlation correlation)
{
CorrelationId = correlation.Id;
}

public IntegrationEvent(int tenantId)
protected IntegrationEvent(int tenantId)
{
TenantId = tenantId;
}
Expand Down
6 changes: 4 additions & 2 deletions src/implementations/Backend.Fx.RabbitMq/RabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private void ChannelOnMessageReceived(object sender, BasicDeliverEventArgs args)
Process(args.RoutingKey, new RabbitMqEventProcessingContext(args.Body));
}

public override Task Publish(IIntegrationEvent integrationEvent)
protected override Task PublishOnEventBus(IIntegrationEvent integrationEvent)
{
Logger.Info($"Publishing {integrationEvent.GetType().Name}");
_channel.EnsureOpen();
Expand Down Expand Up @@ -94,13 +94,15 @@ public RabbitMqEventProcessingContext(object rawReceivedMessage)
}

_jsonString = Encoding.UTF8.GetString(rawEventPayloadBytes);
var eventStub = JsonConvert.DeserializeAnonymousType(_jsonString, new {tenantId = 0});
var eventStub = JsonConvert.DeserializeAnonymousType(_jsonString, new {tenantId = 0, correlationId = Guid.Empty} );
TenantId = new TenantId(eventStub.tenantId);
CorrelationId = eventStub.correlationId;
}

public override TenantId TenantId { get; }

public override dynamic DynamicEvent => JObject.Parse(_jsonString);
public override Guid CorrelationId { get; }

public override IIntegrationEvent GetTypedEvent(Type eventType)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public SimpleInjectorCompositionRoot(ILifestyleSelectionBehavior lifestyleBehavi
ScopedLifestyle = scopedLifestyle;
Container.Options.LifestyleSelectionBehavior = lifestyleBehavior;
Container.Options.DefaultScopedLifestyle = ScopedLifestyle;
Container.Register<ICurrentTHolder<Correlation>, CurrentCorrelationHolder>();
}

public Container Container { get; } = new Container();
Expand Down Expand Up @@ -89,6 +90,19 @@ public IDisposable BeginScope()
return scope;
}

public bool TryGetCurrentCorrelation(out Correlation correlation)
{
Scope scope = ScopedLifestyle.GetCurrentScope(Container);
if (scope == null)
{
correlation = null;
return false;
}

correlation = scope.GetInstance<ICurrentTHolder<Correlation>>().Current;
return true;
}

public Scope GetCurrentScope()
{
return ScopedLifestyle.GetCurrentScope(Container);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Linq;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Security.Principal;
using System.Threading.Tasks;
using Backend.Fx.BuildingBlocks;
Expand Down Expand Up @@ -27,7 +29,7 @@ public TheBackendFxApplication()
[Fact]
public async Task RunsProdDataGeneratorsOnEveryBoot()
{
using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
sut.EnsureProdTenant();
Expand All @@ -41,7 +43,7 @@ public async Task RunsProdDataGeneratorsOnEveryBoot()
}
}

using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
sut.EnsureProdTenant();
Expand All @@ -59,7 +61,7 @@ public async Task RunsProdDataGeneratorsOnEveryBoot()
[Fact]
public async Task RunsDemoDataGeneratorsOnEveryBoot()
{
using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
sut.EnsureDemoTenant();
Expand All @@ -73,7 +75,7 @@ public async Task RunsDemoDataGeneratorsOnEveryBoot()
}
}

using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
sut.EnsureDemoTenant();
Expand All @@ -92,41 +94,68 @@ public async Task RunsDemoDataGeneratorsOnEveryBoot()
[Fact]
public async Task MaintainsTenantIdWhenBeginningScopes()
{
using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
Enumerable.Range(1, 100).AsParallel().ForAll(i =>
{
// ReSharper disable AccessToDisposedClosure
using (sut.BeginScope(new SystemIdentity(), new TenantId(i)))
{
var insideScopeTenantId = sut.CompositionRoot.GetInstance<ICurrentTHolder<TenantId>>().Current;
TenantId insideScopeTenantId = sut.CompositionRoot.GetInstance<ICurrentTHolder<TenantId>>().Current;
Assert.True(insideScopeTenantId.HasValue);
Assert.Equal(i, insideScopeTenantId.Value);
}
// ReSharper restore AccessToDisposedClosure
});

}
}

[Fact]
public async Task MaintainsCorrelationWhenBeginningScopes()
{
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
var usedCorrelationIds = new ConcurrentBag<Guid>();
Enumerable.Range(1, 100).AsParallel().ForAll(i =>
{
// ReSharper disable AccessToDisposedClosure
using (sut.BeginScope(new SystemIdentity(), new TenantId(i)))
{
Correlation insideScopeCorrelation = sut.CompositionRoot.GetInstance<ICurrentTHolder<Correlation>>().Current;
Assert.NotEqual(Guid.Empty,insideScopeCorrelation.Id);
Assert.DoesNotContain(insideScopeCorrelation.Id, usedCorrelationIds);
usedCorrelationIds.Add(insideScopeCorrelation.Id);
}
// ReSharper restore AccessToDisposedClosure
});
}
}

[Fact]
public async Task MaintainsIdentityWhenBeginningScopes()
{
using (var sut = CreateSystemUnderTest())
using (AnApplication sut = CreateSystemUnderTest())
{
await sut.Boot();
Enumerable.Range(1, 100).AsParallel().ForAll(i =>
{
// ReSharper disable AccessToDisposedClosure
using (sut.BeginScope(new GenericIdentity(i.ToString()), new TenantId(100)))
{
var insideScopeIdentity = sut.CompositionRoot.GetInstance<ICurrentTHolder<IIdentity>>().Current;
IIdentity insideScopeIdentity = sut.CompositionRoot.GetInstance<ICurrentTHolder<IIdentity>>().Current;
Assert.Equal(i.ToString(), insideScopeIdentity.Name);
}
// ReSharper restore AccessToDisposedClosure
});
}
}

private AnApplication CreateSystemUnderTest()
{
SimpleInjectorCompositionRoot compositionRoot = new SimpleInjectorCompositionRoot();
var compositionRoot = new SimpleInjectorCompositionRoot();
var sut = new AnApplication(compositionRoot);
IEventBus eventBus = new InMemoryEventBus(sut);
sut.CompositionRoot.RegisterModules(
Expand Down
Loading

0 comments on commit 3f600c5

Please sign in to comment.