Skip to content

Commit

Permalink
fix(Dispatcher.Event): Fixed the possibility of obtaining the wrong H…
Browse files Browse the repository at this point in the history
…andler object in high concurrency scenarios, and prompting A second operation started on this context before a previous operation completed when using DbContext (#221)

* fix(Dispatcher.Events): Fix A second operation started on this context before a previous operation completed in high concurrency scenarios

* test: Complementary concurrent unit tests
  • Loading branch information
zhenlei520 authored Aug 26, 2022
1 parent 2488df5 commit dd29c77
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ public int Order

internal TaskInvokeDelegate? InvokeDelegate { get; private set; }

private object Instance { get; set; } = default!;

private object? EventHandler { get; set; }

internal bool IsEventHandler => FailureLevels == FailureLevels.Throw || FailureLevels == FailureLevels.ThrowAndCancel;

internal void BuildExpression()
Expand All @@ -127,8 +123,7 @@ internal async Task ExcuteAction<TEvent>(IServiceProvider serviceProvider, TEven
{
if (InvokeDelegate != null)
{
Instance = serviceProvider.GetRequiredService(InstanceType);
await InvokeDelegate.Invoke(Instance, @event);
await InvokeDelegate.Invoke(serviceProvider.GetRequiredService(InstanceType), @event);
}
else
{
Expand All @@ -140,23 +135,15 @@ private async Task ExcuteSagaAction<TEvent>(IServiceProvider serviceProvider, TE
{
if (!IsCancel)
{
if (EventHandler == null)
{
var handlers = serviceProvider.GetServices<IEventHandler<TEvent>>();
var handler = handlers.FirstOrDefault(x => x.GetType() == InstanceType)!;
EventHandler = handler;
}
await ((IEventHandler<TEvent>)EventHandler).HandleAsync(@event);
var handlers = serviceProvider.GetServices<IEventHandler<TEvent>>();
var handler = handlers.FirstOrDefault(x => x.GetType() == InstanceType)!;
await handler.HandleAsync(@event);
}
else
{
if (EventHandler == null)
{
var handlers = serviceProvider.GetServices<ISagaEventHandler<TEvent>>();
var handler = handlers.FirstOrDefault(x => x.GetType() == InstanceType)!;
EventHandler = handler;
}
await ((ISagaEventHandler<TEvent>)EventHandler).CancelAsync(@event);
var handlers = serviceProvider.GetServices<ISagaEventHandler<TEvent>>();
var handler = handlers.FirstOrDefault(x => x.GetType() == InstanceType)!;
await handler.CancelAsync(@event);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private async Task ExecuteEventHandlerAsync<TEvent>(IServiceProvider serviceProv

strategyOptions.SetStrategy(dispatchHandler);

await executionStrategy.ExecuteAsync(strategyOptions, @event, async (@event) =>
await executionStrategy.ExecuteAsync(strategyOptions, @event, async @event =>
{
Logger?.LogDebug("----- Publishing event {@Event}: message id: {messageId} -----", @event, @event.GetEventId());
await dispatchHandler.ExcuteAction(serviceProvider, @event);
Expand Down
31 changes: 31 additions & 0 deletions test/Masa.Framework.IntegrationTests.EventBus/TestDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,35 @@ await integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent()
Price = 9.9m,
});
}

[TestMethod]
public async Task TestEventBusOnConcurrencyAsync()
{
var serviceProvider = ServiceProvider;
var @event = new RegisterUserEvent()
{
Name = Guid.NewGuid().ToString(),
Age = 18
};
var tasks = new ConcurrentBag<Task>();

var testCount = 1000L;
Parallel.For(1L, testCount + 1, i =>
{
tasks.Add(AddUserAsync(serviceProvider, @event));
});

await Task.WhenAll(tasks);

var customizeDbContext = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<CustomDbContext>();
var count = customizeDbContext.Set<User>().Count();
Assert.IsTrue(count == testCount);
}

private async Task AddUserAsync(IServiceProvider serviceProvider, RegisterUserEvent @event)
{
await using var scope = serviceProvider.CreateAsyncScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(@event);
}
}
1 change: 1 addition & 0 deletions test/Masa.Framework.IntegrationTests.EventBus/_Imports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.VisualStudio.TestTools.UnitTesting;
global using System.Collections.Concurrent;

0 comments on commit dd29c77

Please sign in to comment.