Skip to content

Commit

Permalink
Merge pull request #47 from sillsdev/nmt_fixes
Browse files Browse the repository at this point in the history
Nmt fixes
  • Loading branch information
johnml1135 authored Aug 2, 2023
2 parents 6417fc6 + f8656f7 commit a035b5e
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

/// <summary>
/// Model with a single <see cref="Id"/> property; going by its name, it describes a ClearML project.
/// </summary>
public class ClearMLProject
{
// [BsonId] marks this property as the document identifier; [BsonRepresentation] has the string value
// represented as a BSON ObjectId rather than a plain string.
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
}
2 changes: 2 additions & 0 deletions src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public enum ClearMLTaskStatus

public class ClearMLTask
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
public string Name { get; set; } = default!;
public ClearMLProject Project { get; set; } = default!;
Expand Down
2 changes: 2 additions & 0 deletions src/Machine/src/SIL.Machine.AspNetCore/Models/Corpus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public class Corpus
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
public string SourceLanguage { get; set; } = default!;
public string TargetLanguage { get; set; } = default!;
Expand Down
2 changes: 2 additions & 0 deletions src/Machine/src/SIL.Machine.AspNetCore/Models/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public class Lock
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
public DateTime? ExpiresAt { get; set; }
public string HostId { get; set; } = default!;
Expand Down
2 changes: 2 additions & 0 deletions src/Machine/src/SIL.Machine.AspNetCore/Models/RWLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public class RWLock : IEntity
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
public int Revision { get; set; }
public Lock? WriterLock { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public class TrainSegmentPair : IEntity
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = default!;
public int Revision { get; set; } = 1;
public string TranslationEngineRef { get; set; } = default!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
<PackageReference Include="Hangfire.MemoryStorage" Version="1.7.0" />
<PackageReference Include="Hangfire.Mongo" Version="1.9.3" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="6.0.16" />
<PackageReference Include="Serval.Grpc" Version="0.8.0" />
<PackageReference Include="SIL.DataAccess" Version="0.3.3" />
<PackageReference Include="Serval.Grpc" Version="0.8.0" Condition="!Exists('..\..\..\serval\src\Serval.Grpc\Serval.Grpc.csproj')" />
<PackageReference Include="SIL.DataAccess" Version="0.4.0" Condition="!Exists('..\..\..\serval\src\SIL.DataAccess\SIL.DataAccess.csproj')" />
<PackageReference Include="SIL.WritingSystems" Version="12.0.1" />
<PackageReference Include="Stowage" Version="1.2.6" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
Expand All @@ -43,6 +43,8 @@
<ProjectReference Include="..\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj" />
<ProjectReference Include="..\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj" />
<ProjectReference Include="..\SIL.Machine\SIL.Machine.csproj" />
<ProjectReference Include="..\..\..\serval\src\Serval.Grpc\Serval.Grpc.csproj" Condition="Exists('..\..\..\serval\src\Serval.Grpc\Serval.Grpc.csproj')" />
<ProjectReference Include="..\..\..\serval\src\SIL.DataAccess\SIL.DataAccess.csproj" Condition="Exists('..\..\..\serval\src\SIL.DataAccess\SIL.DataAccess.csproj')" />
</ItemGroup>

<Target Name="ZipThotNewModel" BeforeTargets="BeforeBuild">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,36 @@ public async Task RunAsync(
string engineId,
string buildId,
IReadOnlyList<Corpus> corpora,
CancellationToken cancellationToken
CancellationToken externalCancellationToken
)
{
string? clearMLProjectId = await _clearMLService.GetProjectIdAsync(engineId, cancellationToken);
string? clearMLProjectId = await _clearMLService.GetProjectIdAsync(engineId, externalCancellationToken);
if (clearMLProjectId is null)
return;

try
{
var combinedCancellationToken = new SubscribeForCancellation(_engines).GetCombinedCancellationToken(
engineId,
buildId,
externalCancellationToken
);

TranslationEngine? engine = await _engines.GetAsync(
e => e.EngineId == engineId && e.BuildId == buildId,
cancellationToken: cancellationToken
cancellationToken: combinedCancellationToken
);
if (engine is null || engine.IsCanceled)
throw new OperationCanceledException();

int corpusSize;
if (engine.BuildState is BuildState.Pending)
corpusSize = await WriteDataFilesAsync(buildId, corpora, cancellationToken);
corpusSize = await WriteDataFilesAsync(buildId, corpora, combinedCancellationToken);
else
corpusSize = GetCorpusSize(corpora);

string clearMLTaskId;
ClearMLTask? clearMLTask = await _clearMLService.GetTaskAsync(buildId, clearMLProjectId, cancellationToken);
ClearMLTask? clearMLTask = await _clearMLService.GetTaskByNameAsync(buildId, combinedCancellationToken);
if (clearMLTask is null)
{
clearMLTaskId = await _clearMLService.CreateTaskAsync(
Expand All @@ -68,7 +74,7 @@ CancellationToken cancellationToken
engine.SourceLanguage,
engine.TargetLanguage,
_sharedFileService.GetBaseUri().ToString(),
cancellationToken
combinedCancellationToken
);
await _clearMLService.EnqueueTaskAsync(clearMLTaskId, CancellationToken.None);
}
Expand All @@ -80,9 +86,9 @@ CancellationToken cancellationToken
int lastIteration = 0;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
combinedCancellationToken.ThrowIfCancellationRequested();

clearMLTask = await _clearMLService.GetTaskAsync(clearMLTaskId, cancellationToken);
clearMLTask = await _clearMLService.GetTaskByIdAsync(clearMLTaskId, combinedCancellationToken);
if (clearMLTask is null)
throw new InvalidOperationException("The ClearML task does not exist.");

Expand All @@ -98,7 +104,7 @@ or ClearMLTaskStatus.Completed
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.BuildId == buildId && !e.IsCanceled,
u => u.Set(e => e.BuildState, BuildState.Active),
cancellationToken: cancellationToken
cancellationToken: combinedCancellationToken
);
if (engine is null)
throw new OperationCanceledException();
Expand Down Expand Up @@ -129,11 +135,11 @@ await _engines.UpdateAsync(
}
if (clearMLTask.Status is ClearMLTaskStatus.Completed)
break;
await Task.Delay(_options.CurrentValue.BuildPollingTimeout, cancellationToken);
await Task.Delay(_options.CurrentValue.BuildPollingTimeout, combinedCancellationToken);
}

// The ClearML task has successfully completed, so insert the generated pretranslations into the database.
await InsertPretranslationsAsync(engineId, buildId, cancellationToken);
await InsertPretranslationsAsync(engineId, buildId, combinedCancellationToken);

IReadOnlyDictionary<string, double> metrics = await _clearMLService.GetTaskMetricsAsync(
clearMLTaskId,
Expand Down Expand Up @@ -174,11 +180,7 @@ await _platformService.BuildCompletedAsync(
if (engine is null || engine.IsCanceled)
{
// This is an actual cancellation triggered by an API call.
ClearMLTask? task = await _clearMLService.GetTaskAsync(
buildId,
clearMLProjectId,
CancellationToken.None
);
ClearMLTask? task = await _clearMLService.GetTaskByNameAsync(buildId, CancellationToken.None);
if (task is not null)
await _clearMLService.StopTaskAsync(task.Id, CancellationToken.None);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ protected override Expression<Func<ClearMLNmtEngineBuildJob, Task>> GetJobExpres
IReadOnlyList<Corpus> corpora
)
{
// CancellationToken.None is only a placeholder here: Hangfire injects the proper cancellation token when it runs the job.
return r => r.RunAsync(engineId, buildId, corpora, CancellationToken.None);
}
}
16 changes: 4 additions & 12 deletions src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,12 @@ public async Task<bool> StopTaskAsync(string id, CancellationToken cancellationT
return updated == 1;
}

public Task<ClearMLTask?> GetTaskAsync(string name, string projectId, CancellationToken cancellationToken = default)
public Task<ClearMLTask?> GetTaskByNameAsync(string name, CancellationToken cancellationToken = default)
{
return GetTaskAsync(
new JsonObject
{
["id"] = new JsonArray(),
["name"] = name,
["project"] = new JsonArray(projectId)
},
cancellationToken
);
return GetTaskAsync(new JsonObject { ["name"] = name }, cancellationToken);
}

public Task<ClearMLTask?> GetTaskAsync(string id, CancellationToken cancellationToken = default)
public Task<ClearMLTask?> GetTaskByIdAsync(string id, CancellationToken cancellationToken = default)
{
return GetTaskAsync(new JsonObject { ["id"] = id }, cancellationToken);
}
Expand Down Expand Up @@ -206,7 +198,7 @@ public async Task<IReadOnlyDictionary<string, double>> GetTaskMetricsAsync(
"status_reason",
"active_duration"
);
JsonObject? result = await CallAsync("tasks", "get_by_id_ex", body, cancellationToken);
JsonObject? result = await CallAsync("tasks", "get_all_ex", body, cancellationToken);
var tasks = (JsonArray?)result?["data"]?["tasks"];
if (tasks is null || tasks.Count == 0)
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ public class DistributedReaderWriterLock : IDistributedReaderWriterLock
private readonly IIdGenerator _idGenerator;
private readonly string _id;

private bool _lockChecked;

/// <summary>
/// Creates a reader/writer lock handle that operates on the <see cref="RWLock"/> record with the given id.
/// </summary>
/// <param name="hostId">Host identifier; stored as <c>HostId</c> on the locks acquired through this instance.</param>
/// <param name="locks">Repository that holds the <see cref="RWLock"/> records.</param>
/// <param name="idGenerator">Generator used to produce the id of each individual reader or writer lock.</param>
/// <param name="id">Id of the <see cref="RWLock"/> record this instance reads and updates.</param>
public DistributedReaderWriterLock(string hostId, IRepository<RWLock> locks, IIdGenerator idGenerator, string id)
{
_hostId = hostId;
_locks = locks;
_idGenerator = idGenerator;
_id = id;
// The record's existence has not been verified yet; createLockIfNotExist does that on first acquire.
_lockChecked = false;
}

public async Task<IAsyncDisposable> ReaderLockAsync(
TimeSpan? lifetime = default,
CancellationToken cancellationToken = default
)
{
await createLockIfNotExist();
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken))
{
Expand Down Expand Up @@ -49,6 +53,7 @@ public async Task<IAsyncDisposable> WriterLockAsync(
CancellationToken cancellationToken = default
)
{
await createLockIfNotExist();
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken))
{
Expand Down Expand Up @@ -96,45 +101,43 @@ await _locks.UpdateAsync(
return new WriterLockReleaser(this, lockId);
}

/// <summary>
/// Makes sure the <see cref="RWLock"/> record for this lock's id exists before an acquire attempt.
/// </summary>
/// <remarks>
/// The repository is only queried until the first successful check; after that the cached
/// <c>_lockChecked</c> flag makes this a no-op. Both repository calls use
/// <see cref="CancellationToken.None"/>, so the check is not cancellable.
/// </remarks>
private async Task createLockIfNotExist()
{
    if (_lockChecked)
        return;

    if (!await _locks.ExistsAsync(e => e.Id == _id, CancellationToken.None))
    {
        try
        {
            await _locks.InsertAsync(new RWLock { Id = _id }, CancellationToken.None);
        }
        catch (DuplicateKeyException)
        {
            // The existence check and the insert are two separate operations, so another lock instance
            // (or another host) may have inserted the same record in between. The record exists either
            // way, which is all that is needed here.
            // NOTE(review): assumes the repository reports an already-existing id as
            // DuplicateKeyException, as the previously removed upsert handling did - confirm.
        }
    }

    _lockChecked = true;
}

private async Task<bool> TryAcquireWriterLock(
string lockId,
TimeSpan? lifetime,
CancellationToken cancellationToken
)
{
try
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now)
&& (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId);
void Update(IUpdateBuilder<RWLock> u)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now)
&& (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId);
void Update(IUpdateBuilder<RWLock> u)
{
u.Set(
rwl => rwl.WriterLock,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
HostId = _hostId
}
);
u.RemoveAll(rwl => rwl.WriterQueue, l => l.Id == lockId);
}
RWLock? rwLock = await _locks.UpdateAsync(
filter,
Update,
upsert: true,
cancellationToken: cancellationToken
u.Set(
rwl => rwl.WriterLock,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
HostId = _hostId
}
);
return rwLock is not null;
}
catch (DuplicateKeyException)
{
return false;
u.RemoveAll(rwl => rwl.WriterQueue, l => l.Id == lockId);
}
RWLock? rwLock = await _locks.UpdateAsync(filter, Update, cancellationToken: cancellationToken);
return rwLock is not null;
}

private async Task<bool> TryAcquireReaderLock(
Expand All @@ -143,38 +146,26 @@ private async Task<bool> TryAcquireReaderLock(
CancellationToken cancellationToken
)
{
try
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.WriterQueue.Any();
void Update(IUpdateBuilder<RWLock> u)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.WriterQueue.Any();
void Update(IUpdateBuilder<RWLock> u)
{
u.Add(
rwl => rwl.ReaderLocks,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
HostId = _hostId
}
);
}

RWLock? rwLock = await _locks.UpdateAsync(
filter,
Update,
upsert: true,
cancellationToken: cancellationToken
u.Add(
rwl => rwl.ReaderLocks,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
HostId = _hostId
}
);
return rwLock is not null;
}
catch (DuplicateKeyException)
{
return false;
}

RWLock? rwLock = await _locks.UpdateAsync(filter, Update, cancellationToken: cancellationToken);
return rwLock is not null;
}

private class WriterLockReleaser : AsyncDisposableBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
public class DistributedReaderWriterLockFactory : IDistributedReaderWriterLockFactory
{
private readonly ServiceOptions _serviceOptions;
private readonly IRepository<RWLock> _locks;
private readonly IIdGenerator _idGenerator;
private readonly IRepository<RWLock> _locks;

public DistributedReaderWriterLockFactory(
IOptions<ServiceOptions> serviceOptions,
Expand All @@ -31,7 +31,7 @@ public IDistributedReaderWriterLock Create(string id)

public async Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default)
{
RWLock? rwLock = await _locks.DeleteAsync(id, cancellationToken);
RWLock? rwLock = await _locks.DeleteAsync(rwl => rwl.Id == id, cancellationToken);
return rwLock is not null;
}

Expand Down
Loading

0 comments on commit a035b5e

Please sign in to comment.