Skip to content

Commit

Permalink
Merge branch 'filter_kbt_by_chapter' of https://github.com/sillsdev/s…
Browse files Browse the repository at this point in the history
…erval into filter_kbt_by_chapter
  • Loading branch information
Enkidu93 committed Nov 27, 2024
2 parents 788beb9 + 551e6cd commit 768cbea
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 57 deletions.
6 changes: 3 additions & 3 deletions deploy/qa-ext-values.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
externalHost: qa.serval-api.org
environment: Production
deploymentVersion: '1.7.QA7'
deploymentVersion: '1.8.QA1'
alertEmail: [email protected]
emailsToAlert: [email protected]
enableTls: true
namespace: serval
auth0Domain: dev-sillsdev.auth0.com
lokiTenent: serval-tenant
lokiUrl: http://loki-distributed-gateway.loki.svc.cluster.local
servalImage: ghcr.io/sillsdev/serval:1.7.7
ClearMLDockerImage: ghcr.io/sillsdev/machine.py:1.7.2
servalImage: ghcr.io/sillsdev/serval:1.8.1
ClearMLDockerImage: ghcr.io/sillsdev/machine.py:1.8.1
ClearMLQueue: production
MongoConnectionPrefix: qa_
SharedFileLocation: s3://silnlp/ext-qa/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ await client.BuildStartedAsync(
try
{
List<InsertPretranslationsRequest> pretranslationsRequests = [];
_parallelCorpusPreprocessingService.Preprocess(
await _parallelCorpusPreprocessingService.Preprocess(
request.Corpora.Select(Map).ToList(),
row => { },
row => Task.CompletedTask,
(row, corpus) =>
{
pretranslationsRequests.Add(
Expand All @@ -97,6 +97,7 @@ await client.BuildStartedAsync(
Translation = row.SourceSegment
}
);
return Task.CompletedTask;
},
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ CancellationToken cancellationToken
JsonObject? buildOptionsObject = null;
if (buildOptions is not null)
buildOptionsObject = JsonSerializer.Deserialize<JsonObject>(buildOptions);

await using StreamWriter sourceTrainWriter =
new(await _sharedFileService.OpenWriteAsync($"builds/{buildId}/train.src.txt", cancellationToken));
await using StreamWriter targetTrainWriter =
new(await _sharedFileService.OpenWriteAsync($"builds/{buildId}/train.trg.txt", cancellationToken));

await using Stream pretranslateStream = await _sharedFileService.OpenWriteAsync(
$"builds/{buildId}/pretranslate.src.json",
cancellationToken
Expand All @@ -107,19 +107,19 @@ CancellationToken cancellationToken
int trainCount = 0;
int pretranslateCount = 0;
pretranslateWriter.WriteStartArray();
_parallelCorpusPreprocessingService.Preprocess(
await _parallelCorpusPreprocessingService.Preprocess(
corpora,
row =>
async row =>
{
if (row.SourceSegment.Length > 0 || row.TargetSegment.Length > 0)
{
sourceTrainWriter.Write($"{row.SourceSegment}\n");
targetTrainWriter.Write($"{row.TargetSegment}\n");
await sourceTrainWriter.WriteAsync($"{row.SourceSegment}\n");
await targetTrainWriter.WriteAsync($"{row.TargetSegment}\n");
}
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length > 0)
trainCount++;
},
(row, corpus) =>
async (row, corpus) =>
{
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length == 0)
{
Expand All @@ -134,6 +134,8 @@ CancellationToken cancellationToken
pretranslateWriter.WriteEndObject();
pretranslateCount++;
}
if (pretranslateWriter.BytesPending > 1024 * 1024)
await pretranslateWriter.FlushAsync();
},
(bool?)buildOptionsObject?["use_key_terms"] ?? true
);
Expand Down
88 changes: 53 additions & 35 deletions src/Machine/src/Serval.Machine.Shared/Services/S3WriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ ILoggerFactory loggerFactory
private readonly List<UploadPartResponse> _uploadResponses = new List<UploadPartResponse>();
private readonly ILogger<S3WriteStream> _logger = loggerFactory.CreateLogger<S3WriteStream>();

private readonly Stream _stream = new MemoryStream();
private int _bytesWritten = 0;

public const int MaxPartSize = 5 * 1024 * 1024;

public override bool CanRead => false;
Expand All @@ -23,7 +26,7 @@ ILoggerFactory loggerFactory

public override bool CanWrite => true;

public override long Length => 0;
public override long Length => _stream.Length;

public override long Position
{
Expand All @@ -48,47 +51,60 @@ public override async ValueTask WriteAsync(
CancellationToken cancellationToken = default
)
{
try
{
using Stream stream = buffer.AsStream();
// S3 buckets can only be written to in chunks of MaxPartSize
// therefore, break it into chunks, resetting the stream each time

int bytesWritten = 0;
while (buffer.Length + _stream.Position > MaxPartSize)
{
int toWrite = MaxPartSize - (int)_stream.Position;
await _stream.WriteAsync(buffer[..toWrite], cancellationToken);
await UploadPartAsync(cancellationToken);
buffer = buffer[toWrite..];
}
// save the remaining buffer for future calls
await _stream.WriteAsync(buffer, cancellationToken);
}

while (stream.Length > bytesWritten)
{
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = stream,
PartSize = MaxPartSize
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug(
"Transferred {e.TransferredBytes}/{e.TotalBytes}",
e.TransferredBytes,
e.TotalBytes
);
}
);
UploadPartResponse response = await _client.UploadPartAsync(request, cancellationToken);
if (response.HttpStatusCode != HttpStatusCode.OK)
private async Task UploadPartAsync(CancellationToken cancellationToken = default)
{
if (_stream.Length == 0)
return;
try
{
_stream.Position = 0;
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = _stream,
PartSize = MaxPartSize
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug(
"Transferred {e.TransferredBytes}/{e.TotalBytes}",
e.TransferredBytes,
e.TotalBytes
);
}
);
UploadPartResponse response = await _client.UploadPartAsync(request, cancellationToken);
if (response.HttpStatusCode != HttpStatusCode.OK)
{
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}

_uploadResponses.Add(response);
_uploadResponses.Add(response);

bytesWritten += MaxPartSize;
}
_bytesWritten += MaxPartSize;
_stream.SetLength(0);
}
catch (Exception e)
{
Expand All @@ -104,6 +120,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc

protected override void Dispose(bool disposing)
{
UploadPartAsync().WaitAndUnwrapException();
try
{
if (disposing)
Expand Down Expand Up @@ -164,6 +181,7 @@ protected override void Dispose(bool disposing)

public override async ValueTask DisposeAsync()
{
await UploadPartAsync();
try
{
if (_uploadResponses.Count == 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using Nito.AsyncEx;

namespace SIL.ServiceToolkit.Utils;

public interface IParallelCorpusPreprocessingService
{
void Preprocess(
Task Preprocess(
IReadOnlyList<ParallelCorpus> corpora,
Action<Row> train,
Action<Row, ParallelCorpus> pretranslate,
Func<Row, Task> train,
Func<Row, ParallelCorpus, Task> pretranslate,
bool useKeyTerms = false
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ internal int Seed
}
}

public void Preprocess(
public async Task Preprocess(
IReadOnlyList<ParallelCorpus> corpora,
Action<Row> train,
Action<Row, ParallelCorpus> pretranslate,
Func<Row, Task> train,
Func<Row, ParallelCorpus, Task> pretranslate,
bool useKeyTerms = false
)
{
Expand Down Expand Up @@ -77,7 +77,7 @@ public void Preprocess(

foreach (Row row in CollapseRanges(trainingRows))
{
train(row);
await train(row);
}

if (useKeyTerms)
Expand All @@ -103,7 +103,7 @@ public void Preprocess(
.AlignRows(targetTermCorpora.ChooseFirst());
foreach (ParallelTextRow row in parallelKeyTermsCorpus)
{
train(new Row(row.TextId, row.Refs, row.SourceText, row.TargetText, 1));
await train(new Row(row.TextId, row.Refs, row.SourceText, row.TargetText, 1));
}
}
}
Expand All @@ -116,7 +116,7 @@ public void Preprocess(

foreach (Row row in CollapseRanges(pretranslateCorpus.ToArray()))
{
pretranslate(row, corpus);
await pretranslate(row, corpus);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class ParallelCorpusPreprocessingServiceTests
);

[Test]
public void TestParallelCorpusPreprocessor()
public async Task TestParallelCorpusPreprocessor()
{
ParallelCorpusPreprocessingService processor = new(new CorpusService());
List<ParallelCorpus> corpora =
Expand Down Expand Up @@ -73,17 +73,19 @@ public void TestParallelCorpusPreprocessor()
];
int trainCount = 0;
int pretranslateCount = 0;
processor.Preprocess(
await processor.Preprocess(
corpora,
row =>
{
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length > 0)
trainCount++;
return Task.CompletedTask;
},
(row, _) =>
{
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length == 0)
pretranslateCount++;
return Task.CompletedTask;
},
false
);
Expand Down

0 comments on commit 768cbea

Please sign in to comment.