Skip to content

Commit

Permalink
Fix various issues in transactional outbox
Browse files Browse the repository at this point in the history
- correctly handle scoped services in background services
- abstract file handling of content in outbox services
- merge Id and Context in Outbox model
- Consistently use strings for outbox message method identifiers
- split up tests into true unit tests
- fix properties in outbox models
- fix lifetime of new services
  • Loading branch information
ddaspit committed Jun 28, 2024
1 parent 6a8371d commit 6c9e1ea
Show file tree
Hide file tree
Showing 36 changed files with 765 additions and 606 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ public static IMachineBuilder AddMemoryDataAccess(this IMachineBuilder builder)
o.AddRepository<TranslationEngine>();
o.AddRepository<RWLock>();
o.AddRepository<TrainSegmentPair>();
o.AddRepository<OutboxMessage>();
o.AddRepository<Outbox>();
});

return builder;
Expand Down Expand Up @@ -279,7 +281,10 @@ await c.Indexes.CreateOrUpdateAsync(
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<Outbox>("outboxes");
o.AddRepository<Outbox>(
"outboxes",
mapSetup: m => m.MapIdProperty(o => o.Id).SetSerializer(new StringSerializer())
);
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -298,9 +303,9 @@ public static IMachineBuilder AddServalPlatformService(

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IOutboxMessageHandler, ServalPlatformOutboxHandler>();
builder.Services.AddSingleton<IOutboxMessageHandler, ServalPlatformOutboxMessageHandler>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();
builder.Services.AddScoped<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
Expand Down Expand Up @@ -357,8 +362,6 @@ public static IMachineBuilder AddServalTranslationEngineService(
});
builder.AddServalPlatformService(connectionString);

builder.Services.AddHostedService<MessageOutboxDeliveryService>();

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down Expand Up @@ -422,4 +425,10 @@ public static IMachineBuilder AddModelCleanupService(this IMachineBuilder builde
builder.Services.AddHostedService<ModelCleanupService>();
return builder;
}

public static IMachineBuilder AddMessageOutboxDeliveryService(this IMachineBuilder builder)
{
builder.Services.AddHostedService<MessageOutboxDeliveryService>();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
services.AddHealthChecks().AddCheck<S3HealthCheck>("S3 Bucket");

services.AddSingleton<ILanguageTagService, LanguageTagService>();
services.AddTransient<IFileSystem, FileSystem>();

services.AddScoped<IDistributedReaderWriterLockFactory, DistributedReaderWriterLockFactory>();
services.AddSingleton<ICorpusService, CorpusService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public class MessageOutboxOptions
{
public const string Key = "MessageOutbox";

public int MessageExpirationInHours { get; set; } = 48;
public string DataDir { get; set; } = "outbox";
public TimeSpan MessageExpirationTimeout { get; set; } = TimeSpan.FromHours(48);
}
20 changes: 1 addition & 19 deletions src/SIL.Machine.AspNetCore/Models/Outbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,5 @@ public record Outbox : IEntity

public int Revision { get; set; }

public required string Name { get; init; } = null!;
public required int CurrentIndex { get; set; }

public static async Task<Outbox> GetOutboxNextIndexAsync(
IRepository<Outbox> indexRepository,
string outboxName,
CancellationToken cancellationToken
)
{
Outbox outbox = (
await indexRepository.UpdateAsync(
i => i.Name == outboxName,
i => i.Inc(b => b.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
return outbox;
}
public int CurrentIndex { get; init; }
}
15 changes: 8 additions & 7 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ public record OutboxMessage : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required int Index { get; set; }
public required string OutboxName { get; set; }
public required string Method { get; set; }
public required string GroupId { get; set; }
public required string? RequestContent { get; set; }
public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;
public int Attempts { get; set; } = 0;
public required int Index { get; init; }
public required string OutboxRef { get; init; }
public required string Method { get; init; }
public required string GroupId { get; init; }
public string? Content { get; init; }
public required bool HasContentStream { get; init; }
public DateTimeOffset Created { get; init; } = DateTimeOffset.UtcNow;
public int Attempts { get; init; }
}
16 changes: 11 additions & 5 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ public class ClearMLMonitorService(
IServiceProvider services,
IClearMLService clearMLService,
ISharedFileService sharedFileService,
IDataAccessContext dataAccessContext,
IOptionsMonitor<ClearMLOptions> clearMLOptions,
IOptionsMonitor<BuildJobOptions> buildJobOptions,
ILogger<ClearMLMonitorService> logger
Expand All @@ -24,7 +23,6 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<IClearMLQueueService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

Expand Down Expand Up @@ -85,6 +83,7 @@ await _clearMLService.GetTasksForQueueAsync(_queuePerEngineType[engineType], can
_queueSizePerEngineType[engineType] = queuePositionsPerEngineType.Count;
}

var dataAccessContext = scope.ServiceProvider.GetRequiredService<IDataAccessContext>();
var platformService = scope.ServiceProvider.GetRequiredService<IPlatformService>();
var lockFactory = scope.ServiceProvider.GetRequiredService<IDistributedReaderWriterLockFactory>();
foreach (TranslationEngine engine in trainingEngines)
Expand Down Expand Up @@ -119,6 +118,7 @@ or ClearMLTaskStatus.Completed
)
{
bool canceled = !await TrainJobStartedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand Down Expand Up @@ -171,6 +171,7 @@ await UpdateTrainJobStatus(
if (canceling)
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -185,6 +186,7 @@ await TrainJobCanceledAsync(
case ClearMLTaskStatus.Stopped:
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -198,6 +200,7 @@ await TrainJobCanceledAsync(
case ClearMLTaskStatus.Failed:
{
await TrainJobFaultedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -219,6 +222,7 @@ await TrainJobFaultedAsync(
}

private async Task<bool> TrainJobStartedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -231,7 +235,7 @@ private async Task<bool> TrainJobStartedAsync(
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
success = await _dataAccessContext.WithTransactionAsync(
success = await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
Expand Down Expand Up @@ -281,6 +285,7 @@ CancellationToken cancellationToken
}

private async Task TrainJobFaultedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -295,7 +300,7 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await _dataAccessContext.WithTransactionAsync(
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
Expand All @@ -318,6 +323,7 @@ await buildJobService.BuildJobFinishedAsync(
}

private async Task TrainJobCanceledAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -331,7 +337,7 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await _dataAccessContext.WithTransactionAsync(
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
Expand Down
25 changes: 25 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/FileSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace SIL.Machine.AspNetCore.Services;

public class FileSystem : IFileSystem
{
public void CreateDirectory(string path)
{
Directory.CreateDirectory(path);
}

public void DeleteFile(string path)
{
if (File.Exists(path))
File.Delete(path);
}

public Stream OpenWrite(string path)
{
return File.OpenWrite(path);
}

public Stream OpenRead(string path)
{
return File.OpenRead(path);
}
}
1 change: 0 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default);

Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);
Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IFileSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public interface IFileSystem
{
void DeleteFile(string path);
void CreateDirectory(string path);
Stream OpenWrite(string path);
Stream OpenRead(string path);
}
9 changes: 5 additions & 4 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

public interface IMessageOutboxService
{
public Task<string> EnqueueMessageAsync<T>(
T method,
public Task<string> EnqueueMessageAsync(
string outboxId,
string method,
string groupId,
string? requestContent = null,
string? requestContentPath = null,
string? content = null,
Stream? contentStream = null,
CancellationToken cancellationToken = default
);
}
10 changes: 7 additions & 3 deletions src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

public interface IOutboxMessageHandler
{
public string Name { get; }
public string OutboxId { get; }

public Task SendMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
public Task CleanupMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
public Task HandleMessageAsync(
string method,
string? content,
Stream? contentStream,
CancellationToken cancellationToken = default
);
}
6 changes: 5 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/IPlatformService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ Task BuildCompletedAsync(
Task BuildFaultedAsync(string buildId, string message, CancellationToken cancellationToken = default);
Task BuildRestartingAsync(string buildId, CancellationToken cancellationToken = default);

Task InsertPretranslationsAsync(string engineId, string path, CancellationToken cancellationToken = default);
Task InsertPretranslationsAsync(
string engineId,
Stream pretranslationsStream,
CancellationToken cancellationToken = default
);
}
1 change: 0 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(
Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

Task<bool> ExistsAsync(string path, CancellationToken cancellationToken = default);
Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, CancellationToken cancellationToken = default);
}
9 changes: 0 additions & 9 deletions src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,6 @@ public async Task DeleteAsync(string path, bool recurse, CancellationToken cance
}
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
if (!_memoryStreams.TryGetValue(Normalize(sourcePath), out Entry? entry))
throw new FileNotFoundException($"Unable to find file {sourcePath}");
_memoryStreams[Normalize(destPath)] = entry;
_memoryStreams.Remove(Normalize(sourcePath), out _);
return Task.CompletedTask;
}

protected override void DisposeManagedResources()
{
foreach (Entry stream in _memoryStreams.Values)
Expand Down
9 changes: 0 additions & 9 deletions src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,6 @@ public Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationTo
return Task.FromResult<Stream>(File.OpenWrite(pathUri.LocalPath));
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
Uri sourcePathUri = new(_basePath, Normalize(sourcePath));
Uri destPathUri = new(_basePath, Normalize(destPath));
Directory.CreateDirectory(Path.GetDirectoryName(destPathUri.LocalPath)!);
File.Move(sourcePathUri.LocalPath, destPathUri.LocalPath);
return Task.CompletedTask;
}

public async Task DeleteAsync(string path, bool recurse, CancellationToken cancellationToken = default)
{
Uri pathUri = new(_basePath, Normalize(path));
Expand Down
Loading

0 comments on commit 6c9e1ea

Please sign in to comment.