Skip to content

Commit

Permalink
WIP - Wrap event and added delegates to handle event lifetime
Browse files Browse the repository at this point in the history
  • Loading branch information
DArdouin committed Jan 4, 2024
1 parent 4489e68 commit 5650548
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Azure.Messaging;
using Microsoft.Extensions.Logging;

namespace Workleap.DomainEventPropagation;

internal class CloudEventHandler : ICloudEventHandler
{
private readonly IDomainEventTypeRegistry _domainEventTypeRegistry;
private readonly ILogger<CloudEventHandler> _logger;
private readonly DomainEventHandlerDelegate _pipeline;

public CloudEventHandler(
IDomainEventTypeRegistry domainEventTypeRegistry,
IEnumerable<IDomainEventBehavior> domainEventBehaviors,
ILogger<CloudEventHandler> logger)
{
this._domainEventTypeRegistry = domainEventTypeRegistry;
this._logger = logger;
this._pipeline = domainEventBehaviors.Reverse().Aggregate((DomainEventHandlerDelegate)HandleDomainEventAsync, BuildPipeline);
}

public async Task HandleEventGridWebhookEventAsync(CloudEvent cloudEvent, AcknowledgeEventDelegate acknowledge, ReleaseEventDelegate release, RejectEventDelegate reject, CancellationToken cancellationToken)
{
var domainEventWrapper = new DomainEventWrapper(cloudEvent);

var domainEventType = this._domainEventTypeRegistry.GetDomainEventType(domainEventWrapper.DomainEventName);
if (domainEventType == null)
{
this._logger.EventDomainTypeNotRegistered(domainEventWrapper.DomainEventName, cloudEvent.Subject ?? "Unknown");
return;
}

await this._pipeline(domainEventWrapper, acknowledge, release, reject, cancellationToken).ConfigureAwait(false);
}

private static DomainEventHandlerDelegate BuildPipeline(DomainEventHandlerDelegate next, IDomainEventBehavior pipeline)
{
return (@event, acknowledge, release, reject, cancellationToken) => pipeline.HandleAsync(@event, acknowledge, release, reject, next, cancellationToken);
}

private static Task HandleDomainEventAsync(
DomainEventWrapper domainEventWrapper,
AcknowledgeEventDelegate acknowledge,
ReleaseEventDelegate release,
RejectEventDelegate reject,
CancellationToken cancellationToken)
{
// Todo : Get event handler that matches wrapper type and invoke it
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ namespace Workleap.DomainEventPropagation;

internal class EventPuller : BackgroundService
{
private readonly ICloudEventHandler _cloudEventHandler;
private readonly EventGridReceptionChannel[] _eventGridReceptionChannels;

public EventPuller(
IEnumerable<EventGridClientDescriptor> clientDescriptors,
IAzureClientFactory<EventGridClient> eventGridClientFactory,
ICloudEventHandler cloudEventHandler,
IOptionsMonitor<EventPropagationSubscriptionOptions> optionsMonitor)
{
this._cloudEventHandler = cloudEventHandler;
this._eventGridReceptionChannels = clientDescriptors.Select(descriptor =>
new EventGridReceptionChannel(
optionsMonitor.Get(descriptor.Name).TopicName,
Expand All @@ -23,23 +26,46 @@ public EventPuller(

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.WhenAll(this._eventGridReceptionChannels.Select(channel => Task.Run(() => StartReceivingEventsAsync(channel, stoppingToken), stoppingToken)));
return Task.WhenAll(this._eventGridReceptionChannels.Select(channel => Task.Run(() => this.StartReceivingEventsAsync(channel, stoppingToken), stoppingToken)));
}

private static async Task StartReceivingEventsAsync(EventGridReceptionChannel channel, CancellationToken stoppingToken)
private async Task StartReceivingEventsAsync(EventGridReceptionChannel channel, CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var result = await channel.Client.ReceiveCloudEventsAsync(channel.Topic, channel.Subscription, cancellationToken: stoppingToken).ConfigureAwait(false);
foreach (var detail in result.Value.Value)
{
var cloudEvent = detail.Event;
var lockToken = detail.BrokerProperties.LockToken;

// TODO : Handle event and pass token + client to handler so that we can acknowledge the event
await this._cloudEventHandler.HandleEventGridWebhookEventAsync(
cloudEvent,
Acknowledge(channel, detail, stoppingToken),
Release(channel, detail, stoppingToken),
Reject(channel, detail, stoppingToken),
stoppingToken)
.ConfigureAwait(false);
}
}
}

private static AcknowledgeEventDelegate Acknowledge(EventGridReceptionChannel channel, ReceiveDetails detail, CancellationToken stoppingToken)
{
var lockToken = detail.BrokerProperties.LockToken;
return () => channel.Client.AcknowledgeCloudEventsAsync(channel.Topic, channel.Subscription, new AcknowledgeOptions(new[] { lockToken }), stoppingToken);
}

private static ReleaseEventDelegate Release(EventGridReceptionChannel channel, ReceiveDetails detail, CancellationToken stoppingToken)
{
var lockToken = detail.BrokerProperties.LockToken;
return () => channel.Client.ReleaseCloudEventsAsync(channel.Topic, channel.Subscription, new ReleaseOptions(new[] { lockToken }), cancellationToken: stoppingToken);
}

private static RejectEventDelegate Reject(EventGridReceptionChannel channel, ReceiveDetails detail, CancellationToken stoppingToken)
{
var lockToken = detail.BrokerProperties.LockToken;
return () => channel.Client.RejectCloudEventsAsync(channel.Topic, channel.Subscription, new RejectOptions(new[] { lockToken }), stoppingToken);
}

private record EventGridReceptionChannel(string Topic, string Subscription, EventGridClient Client);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Azure.Messaging;

namespace Workleap.DomainEventPropagation;

internal delegate Task AcknowledgeEventDelegate();

internal delegate Task ReleaseEventDelegate();

internal delegate Task RejectEventDelegate();

internal interface ICloudEventHandler
{
Task HandleEventGridWebhookEventAsync(
CloudEvent cloudEvent,
AcknowledgeEventDelegate acknowledge,
ReleaseEventDelegate release,
RejectEventDelegate reject,
CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Workleap.DomainEventPropagation;

internal delegate Task DomainEventHandlerDelegate(
DomainEventWrapper domainEventWrapper,
AcknowledgeEventDelegate acknowledge,
ReleaseEventDelegate release,
RejectEventDelegate reject,
CancellationToken cancellationToken);

internal interface IDomainEventBehavior
{
Task HandleAsync(
DomainEventWrapper domainEventWrapper,
AcknowledgeEventDelegate acknowledge,
ReleaseEventDelegate release,
RejectEventDelegate reject,
DomainEventHandlerDelegate next,
CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,35 @@
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.36.0"/>
<PackageReference Include="Azure.Messaging.EventGrid" Version="4.22.0-beta.1"/>
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.6.3"/>
<PackageReference Include="Azure.Core" Version="1.36.0" />
<PackageReference Include="Azure.Messaging.EventGrid" Version="4.22.0-beta.1" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.6.3" />
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<None Include="..\..\README.md" Link="README.md" Pack="true" PackagePath="\"/>
<None Include="..\..\README.md" Link="README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<!-- Exposes internal symbols to test projects and mocking libraries -->
<InternalsVisibleTo Include="Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests,PublicKey=002400000480000094000000060200000024000052534131000400000100010025301ce547647ab5ac9264ade0f9cdc0252796a257095add4791b0232c1def21bb9e0c87d218713f918565b23394362dbcb058e210c853a24ec33e6925ebedf654a0d65efb3828c855ff21eaaa67aeb9b24b81b8baff582a03df6ab04424c7e53cacbfe84d2765ce840389f900c55824d037d2c5b6b330ac0188a06ef6869dba"/>
<InternalsVisibleTo Include="DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7"/>
<InternalsVisibleTo Include="Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests,PublicKey=002400000480000094000000060200000024000052534131000400000100010025301ce547647ab5ac9264ade0f9cdc0252796a257095add4791b0232c1def21bb9e0c87d218713f918565b23394362dbcb058e210c853a24ec33e6925ebedf654a0d65efb3828c855ff21eaaa67aeb9b24b81b8baff582a03df6ab04424c7e53cacbfe84d2765ce840389f900c55824d037d2c5b6b330ac0188a06ef6869dba" />
<InternalsVisibleTo Include="DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Workleap.DomainEventPropagation.Abstractions\Workleap.DomainEventPropagation.Abstractions.csproj"/>
<ProjectReference Include="..\Workleap.DomainEventPropagation.Abstractions\Workleap.DomainEventPropagation.Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\Shared\DomainEventWrapper.cs" Link="Shared\DomainEventWrapper.cs" />
</ItemGroup>

</Project>

0 comments on commit 5650548

Please sign in to comment.