Skip to content

Commit

Permalink
Added support to push CloudEvents to both Custom and Namespace topics (
Browse files Browse the repository at this point in the history
…#75)

- Changed topic name from 'Default' to 'Custom'
- Added named EventGridClient to registered Azure client
- Updated EventPropagationClient
  - Can now push a CloudEvents to a CustomTopic
  - Support new kind of topic : Namespace topics where it can push CloudEvents (only)
  • Loading branch information
DArdouin authored Dec 27, 2023
1 parent da653e6 commit 7aab8f2
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ public class DomainEventWrapperTests
private readonly CloudEvent _cloudEvent = new(
"source",
"eventType",
new BinaryData(new CloudEventSampleDomainEvent()),
nameof(CloudEventSampleDomainEvent));
new CloudEventSampleDomainEvent());

[Fact]
public void GivenEventGridEvent_WhenSetMetadata_ThenMetadataIsSet()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.Namespaces;
using FakeItEasy;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Options;

namespace Workleap.DomainEventPropagation.Publishing.Tests;

public class EventPropagationClientTests
public abstract class EventPropagationClientTests
{
private const string DomainEventName = "publish-test";
internal const string TopicEndpoint = "http://topicEndpoint.io";
internal const string CloudEventName = "cloud-event";
internal const string EventGridEventName = "eventgrid-event";

private readonly IOptions<EventPropagationPublisherOptions> _publisherOptions = A.Fake<IOptions<EventPropagationPublisherOptions>>();
private readonly EventGridPublisherClient _eventGridPublisherClient = A.Fake<EventGridPublisherClient>();
private readonly IAzureClientFactory<EventGridPublisherClient> _eventGridPublisherClientFactory = A.Fake<IAzureClientFactory<EventGridPublisherClient>>();
internal readonly EventGridPublisherClient EventGridPublisherClient = A.Fake<EventGridPublisherClient>();
internal readonly IAzureClientFactory<EventGridPublisherClient> EventGridPublisherClientFactory = A.Fake<IAzureClientFactory<EventGridPublisherClient>>();
internal readonly EventGridClient EventGridClient = A.Fake<EventGridClient>();
internal readonly IAzureClientFactory<EventGridClient> EventGridClientFactory = A.Fake<IAzureClientFactory<EventGridClient>>();
internal readonly EventPropagationClient EventPropagationClient;

private readonly EventPropagationClient _eventPropagationClient;
private readonly IOptions<EventPropagationPublisherOptions> _publisherOptions;

public EventPropagationClientTests()
protected EventPropagationClientTests(IOptions<EventPropagationPublisherOptions> publisherOptions)
{
A.CallTo(() => this._eventGridPublisherClientFactory.CreateClient(EventPropagationPublisherOptions.ClientName)).Returns(this._eventGridPublisherClient);
this._publisherOptions = publisherOptions;
A.CallTo(() => this.EventGridPublisherClientFactory.CreateClient(EventPropagationPublisherOptions.CustomTopicClientName)).Returns(this.EventGridPublisherClient);
A.CallTo(() => this.EventGridClientFactory.CreateClient(EventPropagationPublisherOptions.NamespaceTopicClientName)).Returns(this.EventGridClient);

this._eventPropagationClient = new EventPropagationClient(
this._eventGridPublisherClientFactory,
this._publisherOptions,
this.EventPropagationClient = new EventPropagationClient(
this.EventGridPublisherClientFactory,
this.EventGridClientFactory,
publisherOptions,
Array.Empty<IPublishingDomainEventBehavior>());
}

[Fact]
public async Task GivenNullDomainEvent_WhenPublishDomainEvent_ThenArgumentNullException()
{
// Given
var domainEvent = (IEnumerable<PublishTestDomainEvent>)null!;
var domainEvent = (IEnumerable<TestEventGridEvent>)null!;

// When
var exception = await Assert.ThrowsAsync<ArgumentNullException>(async () => await this._eventPropagationClient.PublishDomainEventsAsync(domainEvent, CancellationToken.None));
var exception = await Assert.ThrowsAsync<ArgumentNullException>(async () => await this.EventPropagationClient.PublishDomainEventsAsync(domainEvent, CancellationToken.None));

// Then
Assert.Equal("domainEvents", exception.ParamName);
Expand All @@ -42,25 +51,46 @@ public async Task GivenNullDomainEvent_WhenPublishDomainEvent_ThenArgumentNullEx
public async Task GivenEmptyDomainEventsCollections_WhenPublishDomainEvents_ThenNothing()
{
// Given
var events = Array.Empty<PublishTestDomainEvent>();
var events = Array.Empty<TestEventGridEvent>();

// When
await this._eventPropagationClient.PublishDomainEventsAsync(events, CancellationToken.None);
await this.EventPropagationClient.PublishDomainEventsAsync(events, CancellationToken.None);

// Then
A.CallTo(() => this._eventGridPublisherClient.SendEventsAsync(A<IEnumerable<EventGridEvent>>._, A<CancellationToken>._))
A.CallTo(() => this.EventGridPublisherClient.SendEventsAsync(A<IEnumerable<EventGridEvent>>._, A<CancellationToken>._))
.MustNotHaveHappened();
}

[Fact]
public async Task GivenTracingPipeline_WhenPublishDomainEvent_ThenPipelineHandle()
public async Task GivenAdditionalBehavior_WhenPublishEventGridEvent_ThenPipelineHandle()
{
// Given
var domainEvent = new TestEventGridEvent() { Text = "Hello world", Number = 1 };
var publisherBehavior = A.Fake<IPublishingDomainEventBehavior>();

var propagationClient = new EventPropagationClient(
this.EventGridPublisherClientFactory,
this.EventGridClientFactory,
this._publisherOptions,
new[] { publisherBehavior });

// When
await propagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None);

// Then
A.CallTo(() => publisherBehavior.HandleAsync(A<DomainEventWrapperCollection>._, A<DomainEventsHandlerDelegate>._, A<CancellationToken>._)).MustHaveHappened();
}

[Fact]
public async Task GivenAdditionalBehavior_WhenPublishCloudEvent_ThenPipelineHandle()
{
// Given
var domainEvent = new PublishTestDomainEvent();
var domainEvent = new TestCloudEvent() { Text = "Hello world", Number = 2 };
var publisherBehavior = A.Fake<IPublishingDomainEventBehavior>();

var propagationClient = new EventPropagationClient(
this._eventGridPublisherClientFactory,
this.EventGridPublisherClientFactory,
this.EventGridClientFactory,
this._publisherOptions,
new[] { publisherBehavior });

Expand All @@ -71,41 +101,173 @@ public async Task GivenTracingPipeline_WhenPublishDomainEvent_ThenPipelineHandle
A.CallTo(() => publisherBehavior.HandleAsync(A<DomainEventWrapperCollection>._, A<DomainEventsHandlerDelegate>._, A<CancellationToken>._)).MustHaveHappened();
}

protected static bool IsSingleEventGridEvent(IEnumerable<EventGridEvent> events)
{
return events.Single() is
{
Subject: EventGridEventName,
EventType: EventGridEventName,
DataVersion: "1.0",
};
}

protected static bool IsSingleCloudEvent(IEnumerable<CloudEvent> events)
{
return events.Single() is
{
Type: CloudEventName,
Source: TopicEndpoint,
};
}

[DomainEvent(EventGridEventName, EventSchema.EventGridEvent)]
protected sealed class TestEventGridEvent : IDomainEvent
{
public string Text { get; set; } = string.Empty;

public int Number { get; set; }
}

[DomainEvent(CloudEventName, EventSchema.CloudEvent)]
protected sealed class TestCloudEvent : IDomainEvent
{
public string Text { get; set; } = string.Empty;

public int Number { get; set; }
}
}

public class EventPropagationClientForCustomTopicTests : EventPropagationClientTests
{
private static readonly IOptions<EventPropagationPublisherOptions> PublisherOptions = Options.Create(new EventPropagationPublisherOptions()
{
TopicType = TopicType.Custom,
TopicEndpoint = TopicEndpoint,
TopicAccessKey = "topicAccessKey",
});

public EventPropagationClientForCustomTopicTests() : base(PublisherOptions)
{
}

[Fact]
public async Task GivenDomainEvent_WhenErrorDuringPublishDomainEvent_ThenThrowsException()
public async Task GivenEventGridEvent_WhenPublishEvent_ThenCallsPropagationClient()
{
// Given
var domainEvent = new PublishTestDomainEvent();
A.CallTo(() => this._eventGridPublisherClient.SendEventsAsync(A<IEnumerable<EventGridEvent>>._, A<CancellationToken>._)).Throws<Exception>();
var domainEvent = new TestEventGridEvent() { Text = "Hello world", Number = 1 };

// When
var exception = await Assert.ThrowsAsync<EventPropagationPublishingException>(async () => await this._eventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None));
await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None);

// Then
Assert.Contains(DomainEventName, exception.Message);
A.CallTo(() => this.EventGridPublisherClient.SendEventsAsync(
A<IEnumerable<EventGridEvent>>.That.Matches(events => IsSingleEventGridEvent(events)),
A<CancellationToken>._))
.MustHaveHappened();
}

[Fact]
public async Task GivenDomainEvent_WhenErrorDuringPublishDomainEvent_Then()
public async Task GivenCloudEvent_WhenPublishEvent_ThenCallsPropagationClient()
{
// Given
var domainEvent = new PublishTestDomainEvent();
var domainEvent = new TestCloudEvent() { Text = "Hello world", Number = 1 };

// When
await this._eventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None);
await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None);

// Then
A.CallTo(() => this._eventGridPublisherClient.SendEventsAsync(
A<IEnumerable<EventGridEvent>>.That.Matches(evt => evt.Count() == 1 && evt.First().EventType == DomainEventName),
A.CallTo(() => this.EventGridPublisherClient.SendEventsAsync(
A<IEnumerable<CloudEvent>>.That.Matches(events => IsSingleCloudEvent(events)),
A<CancellationToken>._))
.MustHaveHappened();
}

[DomainEvent(DomainEventName)]
private sealed class PublishTestDomainEvent : IDomainEvent
[Fact]
public async Task GivenEventGridEvent_WhenErrorDuringPublishEvent_ThenThrowsException()
{
public string Text { get; set; } = string.Empty;
// Given
var domainEvent = new TestEventGridEvent() { Text = "Hello world", Number = 1 };
A.CallTo(() => this.EventGridPublisherClient.SendEventsAsync(A<IEnumerable<EventGridEvent>>._, A<CancellationToken>._)).Throws<Exception>();

public int Number { get; set; }
// When
var exception = await Assert.ThrowsAsync<EventPropagationPublishingException>(async () => await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None));

// Then
Assert.Contains(EventGridEventName, exception.Message);
}

[Fact]
public async Task GivenCloudEvent_WhenErrorDuringPublishEvent_ThenThrowsException()
{
// Given
var domainEvent = new TestCloudEvent() { Text = "Hello world", Number = 2 };
A.CallTo(() => this.EventGridPublisherClient.SendEventsAsync(A<IEnumerable<CloudEvent>>._, A<CancellationToken>._)).Throws<Exception>();

// When
var exception = await Assert.ThrowsAsync<EventPropagationPublishingException>(async () => await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None));

// Then
Assert.Contains(CloudEventName, exception.Message);
}
}

public class EventPropagationClientForNamespaceTopicTests : EventPropagationClientTests
{
private const string TopicName = "topicName";

private static readonly IOptions<EventPropagationPublisherOptions> PublisherOptions = Options.Create(new EventPropagationPublisherOptions()
{
TopicType = TopicType.Namespace,
TopicName = TopicName,
TopicEndpoint = TopicEndpoint,
TopicAccessKey = "topicAccessKey",
});

public EventPropagationClientForNamespaceTopicTests() : base(PublisherOptions)
{
}

[Fact]
public async Task GivenEventGridEvent_WhenPublishDomainEvent_ThenThrowsException()
{
// Given
var domainEvent = new TestEventGridEvent() { Text = "Hello world", Number = 1 };

// When
var exception = await Assert.ThrowsAsync<EventPropagationPublishingException>(async () => await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None));

// Then
Assert.Contains(EventGridEventName, exception.Message);
}

[Fact]
public async Task GivenCloudEvent_WhenPublishEvent_ThenCallsPropagationClient()
{
// Given
var domainEvent = new TestCloudEvent() { Text = "Hello world", Number = 1 };

// When
await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None);

// Then
A.CallTo(() => this.EventGridClient.PublishCloudEventsAsync(
TopicName,
A<IEnumerable<CloudEvent>>.That.Matches(events => IsSingleCloudEvent(events)),
A<CancellationToken>._))
.MustHaveHappened();
}

[Fact]
public async Task GivenCloudEvent_WhenErrorDuringPublishEvent_ThenThrowsException()
{
// Given
var domainEvent = new TestCloudEvent() { Text = "Hello world", Number = 2 };
A.CallTo(() => this.EventGridClient.PublishCloudEventsAsync(PublisherOptions.Value.TopicName, A<IEnumerable<CloudEvent>>._, A<CancellationToken>._)).Throws<Exception>();

// When
var exception = await Assert.ThrowsAsync<EventPropagationPublishingException>(async () => await this.EventPropagationClient.PublishDomainEventAsync(domainEvent, CancellationToken.None));

// Then
Assert.Contains(CloudEventName, exception.Message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ namespace Workleap.DomainEventPropagation.Publishing.Tests;
public class EventPropagationPublisherOptionsValidatorTests
{
[Theory]
[InlineData(TopicType.Default, " ", "http://topicurl.com", "topicName", true, true)]
[InlineData(TopicType.Default, " ", "http://topicurl.com", "topicName", false, false)]
[InlineData(TopicType.Default, "accessKey", "http://topicurl.com", "topicName", true, true)]
[InlineData(TopicType.Default, "accessKey", "http://topicurl.com", "topicName", false, true)]
[InlineData(TopicType.Default, null, null, "topicName", false, false)]
[InlineData(TopicType.Default, null, "http://topicurl.com", "topicName", false, false)]
[InlineData(TopicType.Default, "accessKey", " ", "topicName", false, false)]
[InlineData(TopicType.Default, "accessKey", null, "topicName", false, false)]
[InlineData(TopicType.Default, "accessKey", "topicEndpoint", "topicName", false, false)]
[InlineData(TopicType.Default, "accessKey", "http://topicurl.com", null, true, true)]
[InlineData(TopicType.Default, "accessKey", "http://topicurl.com", " ", true, true)]
[InlineData(TopicType.Custom, " ", "http://topicurl.com", "topicName", true, true)]
[InlineData(TopicType.Custom, " ", "http://topicurl.com", "topicName", false, false)]
[InlineData(TopicType.Custom, "accessKey", "http://topicurl.com", "topicName", true, true)]
[InlineData(TopicType.Custom, "accessKey", "http://topicurl.com", "topicName", false, true)]
[InlineData(TopicType.Custom, null, null, "topicName", false, false)]
[InlineData(TopicType.Custom, null, "http://topicurl.com", "topicName", false, false)]
[InlineData(TopicType.Custom, "accessKey", " ", "topicName", false, false)]
[InlineData(TopicType.Custom, "accessKey", null, "topicName", false, false)]
[InlineData(TopicType.Custom, "accessKey", "topicEndpoint", "topicName", false, false)]
[InlineData(TopicType.Custom, "accessKey", "http://topicurl.com", null, true, true)]
[InlineData(TopicType.Custom, "accessKey", "http://topicurl.com", " ", true, true)]
[InlineData(TopicType.Namespace, "accessKey", "http://topicurl.com", "topicName", true, true)]
[InlineData(TopicType.Namespace, "accessKey", "http://topicurl.com", "", true, false)]
[InlineData(TopicType.Namespace, "accessKey", "http://topicurl.com", null, true, false)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void GivenConfigWithAccessKey_WhenAddEventPropagationPublisher_ThenEventG
services.AddEventPropagationPublisher();
var serviceProvider = services.BuildServiceProvider();
var clientFactory = serviceProvider.GetRequiredService<IAzureClientFactory<EventGridPublisherClient>>();
var client = clientFactory.CreateClient(EventPropagationPublisherOptions.ClientName);
var client = clientFactory.CreateClient(EventPropagationPublisherOptions.CustomTopicClientName);

// Then
Assert.NotNull(client);
Expand All @@ -104,7 +104,7 @@ public void GivenConfigWithTokenCredentials_WhenAddEventPropagationPublisher_The
});
var serviceProvider = services.BuildServiceProvider();
var clientFactory = serviceProvider.GetRequiredService<IAzureClientFactory<EventGridPublisherClient>>();
var client = clientFactory.CreateClient(EventPropagationPublisherOptions.ClientName);
var client = clientFactory.CreateClient(EventPropagationPublisherOptions.CustomTopicClientName);

// Then
Assert.NotNull(client);
Expand Down
Loading

0 comments on commit 7aab8f2

Please sign in to comment.