Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/sillsdev/machine
Browse files Browse the repository at this point in the history
  • Loading branch information
Enkidu93 committed Nov 7, 2023
2 parents 37cf14a + fba7aea commit 2bdf579
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 46 deletions.
2 changes: 2 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Corpus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ public class Corpus
public string Id { get; set; } = default!;
public string SourceLanguage { get; set; } = default!;
public string TargetLanguage { get; set; } = default!;
public bool TrainOnAll { get; set; }
public bool PretranslateAll { get; set; }
public HashSet<string> TrainOnTextIds { get; set; } = default!;
public HashSet<string> PretranslateTextIds { get; set; } = default!;
public List<CorpusFile> SourceFiles { get; set; } = default!;
public List<CorpusFile> TargetFiles { get; set; } = default!;
Expand Down
7 changes: 5 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/NmtPreprocessBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ async IAsyncEnumerable<Pretranslation> ProcessRowsAsync()

foreach (ParallelTextRow row in parallelCorpus)
{
await sourceTrainWriter.WriteAsync($"{row.SourceText}\n");
await targetTrainWriter.WriteAsync($"{row.TargetText}\n");
if (corpus.TrainOnAll || corpus.TrainOnTextIds.Contains(row.TextId))
{
await sourceTrainWriter.WriteAsync($"{row.SourceText}\n");
await targetTrainWriter.WriteAsync($"{row.TargetText}\n");
}
if (
(corpus.PretranslateAll || corpus.PretranslateTextIds.Contains(row.TextId))
&& row.SourceSegment.Count > 0
Expand Down
128 changes: 84 additions & 44 deletions src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,75 +55,114 @@ public override void Flush() { }

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
try
if (count > 0)
{
using MemoryStream ms = new(buffer, offset, count);
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = ms,
PartSize = MaxPartSize
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug($"Transferred {e.TransferredBytes}/{e.TotalBytes}");
}
);
UploadPartResponse response = await _client.UploadPartAsync(request);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
try
{
using MemoryStream ms = new(buffer, offset, count);
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = ms,
PartSize = MaxPartSize
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug($"Transferred {e.TransferredBytes}/{e.TotalBytes}");
}
);
_uploadResponses.Add(response);
}
catch (Exception e)
{
await AbortAsync(e);
throw;
UploadPartResponse response = await _client.UploadPartAsync(request);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
_uploadResponses.Add(response);
}
catch (Exception e)
{
await AbortAsync(e);
throw;
}
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
try
if (_uploadResponses.Count == 0)
{
CompleteMultipartUploadRequest request =
AbortAsync().WaitAndUnwrapException();
PutObjectRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
ContentBody = ""
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = _client
.CompleteMultipartUploadAsync(request)
.WaitAndUnwrapException();
Dispose(disposing: false);
GC.SuppressFinalize(this);
PutObjectResponse response = _client.PutObjectAsync(request).WaitAndUnwrapException();
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
$"Tried to upload empty file to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
else
{
AbortAsync(e).WaitAndUnwrapException();
throw;
try
{
CompleteMultipartUploadRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = _client
.CompleteMultipartUploadAsync(request)
.WaitAndUnwrapException();
Dispose(disposing: false);
GC.SuppressFinalize(this);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
{
AbortAsync(e).WaitAndUnwrapException();
throw;
}
}
}
base.Dispose(disposing);
}

public async override ValueTask DisposeAsync()
{
if (_uploadResponses.Count == 0)
{
await AbortAsync();
PutObjectRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
ContentBody = ""
};
PutObjectResponse response = await _client.PutObjectAsync(request);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to upload empty file to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
return;
}
try
{
CompleteMultipartUploadRequest request =
Expand All @@ -148,9 +187,10 @@ public async override ValueTask DisposeAsync()
}
}

private async Task AbortAsync(Exception e)
private async Task AbortAsync(Exception? e = null)
{
_logger.LogError(e, $"Aborted upload {_uploadId} to {_bucketName}/{_key}");
if (e is not null)
_logger.LogError(e, $"Aborted upload {_uploadId} to {_bucketName}/{_key}");
AbortMultipartUploadRequest abortMPURequest =
new()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ private static Models.Corpus Map(Serval.Translation.V1.Corpus source)
Id = source.Id,
SourceLanguage = source.SourceLanguage,
TargetLanguage = source.TargetLanguage,
TrainOnAll = source.TrainOnAll,

Check failure on line 239 in src/SIL.Machine.AspNetCore/Services/ServalTranslationEngineServiceV1.cs

View workflow job for this annotation

GitHub Actions / Build

'Corpus' does not contain a definition for 'TrainOnAll' and no accessible extension method 'TrainOnAll' accepting a first argument of type 'Corpus' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 239 in src/SIL.Machine.AspNetCore/Services/ServalTranslationEngineServiceV1.cs

View workflow job for this annotation

GitHub Actions / Build

'Corpus' does not contain a definition for 'TrainOnAll' and no accessible extension method 'TrainOnAll' accepting a first argument of type 'Corpus' could be found (are you missing a using directive or an assembly reference?)
PretranslateAll = source.PretranslateAll,
TrainOnTextIds = source.TrainOnTextIds.ToHashSet(),

Check failure on line 241 in src/SIL.Machine.AspNetCore/Services/ServalTranslationEngineServiceV1.cs

View workflow job for this annotation

GitHub Actions / Build

'Corpus' does not contain a definition for 'TrainOnTextIds' and no accessible extension method 'TrainOnTextIds' accepting a first argument of type 'Corpus' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 241 in src/SIL.Machine.AspNetCore/Services/ServalTranslationEngineServiceV1.cs

View workflow job for this annotation

GitHub Actions / Build

'Corpus' does not contain a definition for 'TrainOnTextIds' and no accessible extension method 'TrainOnTextIds' accepting a first argument of type 'Corpus' could be found (are you missing a using directive or an assembly reference?)
PretranslateTextIds = source.PretranslateTextIds.ToHashSet(),
SourceFiles = source.SourceFiles.Select(Map).ToList(),
TargetFiles = source.TargetFiles.Select(Map).ToList()
Expand Down

0 comments on commit 2bdf579

Please sign in to comment.