Skip to content

Commit

Permalink
Updates from reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
johnml1135 committed Jun 26, 2024
1 parent 641902d commit 1d5e603
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ await c.Indexes.CreateOrUpdateAsync(
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<Sequence>("outbox_message_index");
o.AddRepository<SortableIndex>("sortable_indexes");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand Down
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public record OutboxMessage : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string SortableIndex { get; set; }
public required OutboxMessageMethod Method { get; set; }
public required string GroupId { get; set; }
public required string? RequestContent { get; set; }
Expand Down
16 changes: 0 additions & 16 deletions src/SIL.Machine.AspNetCore/Models/Sequence.cs

This file was deleted.

33 changes: 33 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/SortableIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace SIL.Machine.AspNetCore.Models;

public record SortableIndex : IEntity
{
public string Id { get; set; } = "";

public int Revision { get; set; }

public string Context { get; set; } = "";
public int CurrentIndex { get; set; }

public static string IndexToObjectIdString(int value)
{
return value.ToString("x24");
}

public static async Task<string> GetSortableIndexAsync(
IRepository<SortableIndex> indexRepository,
string context,
CancellationToken cancellationToken
)
{
SortableIndex outboxIndex = (
await indexRepository.UpdateAsync(
i => i.Context == context,
i => i.Inc(b => b.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
return IndexToObjectIdString(outboxIndex.CurrentIndex);
}
}
12 changes: 7 additions & 5 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,22 +227,24 @@ private async Task<bool> TrainJobStartedAsync(
CancellationToken cancellationToken = default
)
{
bool success;
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken)){
return await _dataAccessContext.WithTransactionAsync(
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
success = await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
await platformService.BuildStartedAsync(buildId, CancellationToken.None);

await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, ct);
_logger.LogInformation("Build started ({BuildId})", buildId);
return true;
},
cancellationToken: cancellationToken
);
}
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({BuildId})", buildId);
return success;
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected async Task ProcessMessagesAsync(CancellationToken cancellationToken =
IEnumerable<List<OutboxMessage>> messageGroups = messages.GroupBy(
m => m.GroupId,
m => m,
(key, element) => element.OrderBy(m => m.Id).ToList()
(key, element) => element.OrderBy(m => m.SortableIndex).ToList()
);

foreach (List<OutboxMessage> messageGroup in messageGroups)
Expand Down Expand Up @@ -94,7 +94,7 @@ async Task ProcessGroupMessagesAsync(OutboxMessage message, CancellationToken ca
if (message.RequestContent is null)
{
await using var requestContentStream = await _sharedFileService.OpenReadAsync(
$"outbox/{message.Id}.json",
$"outbox/{message.Id}",
cancellationToken
);
requestContent = new StreamReader(requestContentStream).ReadToEnd();
Expand Down Expand Up @@ -168,7 +168,7 @@ await _client.IncrementTranslationEngineCorpusSizeAsync(
await _messages.DeleteAsync(message.Id);
if (deleteMessageFromDisk)
{
await _sharedFileService.DeleteAsync($"outbox/{message.Id}.json", cancellationToken);
await _sharedFileService.DeleteAsync($"outbox/{message.Id}", cancellationToken);
}
}

Expand Down
30 changes: 14 additions & 16 deletions src/SIL.Machine.AspNetCore/Services/MessageOutboxService.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
namespace SIL.Machine.AspNetCore.Services;
using MongoDB.Bson;

namespace SIL.Machine.AspNetCore.Services;

public class MessageOutboxService(
IRepository<Sequence> messageIndexes,
IRepository<SortableIndex> messageIndexes,
IRepository<OutboxMessage> messages,
ISharedFileService sharedFileService
) : IMessageOutboxService
{
private readonly IRepository<Sequence> _messageIndex = messageIndexes;
private readonly IRepository<SortableIndex> _messageIndex = messageIndexes;
private readonly IRepository<OutboxMessage> _messages = messages;
private readonly ISharedFileService _sharedFileService = sharedFileService;
protected int MaxDocumentSize { get; set; } = 1_000_000;
Expand All @@ -19,19 +21,15 @@ public async Task<string> EnqueueMessageAsync(
CancellationToken cancellationToken = default
)
{
// get next index
Sequence outboxIndex = (
await _messageIndex.UpdateAsync(
i => i.Context == "MessageOutbox",
i => i.Inc(b => b.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
string id = Sequence.IndexToObjectIdString(outboxIndex.CurrentIndex);
string sortableIndex = await SortableIndex.GetSortableIndexAsync(
_messageIndex,
"MessageOutbox",
cancellationToken
);
OutboxMessage outboxMessage = new OutboxMessage
{
Id = id,
Id = ObjectId.GenerateNewId().ToString(),
SortableIndex = sortableIndex,
Method = method,
GroupId = groupId,
RequestContent = requestContent
Expand All @@ -41,11 +39,11 @@ await _messageIndex.UpdateAsync(
// The file is too large - save it to disk and send a reference.
// MongoDB has a 16MB document size limit - let's keep below that.
await using StreamWriter sourceTrainWriter =
new(await _sharedFileService.OpenWriteAsync($"outbox/{id}.json", cancellationToken));
new(await _sharedFileService.OpenWriteAsync($"outbox/{outboxMessage.Id}", cancellationToken));
sourceTrainWriter.Write(requestContent);
outboxMessage.RequestContent = null;
}
await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken);
return id;
return outboxMessage.Id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public async Task SendMessages_Timeout()
await Task.Delay(100);
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
// Each group should try to send one message
Assert.That((await env.Messages.GetAsync(m => m.Id == "1"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "2"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "3"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(1));

// with now shorter timeout, the messages will be deleted.
// 4 start build attempts, and only one build completed attempt
Expand All @@ -60,14 +60,14 @@ public async Task SendMessagesUnavailable_Failure()
env.ClientUnavailableFailure();
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
// Only the first group should be attempted - but not recorded as attempted
Assert.That((await env.Messages.GetAsync(m => m.Id == "1"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "2"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "3"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(0));
env.ClientInternalFailure();
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
Assert.That((await env.Messages.GetAsync(m => m.Id == "1"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "2"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "3"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(1));
Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0));
Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(1));
env.ClientNoFailure();
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
Assert.That(await env.Messages.ExistsAsync(m => true), Is.False);
Expand All @@ -86,7 +86,7 @@ public async Task LargeMessageContent()
requestContent: JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "C" }),
cancellationToken: CancellationToken.None
);
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdC}.json"), Is.False);
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdC}"), Is.False);
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
// small max document size - message saved to file
env.OutboxService.SetMaxDocumentSize(1);
Expand All @@ -96,9 +96,9 @@ public async Task LargeMessageContent()
requestContent: JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "D" }),
cancellationToken: CancellationToken.None
);
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdD}.json"), Is.True);
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdD}"), Is.True);
await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync();
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdD}.json"), Is.False);
Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdD}"), Is.False);
}

public class TestMessageOutboxDeliveryService(
Expand All @@ -114,7 +114,7 @@ ILogger<MessageOutboxDeliveryService> logger
}

public class TestMessageOutboxService(
IRepository<Sequence> messageIndexes,
IRepository<SortableIndex> messageIndexes,
IRepository<OutboxMessage> messages,
ISharedFileService sharedFileService
) : MessageOutboxService(messageIndexes, messages, sharedFileService)
Expand All @@ -124,7 +124,7 @@ ISharedFileService sharedFileService

private class TestEnvironment : ObjectModel.DisposableBase
{
public MemoryRepository<Sequence> MessageIndexes { get; }
public MemoryRepository<SortableIndex> MessageIndexes { get; }
public MemoryRepository<OutboxMessage> Messages { get; }
public TestMessageOutboxService OutboxService { get; }
public ISharedFileService SharedFileService { get; }
Expand All @@ -134,7 +134,7 @@ private class TestEnvironment : ObjectModel.DisposableBase

public TestEnvironment()
{
MessageIndexes = new MemoryRepository<Sequence>();
MessageIndexes = new MemoryRepository<SortableIndex>();
Messages = new MemoryRepository<OutboxMessage>();
SharedFileService = new SharedFileService(Substitute.For<ILoggerFactory>());
OutboxService = new TestMessageOutboxService(MessageIndexes, Messages, SharedFileService);
Expand Down Expand Up @@ -208,7 +208,8 @@ public void AddStandardMessages()
Messages.Add(
new OutboxMessage
{
Id = "2",
Id = "A",
SortableIndex = "2",
Method = OutboxMessageMethod.BuildCompleted,
GroupId = "A",
RequestContent = JsonSerializer.Serialize(
Expand All @@ -224,7 +225,8 @@ public void AddStandardMessages()
Messages.Add(
new OutboxMessage
{
Id = "1",
Id = "B",
SortableIndex = "1",
Method = OutboxMessageMethod.BuildStarted,
GroupId = "A",
RequestContent = JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "A" })
Expand All @@ -233,7 +235,8 @@ public void AddStandardMessages()
Messages.Add(
new OutboxMessage
{
Id = "3",
Id = "C",
SortableIndex = "3",
Method = OutboxMessageMethod.BuildStarted,
GroupId = "B",
RequestContent = JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "B" })
Expand Down

0 comments on commit 1d5e603

Please sign in to comment.