Skip to content

Commit

Permalink
Update from reviewer comments:
Browse files Browse the repository at this point in the history
* Make outbox truly generic, handling multiple queues
* Add "MoveAsync" to SharedStorage
* Refactor saving pretranslations file
  • Loading branch information
johnml1135 committed Jun 27, 2024
1 parent 1d5e603 commit be36554
Show file tree
Hide file tree
Showing 21 changed files with 352 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ await c.Indexes.CreateOrUpdateAsync(
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<SortableIndex>("sortable_indexes");
o.AddRepository<Outbox>("outboxes");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -298,6 +298,8 @@ public static IMachineBuilder AddServalPlatformService(

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IOutboxMessageHandler, ServalPlatformOutboxHandler>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();

builder
Expand Down
28 changes: 28 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Outbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace SIL.Machine.AspNetCore.Models;

public record Outbox : IEntity
{
public string Id { get; set; } = "";

public int Revision { get; set; }

public string Name { get; set; } = null!;
public int CurrentIndex { get; set; }

public static async Task<Outbox> GetOutboxNextIndexAsync(
IRepository<Outbox> indexRepository,
string outboxName,
CancellationToken cancellationToken
)
{
Outbox outbox = (
await indexRepository.UpdateAsync(
i => i.Name == outboxName,
i => i.Inc(b => b.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
return outbox;
}
}
16 changes: 3 additions & 13 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
namespace SIL.Machine.AspNetCore.Models;

public enum OutboxMessageMethod
{
BuildStarted,
BuildCompleted,
BuildCanceled,
BuildFaulted,
BuildRestarting,
InsertPretranslations,
IncrementTranslationEngineCorpusSize
}

public record OutboxMessage : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string SortableIndex { get; set; }
public required OutboxMessageMethod Method { get; set; }
public required int Index { get; set; }
public required string OutboxName { get; set; }
public required string Method { get; set; }
public required string GroupId { get; set; }
public required string? RequestContent { get; set; }
public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;
Expand Down
33 changes: 0 additions & 33 deletions src/SIL.Machine.AspNetCore/Models/SortableIndex.cs

This file was deleted.

1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default);

Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);
Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
8 changes: 4 additions & 4 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

public interface IMessageOutboxService
{
public Task<string> EnqueueMessageAsync(
OutboxMessageMethod method,
public Task<string> EnqueueMessageAsync<T>(
T method,
string groupId,
string requestContent,
bool alwaysSaveContentToDisk = false,
string? requestContent = null,
string? requestContentPath = null,
CancellationToken cancellationToken = default
);
}
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public interface IOutboxMessageHandler
{
public string Name { get; }

public Task SendMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
public Task CleanupMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
}
6 changes: 1 addition & 5 deletions src/SIL.Machine.AspNetCore/Services/IPlatformService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,5 @@ Task BuildCompletedAsync(
Task BuildFaultedAsync(string buildId, string message, CancellationToken cancellationToken = default);
Task BuildRestartingAsync(string buildId, CancellationToken cancellationToken = default);

Task InsertPretranslationsAsync(
string engineId,
IAsyncEnumerable<Pretranslation> pretranslations,
CancellationToken cancellationToken = default
);
Task InsertPretranslationsAsync(string engineId, string path, CancellationToken cancellationToken = default);
}
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(
Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

Task<bool> ExistsAsync(string path, CancellationToken cancellationToken = default);
Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, CancellationToken cancellationToken = default);
}
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ public async Task DeleteAsync(string path, bool recurse, CancellationToken cance
}
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
if (!_memoryStreams.TryGetValue(Normalize(sourcePath), out Entry? entry))
throw new FileNotFoundException($"Unable to find file {sourcePath}");
_memoryStreams[Normalize(destPath)] = entry;
_memoryStreams.Remove(Normalize(sourcePath), out _);
return Task.CompletedTask;
}

protected override void DisposeManagedResources()
{
foreach (Entry stream in _memoryStreams.Values)
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationTo
return Task.FromResult<Stream>(File.OpenWrite(pathUri.LocalPath));
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
Uri sourcePathUri = new(_basePath, Normalize(sourcePath));
Uri destPathUri = new(_basePath, Normalize(destPath));
Directory.CreateDirectory(Path.GetDirectoryName(destPathUri.LocalPath)!);
File.Move(sourcePathUri.LocalPath, destPathUri.LocalPath);
return Task.CompletedTask;
}

public async Task DeleteAsync(string path, bool recurse, CancellationToken cancellationToken = default)
{
Uri pathUri = new(_basePath, Normalize(path));
Expand Down
107 changes: 16 additions & 91 deletions src/SIL.Machine.AspNetCore/Services/MessageOutboxDeliveryService.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
using Serval.Translation.V1;

namespace SIL.Machine.AspNetCore.Services;
namespace SIL.Machine.AspNetCore.Services;

public class MessageOutboxDeliveryService(
TranslationPlatformApi.TranslationPlatformApiClient client,
IRepository<OutboxMessage> messages,
ISharedFileService sharedFileService,
IEnumerable<IOutboxMessageHandler> outboxMessageHandlers,
MessageOutboxOptions options,
ILogger<MessageOutboxDeliveryService> logger
) : BackgroundService
{
private readonly TranslationPlatformApi.TranslationPlatformApiClient _client = client;
private readonly IRepository<OutboxMessage> _messages = messages;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly Dictionary<string, IOutboxMessageHandler> _outboxMessageHandlers =
outboxMessageHandlers.ToDictionary(o => o.Name);

private readonly ILogger<MessageOutboxDeliveryService> _logger = logger;
protected TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);
protected TimeSpan MessageExpiration { get; set; } = TimeSpan.FromHours(options.MessageExpirationInHours);
Expand All @@ -38,19 +36,20 @@ protected async Task ProcessMessagesAsync(CancellationToken cancellationToken =
IReadOnlyList<OutboxMessage> messages = await _messages.GetAllAsync();

IEnumerable<List<OutboxMessage>> messageGroups = messages.GroupBy(
m => m.GroupId,
m => new { m.GroupId, m.OutboxName },
m => m,
(key, element) => element.OrderBy(m => m.SortableIndex).ToList()
(key, element) => element.OrderBy(m => m.Index).ToList()
);

foreach (List<OutboxMessage> messageGroup in messageGroups)
{
bool abortMessageGroup = false;
var outboxMessageHandler = _outboxMessageHandlers[messageGroup.First().OutboxName];
foreach (OutboxMessage message in messageGroup)
{
try
{
await ProcessGroupMessagesAsync(message, cancellationToken);
await ProcessGroupMessagesAsync(message, outboxMessageHandler, cancellationToken);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -87,89 +86,15 @@ protected async Task ProcessMessagesAsync(CancellationToken cancellationToken =
}
}

async Task ProcessGroupMessagesAsync(OutboxMessage message, CancellationToken cancellationToken = default)
async Task ProcessGroupMessagesAsync(
OutboxMessage message,
IOutboxMessageHandler outboxMessageHandler,
CancellationToken cancellationToken = default
)
{
string requestContent;
bool deleteMessageFromDisk = false;
if (message.RequestContent is null)
{
await using var requestContentStream = await _sharedFileService.OpenReadAsync(
$"outbox/{message.Id}",
cancellationToken
);
requestContent = new StreamReader(requestContentStream).ReadToEnd();
deleteMessageFromDisk = true;
}
else
{
requestContent = message.RequestContent;
}
switch (message.Method)
{
case OutboxMessageMethod.BuildStarted:
await _client.BuildStartedAsync(
JsonSerializer.Deserialize<BuildStartedRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
case OutboxMessageMethod.BuildCompleted:
await _client.BuildCompletedAsync(
JsonSerializer.Deserialize<BuildCompletedRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
case OutboxMessageMethod.BuildCanceled:
await _client.BuildCanceledAsync(
JsonSerializer.Deserialize<BuildCanceledRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
case OutboxMessageMethod.BuildFaulted:
await _client.BuildFaultedAsync(
JsonSerializer.Deserialize<BuildFaultedRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
case OutboxMessageMethod.BuildRestarting:
await _client.BuildRestartingAsync(
JsonSerializer.Deserialize<BuildRestartingRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
case OutboxMessageMethod.InsertPretranslations:

{
using var call = _client.InsertPretranslations(cancellationToken: cancellationToken);
var requests = JsonSerializer.Deserialize<List<InsertPretranslationRequest>>(requestContent);
foreach (
var request in requests?.Where(r => r != null)
?? Enumerable.Empty<InsertPretranslationRequest>()
)
{
await call.RequestStream.WriteAsync(request, cancellationToken: cancellationToken);
}
await call.RequestStream.CompleteAsync();
}
break;
case OutboxMessageMethod.IncrementTranslationEngineCorpusSize:
await _client.IncrementTranslationEngineCorpusSizeAsync(
JsonSerializer.Deserialize<IncrementTranslationEngineCorpusSizeRequest>(requestContent),
cancellationToken: cancellationToken
);
break;
default:
await _messages.DeleteAsync(message.Id);
_logger.LogWarning(
"Unknown method: {message.Method}. Deleting the message from the list.",
message.Method.ToString()
);
break;
}
await outboxMessageHandler.SendMessageAsync(message, cancellationToken);
await _messages.DeleteAsync(message.Id);
if (deleteMessageFromDisk)
{
await _sharedFileService.DeleteAsync($"outbox/{message.Id}", cancellationToken);
}
await outboxMessageHandler.CleanupMessageAsync(message, cancellationToken);
}

async Task<bool> CheckIfFinalMessageAttempt(OutboxMessage message, Exception e)
Expand Down
Loading

0 comments on commit be36554

Please sign in to comment.