Skip to content

Commit

Permalink
feat:init event
Browse files Browse the repository at this point in the history
  • Loading branch information
duiapro committed Apr 28, 2024
1 parent 3fb1fea commit 449e060
Show file tree
Hide file tree
Showing 62 changed files with 895 additions and 7 deletions.
11 changes: 11 additions & 0 deletions Masa.Framework.sln
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.Core
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.EntityFrameworkCore", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.EntityFrameworkCore\Masa.Utils.DynamicsCrm.EntityFrameworkCore.csproj", "{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Framework.EventApiTest", "test\Masa.Framework.EventApiTest\Masa.Framework.EventApiTest.csproj", "{345F5F33-8788-43F3-9005-B3EEBC007C72}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -2185,6 +2187,14 @@ Global
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|Any CPU.Build.0 = Release|Any CPU
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.ActiveCfg = Release|Any CPU
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.Build.0 = Release|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|Any CPU.Build.0 = Debug|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|x64.ActiveCfg = Debug|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Debug|x64.Build.0 = Debug|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|Any CPU.ActiveCfg = Release|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|Any CPU.Build.0 = Release|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|x64.ActiveCfg = Release|Any CPU
{345F5F33-8788-43F3-9005-B3EEBC007C72}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -2493,6 +2503,7 @@ Global
{64B54122-44F1-4379-9422-953EF706A3A6} = {5944A182-13B8-4DA6-AEE2-0A01E64A9648}
{83310F46-E1C7-4438-B32A-9F6F7EA13FCF} = {64B54122-44F1-4379-9422-953EF706A3A6}
{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC} = {64B54122-44F1-4379-9422-953EF706A3A6}
{345F5F33-8788-43F3-9005-B3EEBC007C72} = {E747043D-81E2-4A89-8B5B-1258ED45F941}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {40383055-CC50-4600-AD9A-53C14F620D03}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ Task SaveEventAsync(

Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);

Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default);

Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default);

Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default);

Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);

Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default);

/// <summary>
/// Delete successfully published and expired data
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,59 @@ public async Task PublishAsync<T>(
@event);
}
}

public async Task BulkPublishAsync<T>(
string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events,
CancellationToken stoppingToken = default)
{

_logger?.LogDebug("-----BulkPublishEvent Integration event publishing is in progress from {AppId} with DaprAppId as '{DaprAppId}'", _appId,
_daprAppId);

if (!@events.Any())
return;

MasaArgumentException.ThrowIfNullOrWhiteSpace(_daprAppId);

var masaCloudEvents = new List<MasaCloudEvent<IntegrationEventMessage>>();
var waitEvents = new List<T>();

@events.ForEach(item =>
{
if (item.eventMessageExpand is { Isolation.Count: > 0 })
{
var eventMessage = new IntegrationEventMessage(item.@event, item.eventMessageExpand);
var masaCloudEvent = new MasaCloudEvent<IntegrationEventMessage>(eventMessage)
{
Source = new Uri(_daprAppId, UriKind.RelativeOrAbsolute)
};

masaCloudEvents.Add(masaCloudEvent);
}
else
{
waitEvents.Add(item.@event);
}
});

if (masaCloudEvents.Any())
{
await DaprClient.PublishEventAsync(_pubSubName, topicName, masaCloudEvents, stoppingToken);
_logger?.LogDebug(
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
_appId,
_daprAppId,
masaCloudEvents);
}

if (waitEvents.Any())
{
await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, @events.ToList(), cancellationToken: stoppingToken);
_logger?.LogDebug(
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
_appId,
_daprAppId,
@events);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellati
}, cancellationToken);
}

public Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
{
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.Published, eventLogs =>
{
eventLogs.ForEach(eventLog =>
{
if (eventLog.State != IntegrationEventStates.InProgress)
{
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.Published, eventLog.State, eventLog.Id);
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.Published}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});

}, cancellationToken);
}

public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default)
{
return UpdateEventStatus(eventId, IntegrationEventStates.InProgress, eventLog =>
Expand All @@ -132,6 +151,35 @@ public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, C
}, cancellationToken);
}

public Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default)
{
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.InProgress, eventLogs =>
{
eventLogs.ForEach(eventLog =>
{
if (eventLog.State is IntegrationEventStates.InProgress or IntegrationEventStates.PublishedFailed &&
(eventLog.GetCurrentTime() - eventLog.ModificationTime).TotalSeconds < minimumRetryInterval)
{
_logger?.LogInformation(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}, Multitasking execution error, waiting for the next retry",
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry");
}
if (eventLog.State != IntegrationEventStates.NotPublished &&
eventLog.State != IntegrationEventStates.InProgress &&
eventLog.State != IntegrationEventStates.PublishedFailed)
{
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});
}, cancellationToken);
}

public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default)
{
return UpdateEventStatus(eventId, IntegrationEventStates.PublishedFailed, eventLog =>
Expand All @@ -147,6 +195,24 @@ public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationT
}, cancellationToken);
}

public Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
{
return BulkUpdateEventStatus(eventIds, IntegrationEventStates.PublishedFailed, eventLogs =>
{
eventLogs.ForEach(eventLog =>
{
if (eventLog.State != IntegrationEventStates.InProgress)
{
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.PublishedFailed, eventLog.State, eventLog.Id);
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.PublishedFailed}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});
}, cancellationToken);
}

public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, CancellationToken token = default)
{
var eventLogs = _eventLogContext.EventLogs.Where(e => e.ModificationTime < expiresAt && e.State == IntegrationEventStates.Published)
Expand All @@ -164,6 +230,51 @@ public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, Cancell
}
}

private async Task BulkUpdateEventStatus(IEnumerable<Guid> eventIds,
IntegrationEventStates status,
Action<List<IntegrationEventLog>>? action = null,
CancellationToken cancellationToken = default)
{
var eventLogEntrys =
await _eventLogContext.EventLogs.Where(e => eventIds.Contains(e.EventId)).ToListAsync();
if (eventLogEntrys == null || !eventLogEntrys.Any())
throw new ArgumentException(
$"The local message record does not exist, please confirm whether the local message record has been deleted or other reasons cause the local message record to not be inserted successfully In EventId: {eventIds}",
nameof(eventIds));

action?.Invoke(eventLogEntrys);

var updateEventLogEntry = new List<IntegrationEventLog>();
foreach (var eventLogEntry in eventLogEntrys)
{
if (eventLogEntry.State == status)
{
continue;
}

eventLogEntry.State = status;
eventLogEntry.ModificationTime = eventLogEntry.GetCurrentTime();

if (status == IntegrationEventStates.InProgress)
eventLogEntry.TimesSent++;

updateEventLogEntry.Add(eventLogEntry);
}

_eventLogContext.EventLogs.UpdateRange(updateEventLogEntry);

try
{
await _eventLogContext.DbContext.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateConcurrencyException ex)
{
throw new UserFriendlyException($"Concurrency conflict, update exception. {ex.Message}");
}

updateEventLogEntry.ForEach(CheckAndDetached);
}

private async Task UpdateEventStatus(Guid eventId,
IntegrationEventStates status,
Action<IntegrationEventLog>? action = null,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) MASA Stack All rights reserved.
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

namespace Masa.Contrib.Dispatcher.IntegrationEvents;
Expand All @@ -10,4 +10,8 @@ Task PublishAsync<T>(
T @event,
IntegrationEventExpand? eventMessageExpand,
CancellationToken stoppingToken = default);

Task BulkPublishAsync<T>(
string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events,
CancellationToken stoppingToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public static void SetLogger(IServiceCollection services)
public void AddJobs(IntegrationEventLogItem items)
=> _retryEventLogs.TryAdd(items.EventId, items);

public void BulkAddJobs(List<IntegrationEventLogItem> items)
=> items.ForEach(item => _retryEventLogs.TryAdd(item.EventId, item));

public void RemoveJobs(Guid eventId)
=> _retryEventLogs.TryRemove(eventId, out _);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,68 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
_options.Value.BatchSize,
stoppingToken);

if(!retrieveEventLogs.Any())
if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic)
.Select(eventLog => new
{
TopicName = eventLog.Key,
Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(),
}).ToList();

foreach (var eventLog in retrieveEventLogsGroupByTopic)
{
var eventIds = eventLog.Events.Select(item => item.EventId);
var events = eventLog.Events.Select(item => (item.Event, item.EventExpand)).ToList();

try
{
await eventLogService.BulkMarkEventAsInProgressAsync(eventIds, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.TopicName);

await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken);

await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention, no processing required
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.BulkMarkEventAsFailedAsync(eventIds, stoppingToken);

var integrationEventLogItem = eventLog.Events.Select(item =>
new IntegrationEventLogItem(item.EventId, eventLog.TopicName, item.Event, item.EventExpand)).ToList();

LocalQueueProcessor.Default.BulkAddJobs(integrationEventLogItem);
}
}
}

[Obsolete]
private async Task ExecuteByObsoleteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;

var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
await eventLogService.RetrieveEventLogsPendingToPublishAsync(
_options.Value.BatchSize,
stoppingToken);

if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
Expand Down Expand Up @@ -65,7 +126,8 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);

LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event, eventLog.EventExpand));
LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event,
eventLog.EventExpand));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) MASA Stack All rights reserved.
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

namespace Masa.Contrib.Dispatcher.IntegrationEvents.Tests.Infrastructure;
Expand Down Expand Up @@ -47,4 +47,19 @@ public Task SaveEventAsync(
{
return Task.CompletedTask;
}

public Task BulkMarkEventAsPublishedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}

public Task BulkMarkEventAsInProgressAsync(IEnumerable<Guid> eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}

public Task BulkMarkEventAsFailedAsync(IEnumerable<Guid> eventIds, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ public DispatcherOptions(IServiceCollection services, Assembly[] assemblies)
.Where(type => type.IsClass && typeof(IEvent).IsAssignableFrom(type))
.ToList();

allEventTypes.AddRange(GetGenericTypeEventType(assemblies));
allEventTypes.AddRange(GetGenericEventType(assemblies));

UnitOfWorkRelation = allEventTypes.ToDictionary(type => type, IsSupportUnitOfWork);
}

private List<Type> GetGenericTypeEventType(Assembly[] assemblies)
private List<Type> GetGenericEventType(Assembly[] assemblies)
{
var methods = assemblies
.SelectMany(assembly => assembly.GetTypes().SelectMany(method => method.GetMethods()))
Expand Down
Loading

0 comments on commit 449e060

Please sign in to comment.