Skip to content

Commit

Permalink
Fix InMemoryMessageBus to await all pending messages (#130)
Browse files Browse the repository at this point in the history
Co-authored-by: Florian Markus <[email protected]>
  • Loading branch information
flmarkus and Florian Markus authored Sep 21, 2021
1 parent d2e6ac5 commit 14224bb
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace Backend.Fx.Patterns.EventAggregation.Integration
using System.Collections.Concurrent;

namespace Backend.Fx.Patterns.EventAggregation.Integration
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class InMemoryMessageBusChannel
{
private readonly List<Task> _messageHandlingTasks = new List<Task>();
private readonly ConcurrentBag<Task> _messageHandlingTasks = new ConcurrentBag<Task>();

internal event EventHandler<MessageReceivedEventArgs> MessageReceived;

Expand All @@ -18,8 +19,10 @@ internal void Publish(IIntegrationEvent integrationEvent)

public async Task FinishHandlingAllMessagesAsync()
{
await Task.WhenAll(_messageHandlingTasks);
_messageHandlingTasks.Clear();
while (_messageHandlingTasks.TryTake(out var messageHandlingTask))
{
await messageHandlingTask;
}
}

internal class MessageReceivedEventArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,39 @@ public async Task InvokesAllApplicationHandlers()
Assert.True(eventHandled);
Assert.True(anotherEventHandled);
}

[Fact]
public async Task DoesAwaitAllPendingMessages()
{
var channel = new InMemoryMessageBusChannel();
var messageBus = new InMemoryMessageBus(channel);
messageBus.Connect();
messageBus.ProvideInvoker(new TheMessageBus.TestInvoker());

var allMessagesAreHandled = false;

messageBus.Subscribe(new DelegateIntegrationMessageHandler<TestIntegrationEvent>(x =>
{
if (x.StringParam == "first message")
{
messageBus.Publish(new TestIntegrationEvent(0, "second message"));
}
else if (x.StringParam == "second message")
{
messageBus.Publish(new TestIntegrationEvent(0, "third message"));
}
else if (x.StringParam == "third message")
{
allMessagesAreHandled = true;
}
}));

// Publish the first message and await the result.
// This should block until all three messages are processed not only the first one was.
await messageBus.Publish(new TestIntegrationEvent(0, "first message"));
await channel.FinishHandlingAllMessagesAsync();

Assert.True(allMessagesAreHandled);
}
}
}

0 comments on commit 14224bb

Please sign in to comment.