Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to 8.0 for e2e tests #592

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 6.0.x
dotnet-version: 8.0.x

- name: Start containers
run: dotnet build && docker compose -f "docker-compose.yml" up -d && sleep 20 #allow time for mongo to start up properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,14 @@ public class MessageOutboxOptions

public string OutboxDir { get; set; } = "outbox";
public TimeSpan MessageExpirationTimeout { get; set; } = TimeSpan.FromHours(48);

public static JsonSerializerOptions JsonSerializerOptions
{
get
{
JsonSerializerOptions options = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
options.AddProtobufSupport();
return options;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<PackageReference Include="Hangfire.Mongo" Version="1.10.8" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="8.0.8" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.8" />
<PackageReference Include="Protobuf.System.Text.Json" Version="1.4.0" />
<PackageReference Include="SIL.Machine" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<PackageReference Include="SIL.Machine.Morphology.HermitCrab" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<PackageReference Include="SIL.Machine.Translation.Thot" Version="3.5.2" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

public interface IMessageOutboxService
{
public Task<string> EnqueueMessageAsync(
public Task<string> EnqueueMessageAsync<TValue>(
string outboxId,
string method,
string groupId,
string? content = null,
Stream? contentStream = null,
TValue content,
CancellationToken cancellationToken = default
);

public Task<string> EnqueueMessageStreamAsync(
string outboxId,
string method,
string groupId,
Stream contentStream,
CancellationToken cancellationToken = default
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public interface IOutboxMessageHandler
public string OutboxId { get; }

public Task HandleMessageAsync(
string groupId,
string method,
string? content,
Stream? contentStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ internal async Task ProcessMessagesAsync(
{
try
{
await ProcessGroupMessagesAsync(messages, message, outboxMessageHandler, cancellationToken);
await ProcessGroupMessagesAsync(
messages,
message,
messageGroup.Key.GroupId,
outboxMessageHandler,
cancellationToken
);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -98,6 +104,7 @@ internal async Task ProcessMessagesAsync(
private async Task ProcessGroupMessagesAsync(
IRepository<OutboxMessage> messages,
OutboxMessage message,
string groupId,
IOutboxMessageHandler outboxMessageHandler,
CancellationToken cancellationToken = default
)
Expand All @@ -109,6 +116,7 @@ private async Task ProcessGroupMessagesAsync(
try
{
await outboxMessageHandler.HandleMessageAsync(
groupId,
message.Method,
message.Content,
contentStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ IOptionsMonitor<MessageOutboxOptions> options
private readonly IOptionsMonitor<MessageOutboxOptions> _options = options;
internal int MaxDocumentSize { get; set; } = 1_000_000;

public async Task<string> EnqueueMessageAsync(
public async Task<string> EnqueueMessageAsync<TValue>(
string outboxId,
string method,
string groupId,
string? content = null,
Stream? contentStream = null,
TValue content,
CancellationToken cancellationToken = default
)
{
if (content == null && contentStream == null)
throw new ArgumentException("Either content or contentStream must be specified.");
if (content is not null && content.Length > MaxDocumentSize)
string serializedContent = JsonSerializer.Serialize(content, MessageOutboxOptions.JsonSerializerOptions);
if (serializedContent.Length > MaxDocumentSize)
{
throw new ArgumentException(
$"The content is too large for request {method} with group ID {groupId}. "
+ $"It is {content.Length} bytes, but the maximum is {MaxDocumentSize} bytes."
+ $"It is {serializedContent.Length} bytes, but the maximum is {MaxDocumentSize} bytes."
);
}
Outbox outbox = (
Expand All @@ -49,24 +47,52 @@ await _outboxes.UpdateAsync(
OutboxRef = outboxId,
Method = method,
GroupId = groupId,
Content = content,
HasContentStream = contentStream is not null
Content = serializedContent,
HasContentStream = false
};
string filePath = Path.Combine(_options.CurrentValue.OutboxDir, outboxMessage.Id);
await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken);
return outboxMessage.Id;
}

public async Task<string> EnqueueMessageStreamAsync(
string outboxId,
string method,
string groupId,
Stream contentStream,
CancellationToken cancellationToken = default
)
{
Outbox outbox = (
await _outboxes.UpdateAsync(
outboxId,
u => u.Inc(o => o.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
OutboxMessage outboxMessage =
new()
{
Id = _idGenerator.GenerateId(),
Index = outbox.CurrentIndex,
OutboxRef = outboxId,
Method = method,
GroupId = groupId,
Content = null,
HasContentStream = true
};
string filePath = Path.Combine(_options.CurrentValue.OutboxDir, outboxMessage.Id);
try
{
if (contentStream is not null)
{
await using Stream fileStream = _fileSystem.OpenWrite(filePath);
await contentStream.CopyToAsync(fileStream, cancellationToken);
}
await using Stream fileStream = _fileSystem.OpenWrite(filePath);
await contentStream.CopyToAsync(fileStream, cancellationToken);
await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken);
return outboxMessage.Id;
}
catch
{
if (contentStream is not null)
_fileSystem.DeleteFile(filePath);
_fileSystem.DeleteFile(filePath);
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ public class ServalPlatformOutboxMessageHandler(TranslationPlatformApi.Translati
: IOutboxMessageHandler
{
private readonly TranslationPlatformApi.TranslationPlatformApiClient _client = client;
private static readonly JsonSerializerOptions JsonSerializerOptions =
new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
private readonly JsonSerializerOptions _jsonSerializerOptions = MessageOutboxOptions.JsonSerializerOptions;

public string OutboxId => ServalPlatformOutboxConstants.OutboxId;

public async Task HandleMessageAsync(
string groupId,
string method,
string? content,
Stream? contentStream,
Expand All @@ -22,39 +22,39 @@ public async Task HandleMessageAsync(
{
case ServalPlatformOutboxConstants.BuildStarted:
await _client.BuildStartedAsync(
JsonSerializer.Deserialize<BuildStartedRequest>(content!),
JsonSerializer.Deserialize<BuildStartedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildCompleted:
await _client.BuildCompletedAsync(
JsonSerializer.Deserialize<BuildCompletedRequest>(content!),
JsonSerializer.Deserialize<BuildCompletedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildCanceled:
await _client.BuildCanceledAsync(
JsonSerializer.Deserialize<BuildCanceledRequest>(content!),
JsonSerializer.Deserialize<BuildCanceledRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildFaulted:
await _client.BuildFaultedAsync(
JsonSerializer.Deserialize<BuildFaultedRequest>(content!),
JsonSerializer.Deserialize<BuildFaultedRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.BuildRestarting:
await _client.BuildRestartingAsync(
JsonSerializer.Deserialize<BuildRestartingRequest>(content!),
JsonSerializer.Deserialize<BuildRestartingRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.InsertPretranslations:
IAsyncEnumerable<Pretranslation> pretranslations = JsonSerializer
.DeserializeAsyncEnumerable<Pretranslation>(
contentStream!,
JsonSerializerOptions,
_jsonSerializerOptions,
cancellationToken
)
.OfType<Pretranslation>();
Expand All @@ -66,7 +66,7 @@ await _client.BuildRestartingAsync(
await call.RequestStream.WriteAsync(
new InsertPretranslationsRequest
{
EngineId = content!,
EngineId = groupId,
CorpusId = pretranslation.CorpusId,
TextId = pretranslation.TextId,
Refs = { pretranslation.Refs },
Expand All @@ -81,13 +81,16 @@ await call.RequestStream.WriteAsync(
break;
case ServalPlatformOutboxConstants.IncrementTranslationEngineCorpusSize:
await _client.IncrementTranslationEngineCorpusSizeAsync(
JsonSerializer.Deserialize<IncrementTranslationEngineCorpusSizeRequest>(content!),
JsonSerializer.Deserialize<IncrementTranslationEngineCorpusSizeRequest>(
content!,
_jsonSerializerOptions
),
cancellationToken: cancellationToken
);
break;
case ServalPlatformOutboxConstants.UpdateBuildExecutionData:
await _client.UpdateBuildExecutionDataAsync(
JsonSerializer.Deserialize<UpdateBuildExecutionDataRequest>(content!),
JsonSerializer.Deserialize<UpdateBuildExecutionDataRequest>(content!, _jsonSerializerOptions),
cancellationToken: cancellationToken
);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildStarted,
buildId,
JsonSerializer.Serialize(new BuildStartedRequest { BuildId = buildId }),
new BuildStartedRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand All @@ -32,14 +32,12 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildCompleted,
buildId,
JsonSerializer.Serialize(
new BuildCompletedRequest
{
BuildId = buildId,
CorpusSize = trainSize,
Confidence = confidence
}
),
new BuildCompletedRequest
{
BuildId = buildId,
CorpusSize = trainSize,
Confidence = confidence
},
cancellationToken: cancellationToken
);
}
Expand All @@ -50,7 +48,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildCanceled,
buildId,
JsonSerializer.Serialize(new BuildCanceledRequest { BuildId = buildId }),
new BuildCanceledRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand All @@ -61,7 +59,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildFaulted,
buildId,
JsonSerializer.Serialize(new BuildFaultedRequest { BuildId = buildId, Message = message }),
new BuildFaultedRequest { BuildId = buildId, Message = message },
cancellationToken: cancellationToken
);
}
Expand All @@ -72,7 +70,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.BuildRestarting,
buildId,
JsonSerializer.Serialize(new BuildRestartingRequest { BuildId = buildId }),
new BuildRestartingRequest { BuildId = buildId },
cancellationToken: cancellationToken
);
}
Expand Down Expand Up @@ -111,13 +109,12 @@ public async Task InsertPretranslationsAsync(
CancellationToken cancellationToken = default
)
{
await _outboxService.EnqueueMessageAsync(
await _outboxService.EnqueueMessageStreamAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.InsertPretranslations,
engineId,
engineId,
pretranslationsStream,
cancellationToken: cancellationToken
cancellationToken
);
}

Expand All @@ -131,9 +128,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.IncrementTranslationEngineCorpusSize,
engineId,
JsonSerializer.Serialize(
new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count }
),
new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count },
cancellationToken: cancellationToken
);
}
Expand All @@ -151,7 +146,7 @@ await _outboxService.EnqueueMessageAsync(
ServalPlatformOutboxConstants.OutboxId,
ServalPlatformOutboxConstants.UpdateBuildExecutionData,
engineId,
JsonSerializer.Serialize(request),
request,
cancellationToken: cancellationToken
);
}
Expand Down
Loading
Loading