From b8e50ca0280b3f94354224e2fbd5e7a265357253 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Tue, 11 Jul 2023 12:41:32 -0400 Subject: [PATCH 1/3] * sillsdev/serval#40 Get SMT job cancellation working * Subscribe to mongoDB for build being cancelled * sillsdev/serval#36 - RWLock - remove upsert, change lock creation logic * sillsdev/serval#42 - add BSON attributes * hot-load serval projects when available for debugging data access without building new code * minor fixes and project updates --- .../Models/ClearMLProject.cs | 2 + .../Models/ClearMLTask.cs | 2 + .../SIL.Machine.AspNetCore/Models/Corpus.cs | 2 + .../src/SIL.Machine.AspNetCore/Models/Lock.cs | 2 + .../SIL.Machine.AspNetCore/Models/RWLock.cs | 2 + .../Models/TrainSegmentPair.cs | 2 + .../SIL.Machine.AspNetCore.csproj | 6 +- .../Services/DistributedReaderWriterLock.cs | 109 ++++++++---------- .../DistributedReaderWriterLockFactory.cs | 4 +- .../Services/SmtTransferEngineBuildJob.cs | 68 ++++++++--- .../src/SIL.Machine.AspNetCore/Usings.cs | 2 + .../Services/SmtTransferEngineServiceTests.cs | 7 +- 12 files changed, 129 insertions(+), 79 deletions(-) diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLProject.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLProject.cs index 6d378428..5452612d 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLProject.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLProject.cs @@ -2,5 +2,7 @@ public class ClearMLProject { + [BsonId] + [BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLTask.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLTask.cs index 256c0ea3..885bc730 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLTask.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/ClearMLTask.cs @@ -16,6 +16,8 @@ public enum ClearMLTaskStatus public class ClearMLTask { + [BsonId] + 
[BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; public string Name { get; set; } = default!; public ClearMLProject Project { get; set; } = default!; diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/Corpus.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/Corpus.cs index 84dda461..f07f2a7f 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/Corpus.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/Corpus.cs @@ -2,6 +2,8 @@ public class Corpus { + [BsonId] + [BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; public string SourceLanguage { get; set; } = default!; public string TargetLanguage { get; set; } = default!; diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/Lock.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/Lock.cs index 87a38d66..1ae4ba5e 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/Lock.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/Lock.cs @@ -2,6 +2,8 @@ public class Lock { + [BsonId] + [BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; public DateTime? ExpiresAt { get; set; } public string HostId { get; set; } = default!; diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/RWLock.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/RWLock.cs index 895cbcfb..548731e0 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/RWLock.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/RWLock.cs @@ -2,6 +2,8 @@ public class RWLock : IEntity { + [BsonId] + [BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; public int Revision { get; set; } public Lock? 
WriterLock { get; set; } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Models/TrainSegmentPair.cs b/src/Machine/src/SIL.Machine.AspNetCore/Models/TrainSegmentPair.cs index 8e1d40ea..c2fb583f 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Models/TrainSegmentPair.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Models/TrainSegmentPair.cs @@ -2,6 +2,8 @@ public class TrainSegmentPair : IEntity { + [BsonId] + [BsonRepresentation(BsonType.ObjectId)] public string Id { get; set; } = default!; public int Revision { get; set; } = 1; public string TranslationEngineRef { get; set; } = default!; diff --git a/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj b/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj index 5a50fb89..cc2d36f2 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj +++ b/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj @@ -32,8 +32,8 @@ - - + + @@ -43,6 +43,8 @@ + + diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLock.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLock.cs index aefd6245..a5b216ae 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLock.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLock.cs @@ -7,12 +7,15 @@ public class DistributedReaderWriterLock : IDistributedReaderWriterLock private readonly IIdGenerator _idGenerator; private readonly string _id; + private bool _lockChecked; + public DistributedReaderWriterLock(string hostId, IRepository locks, IIdGenerator idGenerator, string id) { _hostId = hostId; _locks = locks; _idGenerator = idGenerator; _id = id; + _lockChecked = false; } public async Task ReaderLockAsync( @@ -20,6 +23,7 @@ public async Task ReaderLockAsync( CancellationToken cancellationToken = default ) { + await createLockIfNotExist(); string lockId = _idGenerator.GenerateId(); if (!await 
TryAcquireReaderLock(lockId, lifetime, cancellationToken)) { @@ -49,6 +53,7 @@ public async Task WriterLockAsync( CancellationToken cancellationToken = default ) { + await createLockIfNotExist(); string lockId = _idGenerator.GenerateId(); if (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken)) { @@ -96,45 +101,43 @@ await _locks.UpdateAsync( return new WriterLockReleaser(this, lockId); } + private async Task createLockIfNotExist() + { + if (_lockChecked == false) + { + if (!await _locks.ExistsAsync(e => e.Id == _id, CancellationToken.None)) + await _locks.InsertAsync(new RWLock { Id = _id }, CancellationToken.None); + _lockChecked = true; + } + } + private async Task TryAcquireWriterLock( string lockId, TimeSpan? lifetime, CancellationToken cancellationToken ) { - try + var now = DateTime.UtcNow; + Expression> filter = rwl => + rwl.Id == _id + && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now) + && !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now) + && (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId); + void Update(IUpdateBuilder u) { - var now = DateTime.UtcNow; - Expression> filter = rwl => - rwl.Id == _id - && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now) - && !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now) - && (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId); - void Update(IUpdateBuilder u) - { - u.Set( - rwl => rwl.WriterLock, - new Lock - { - Id = lockId, - ExpiresAt = lifetime is null ? null : now + lifetime, - HostId = _hostId - } - ); - u.RemoveAll(rwl => rwl.WriterQueue, l => l.Id == lockId); - } - RWLock? rwLock = await _locks.UpdateAsync( - filter, - Update, - upsert: true, - cancellationToken: cancellationToken + u.Set( + rwl => rwl.WriterLock, + new Lock + { + Id = lockId, + ExpiresAt = lifetime is null ? 
null : now + lifetime, + HostId = _hostId + } ); - return rwLock is not null; - } - catch (DuplicateKeyException) - { - return false; + u.RemoveAll(rwl => rwl.WriterQueue, l => l.Id == lockId); } + RWLock? rwLock = await _locks.UpdateAsync(filter, Update, cancellationToken: cancellationToken); + return rwLock is not null; } private async Task TryAcquireReaderLock( @@ -143,38 +146,26 @@ private async Task TryAcquireReaderLock( CancellationToken cancellationToken ) { - try + var now = DateTime.UtcNow; + Expression> filter = rwl => + rwl.Id == _id + && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now) + && !rwl.WriterQueue.Any(); + void Update(IUpdateBuilder u) { - var now = DateTime.UtcNow; - Expression> filter = rwl => - rwl.Id == _id - && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now) - && !rwl.WriterQueue.Any(); - void Update(IUpdateBuilder u) - { - u.Add( - rwl => rwl.ReaderLocks, - new Lock - { - Id = lockId, - ExpiresAt = lifetime is null ? null : now + lifetime, - HostId = _hostId - } - ); - } - - RWLock? rwLock = await _locks.UpdateAsync( - filter, - Update, - upsert: true, - cancellationToken: cancellationToken + u.Add( + rwl => rwl.ReaderLocks, + new Lock + { + Id = lockId, + ExpiresAt = lifetime is null ? null : now + lifetime, + HostId = _hostId + } ); - return rwLock is not null; - } - catch (DuplicateKeyException) - { - return false; } + + RWLock? 
rwLock = await _locks.UpdateAsync(filter, Update, cancellationToken: cancellationToken); + return rwLock is not null; } private class WriterLockReleaser : AsyncDisposableBase diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLockFactory.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLockFactory.cs index c56f3f81..0f7c0b0e 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLockFactory.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLockFactory.cs @@ -3,8 +3,8 @@ public class DistributedReaderWriterLockFactory : IDistributedReaderWriterLockFactory { private readonly ServiceOptions _serviceOptions; - private readonly IRepository _locks; private readonly IIdGenerator _idGenerator; + private readonly IRepository _locks; public DistributedReaderWriterLockFactory( IOptions serviceOptions, @@ -31,7 +31,7 @@ public IDistributedReaderWriterLock Create(string id) public async Task DeleteAsync(string id, CancellationToken cancellationToken = default) { - RWLock? rwLock = await _locks.DeleteAsync(id, cancellationToken); + RWLock? rwLock = await _locks.DeleteAsync(rwl => rwl.Id == id, cancellationToken); return rwLock is not null; } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs index 2e172e1c..0ab1ad50 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs @@ -39,7 +39,7 @@ public async Task RunAsync( string engineId, string buildId, IReadOnlyList corpora, - CancellationToken cancellationToken + CancellationToken externalCancellationToken ) { IDistributedReaderWriterLock rwLock = _lockFactory.Create(engineId); @@ -50,23 +50,36 @@ CancellationToken cancellationToken ITrainer? 
truecaseTrainer = null; try { + CancellationTokenSource cts = new(); + SubscribeForCancellationAsync(cts, engineId, buildId); + CancellationTokenSource combinedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource( + externalCancellationToken, + cts.Token + ); + var combinedCancellationToken = combinedCancellationSource.Token; + var stopwatch = new Stopwatch(); TranslationEngine? engine; - await using (await rwLock.WriterLockAsync(cancellationToken: cancellationToken)) + await using (await rwLock.WriterLockAsync(cancellationToken: combinedCancellationToken)) { engine = await _engines.UpdateAsync( e => e.EngineId == engineId && e.BuildId == buildId && !e.IsCanceled, u => u.Set(e => e.BuildState, BuildState.Active), - cancellationToken: CancellationToken.None + cancellationToken: combinedCancellationToken ); if (engine is null) throw new OperationCanceledException(); - await _platformService.BuildStartedAsync(buildId, cancellationToken); + await _platformService.BuildStartedAsync(buildId, combinedCancellationToken); _logger.LogInformation("Build started ({0})", buildId); stopwatch.Start(); - await _trainSegmentPairs.DeleteAllAsync(p => p.TranslationEngineRef == engineId, cancellationToken); + await _trainSegmentPairs.DeleteAllAsync( + p => p.TranslationEngineRef == engineId, + combinedCancellationToken + ); + + combinedCancellationToken.ThrowIfCancellationRequested(); var targetCorpora = new List(); var parallelCorpora = new List(); @@ -86,19 +99,22 @@ CancellationToken cancellationToken truecaseTrainer = _truecaserFactory.CreateTrainer(engineId, tokenizer, targetCorpus); } + combinedCancellationToken.ThrowIfCancellationRequested(); + var progress = new BuildProgress(_platformService, buildId); - await smtModelTrainer.TrainAsync(progress, cancellationToken); - await truecaseTrainer.TrainAsync(cancellationToken: cancellationToken); + await smtModelTrainer.TrainAsync(progress, combinedCancellationToken); + await 
truecaseTrainer.TrainAsync(cancellationToken: combinedCancellationToken); int trainSegmentPairCount; - await using (await rwLock.WriterLockAsync(cancellationToken: cancellationToken)) + await using (await rwLock.WriterLockAsync(cancellationToken: combinedCancellationToken)) { - cancellationToken.ThrowIfCancellationRequested(); - await smtModelTrainer.SaveAsync(CancellationToken.None); - await truecaseTrainer.SaveAsync(CancellationToken.None); + combinedCancellationToken.ThrowIfCancellationRequested(); + await smtModelTrainer.SaveAsync(combinedCancellationToken); + await truecaseTrainer.SaveAsync(combinedCancellationToken); + combinedCancellationToken.ThrowIfCancellationRequested(); ITruecaser truecaser = await _truecaserFactory.CreateAsync(engineId); IReadOnlyList segmentPairs = await _trainSegmentPairs.GetAllAsync( - p => p.TranslationEngineRef == engine.Id, - CancellationToken.None + p => p.TranslationEngineRef == engine!.Id, + combinedCancellationToken ); using ( IInteractiveTranslationModel smtModel = _smtModelFactory.Create( @@ -114,8 +130,9 @@ CancellationToken cancellationToken await smtModel.TrainSegmentAsync( segmentPair.Source, segmentPair.Target, - cancellationToken: CancellationToken.None + cancellationToken: combinedCancellationToken ); + combinedCancellationToken.ThrowIfCancellationRequested(); } } @@ -209,4 +226,27 @@ await _engines.UpdateAsync( truecaseTrainer?.Dispose(); } } + + private async void SubscribeForCancellationAsync(CancellationTokenSource cts, string engineId, string buildId) + { + var cancellationToken = cts.Token; + ISubscription sub = await _engines.SubscribeAsync( + e => e.EngineId == engineId && e.BuildId == buildId + ); + if (sub.Change.Entity is null) + return; + while (true) + { + await sub.WaitForChangeAsync(TimeSpan.FromSeconds(10), cancellationToken); + TranslationEngine? 
engine = sub.Change.Entity; + if (engine is null || engine.IsCanceled) + { + cts.Cancel(); + return; + } + if (cancellationToken.IsCancellationRequested) + return; + Thread.Sleep(500); + } + } } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Usings.cs b/src/Machine/src/SIL.Machine.AspNetCore/Usings.cs index c15aea48..8c21e10e 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Usings.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Usings.cs @@ -29,6 +29,8 @@ global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Options; +global using MongoDB.Bson; +global using MongoDB.Bson.Serialization.Attributes; global using MongoDB.Driver; global using MongoDB.Driver.Linq; global using Nito.AsyncEx; diff --git a/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs b/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs index 9bd8a674..72c75b45 100644 --- a/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs +++ b/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs @@ -21,8 +21,8 @@ await env.SmtBatchTrainer await env.TruecaserTrainer .Received() .TrainAsync(Arg.Any>(), Arg.Any()); - await env.SmtBatchTrainer.Received().SaveAsync(); - await env.TruecaserTrainer.Received().SaveAsync(); + await env.SmtBatchTrainer.Received().SaveAsync(Arg.Any()); + await env.TruecaserTrainer.Received().SaveAsync(Arg.Any()); engine = env.Engines.Get("engine1"); Assert.That(engine.BuildState, Is.EqualTo(BuildState.None)); Assert.That(engine.BuildRevision, Is.EqualTo(1)); @@ -67,7 +67,10 @@ await env.SmtBatchTrainer.TrainAsync( Arg.Do(ct => { while (true) + { ct.ThrowIfCancellationRequested(); + Thread.Sleep(100); + } }) ); await env.Service.StartBuildAsync("engine1", "build1", Array.Empty()); From dc2dadefe9ce18aeea258136cbbfb44890ecafbc Mon Sep 17 00:00:00 2001 From: John Lambert Date: 
Tue, 11 Jul 2023 21:34:05 -0400 Subject: [PATCH 2/3] sillsdev/serval#32 Fixes for NMT cancellation * Fix "get task by name" from ClearML * Use same sub for cancellation as from SMT - and refactor * Fix tests --- .../Services/ClearMLNmtEngineBuildJob.cs | 34 +++++++------ .../Services/ClearMLNmtEngineService.cs | 1 + .../Services/ClearMLService.cs | 16 ++---- .../Services/IClearMLService.cs | 4 +- .../Services/SmtTransferEngineBuildJob.cs | 33 ++----------- .../Services/SmtTransferEngineService.cs | 1 + .../Services/SubscribeForCancellation.cs | 49 +++++++++++++++++++ .../Services/ClearMLNmtEngineServiceTests.cs | 4 +- 8 files changed, 81 insertions(+), 61 deletions(-) create mode 100644 src/Machine/src/SIL.Machine.AspNetCore/Services/SubscribeForCancellation.cs diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineBuildJob.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineBuildJob.cs index 10839bde..bff6ee06 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineBuildJob.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineBuildJob.cs @@ -35,30 +35,36 @@ public async Task RunAsync( string engineId, string buildId, IReadOnlyList corpora, - CancellationToken cancellationToken + CancellationToken externalCancellationToken ) { - string? clearMLProjectId = await _clearMLService.GetProjectIdAsync(engineId, cancellationToken); + string? clearMLProjectId = await _clearMLService.GetProjectIdAsync(engineId, externalCancellationToken); if (clearMLProjectId is null) return; try { + var combinedCancellationToken = new SubscribeForCancellation(_engines).GetCombinedCancellationToken( + engineId, + buildId, + externalCancellationToken + ); + TranslationEngine? 
engine = await _engines.GetAsync( e => e.EngineId == engineId && e.BuildId == buildId, - cancellationToken: cancellationToken + cancellationToken: combinedCancellationToken ); if (engine is null || engine.IsCanceled) throw new OperationCanceledException(); int corpusSize; if (engine.BuildState is BuildState.Pending) - corpusSize = await WriteDataFilesAsync(buildId, corpora, cancellationToken); + corpusSize = await WriteDataFilesAsync(buildId, corpora, combinedCancellationToken); else corpusSize = GetCorpusSize(corpora); string clearMLTaskId; - ClearMLTask? clearMLTask = await _clearMLService.GetTaskAsync(buildId, clearMLProjectId, cancellationToken); + ClearMLTask? clearMLTask = await _clearMLService.GetTaskByNameAsync(buildId, combinedCancellationToken); if (clearMLTask is null) { clearMLTaskId = await _clearMLService.CreateTaskAsync( @@ -68,7 +74,7 @@ CancellationToken cancellationToken engine.SourceLanguage, engine.TargetLanguage, _sharedFileService.GetBaseUri().ToString(), - cancellationToken + combinedCancellationToken ); await _clearMLService.EnqueueTaskAsync(clearMLTaskId, CancellationToken.None); } @@ -80,9 +86,9 @@ CancellationToken cancellationToken int lastIteration = 0; while (true) { - cancellationToken.ThrowIfCancellationRequested(); + combinedCancellationToken.ThrowIfCancellationRequested(); - clearMLTask = await _clearMLService.GetTaskAsync(clearMLTaskId, cancellationToken); + clearMLTask = await _clearMLService.GetTaskByIdAsync(clearMLTaskId, combinedCancellationToken); if (clearMLTask is null) throw new InvalidOperationException("The ClearML task does not exist."); @@ -98,7 +104,7 @@ or ClearMLTaskStatus.Completed engine = await _engines.UpdateAsync( e => e.EngineId == engineId && e.BuildId == buildId && !e.IsCanceled, u => u.Set(e => e.BuildState, BuildState.Active), - cancellationToken: cancellationToken + cancellationToken: combinedCancellationToken ); if (engine is null) throw new OperationCanceledException(); @@ -129,11 +135,11 @@ await 
_engines.UpdateAsync( } if (clearMLTask.Status is ClearMLTaskStatus.Completed) break; - await Task.Delay(_options.CurrentValue.BuildPollingTimeout, cancellationToken); + await Task.Delay(_options.CurrentValue.BuildPollingTimeout, combinedCancellationToken); } // The ClearML task has successfully completed, so insert the generated pretranslations into the database. - await InsertPretranslationsAsync(engineId, buildId, cancellationToken); + await InsertPretranslationsAsync(engineId, buildId, combinedCancellationToken); IReadOnlyDictionary metrics = await _clearMLService.GetTaskMetricsAsync( clearMLTaskId, @@ -174,11 +180,7 @@ await _platformService.BuildCompletedAsync( if (engine is null || engine.IsCanceled) { // This is an actual cancellation triggered by an API call. - ClearMLTask? task = await _clearMLService.GetTaskAsync( - buildId, - clearMLProjectId, - CancellationToken.None - ); + ClearMLTask? task = await _clearMLService.GetTaskByNameAsync(buildId, CancellationToken.None); if (task is not null) await _clearMLService.StopTaskAsync(task.Id, CancellationToken.None); diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineService.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineService.cs index 1be52195..42eb064e 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineService.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLNmtEngineService.cs @@ -45,6 +45,7 @@ protected override Expression> GetJobExpres IReadOnlyList corpora ) { + // Token "None" is used here because hangfire injects the proper cancellation token return r => r.RunAsync(engineId, buildId, corpora, CancellationToken.None); } } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLService.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLService.cs index 0d8eb18a..a215ba82 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLService.cs +++ 
b/src/Machine/src/SIL.Machine.AspNetCore/Services/ClearMLService.cs @@ -145,20 +145,12 @@ public async Task StopTaskAsync(string id, CancellationToken cancellationT return updated == 1; } - public Task GetTaskAsync(string name, string projectId, CancellationToken cancellationToken = default) + public Task GetTaskByNameAsync(string name, CancellationToken cancellationToken = default) { - return GetTaskAsync( - new JsonObject - { - ["id"] = new JsonArray(), - ["name"] = name, - ["project"] = new JsonArray(projectId) - }, - cancellationToken - ); + return GetTaskAsync(new JsonObject { ["name"] = name }, cancellationToken); } - public Task GetTaskAsync(string id, CancellationToken cancellationToken = default) + public Task GetTaskByIdAsync(string id, CancellationToken cancellationToken = default) { return GetTaskAsync(new JsonObject { ["id"] = id }, cancellationToken); } @@ -206,7 +198,7 @@ public async Task> GetTaskMetricsAsync( "status_reason", "active_duration" ); - JsonObject? result = await CallAsync("tasks", "get_by_id_ex", body, cancellationToken); + JsonObject? 
result = await CallAsync("tasks", "get_all_ex", body, cancellationToken); var tasks = (JsonArray?)result?["data"]?["tasks"]; if (tasks is null || tasks.Count == 0) return null; diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/IClearMLService.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/IClearMLService.cs index b76152cd..ea9bc760 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/IClearMLService.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/IClearMLService.cs @@ -22,8 +22,8 @@ Task CreateTaskAsync( Task EnqueueTaskAsync(string id, CancellationToken cancellationToken = default); Task DequeueTaskAsync(string id, CancellationToken cancellationToken = default); Task StopTaskAsync(string id, CancellationToken cancellationToken = default); - Task GetTaskAsync(string name, string projectId, CancellationToken cancellationToken = default); - Task GetTaskAsync(string id, CancellationToken cancellationToken = default); + Task GetTaskByNameAsync(string name, CancellationToken cancellationToken = default); + Task GetTaskByIdAsync(string id, CancellationToken cancellationToken = default); Task> GetTaskMetricsAsync( string id, CancellationToken cancellationToken = default diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs index 0ab1ad50..a0c8bcb7 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineBuildJob.cs @@ -50,13 +50,11 @@ CancellationToken externalCancellationToken ITrainer? 
truecaseTrainer = null; try { - CancellationTokenSource cts = new(); - SubscribeForCancellationAsync(cts, engineId, buildId); - CancellationTokenSource combinedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource( - externalCancellationToken, - cts.Token + var combinedCancellationToken = new SubscribeForCancellation(_engines).GetCombinedCancellationToken( + engineId, + buildId, + externalCancellationToken ); - var combinedCancellationToken = combinedCancellationSource.Token; var stopwatch = new Stopwatch(); TranslationEngine? engine; @@ -226,27 +224,4 @@ await _engines.UpdateAsync( truecaseTrainer?.Dispose(); } } - - private async void SubscribeForCancellationAsync(CancellationTokenSource cts, string engineId, string buildId) - { - var cancellationToken = cts.Token; - ISubscription sub = await _engines.SubscribeAsync( - e => e.EngineId == engineId && e.BuildId == buildId - ); - if (sub.Change.Entity is null) - return; - while (true) - { - await sub.WaitForChangeAsync(TimeSpan.FromSeconds(10), cancellationToken); - TranslationEngine? 
engine = sub.Change.Entity; - if (engine is null || engine.IsCanceled) - { - cts.Cancel(); - return; - } - if (cancellationToken.IsCancellationRequested) - return; - Thread.Sleep(500); - } - } } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs index 8877986d..78278c37 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs @@ -167,6 +167,7 @@ protected override Expression> GetJobExpre IReadOnlyList corpora ) { + // Token "None" is used here because hangfire injects the proper cancellation token return r => r.RunAsync(engineId, buildId, corpora, CancellationToken.None); } diff --git a/src/Machine/src/SIL.Machine.AspNetCore/Services/SubscribeForCancellation.cs b/src/Machine/src/SIL.Machine.AspNetCore/Services/SubscribeForCancellation.cs new file mode 100644 index 00000000..c13915fd --- /dev/null +++ b/src/Machine/src/SIL.Machine.AspNetCore/Services/SubscribeForCancellation.cs @@ -0,0 +1,49 @@ +namespace SIL.Machine.AspNetCore.Services; + +public class SubscribeForCancellation +{ + private readonly IRepository _engines; + + public SubscribeForCancellation(IRepository engines) + { + _engines = engines; + } + + public CancellationToken GetCombinedCancellationToken( + string engineId, + string buildId, + CancellationToken externalCancellationToken + ) + { + CancellationTokenSource cts = new(); + SubscribeForCancellationAsync(cts, engineId, buildId); + CancellationTokenSource combinedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource( + externalCancellationToken, + cts.Token + ); + return combinedCancellationSource.Token; + } + + private async void SubscribeForCancellationAsync(CancellationTokenSource cts, string engineId, string buildId) + { + var cancellationToken = cts.Token; + ISubscription sub = await 
_engines.SubscribeAsync( + e => e.EngineId == engineId && e.BuildId == buildId + ); + if (sub.Change.Entity is null) + return; + while (true) + { + await sub.WaitForChangeAsync(TimeSpan.FromSeconds(10), cancellationToken); + TranslationEngine? engine = sub.Change.Entity; + if (engine is null || engine.IsCanceled) + { + cts.Cancel(); + return; + } + if (cancellationToken.IsCancellationRequested) + return; + Thread.Sleep(500); + } + } +} diff --git a/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs b/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs index 8ad617a8..f9d01a7d 100644 --- a/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs +++ b/src/Machine/test/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs @@ -26,7 +26,7 @@ public async Task CancelBuildAsync() }; bool first = true; env.ClearMLService - .GetTaskAsync(Arg.Any(), "project1", Arg.Any()) + .GetTaskByNameAsync(Arg.Any(), Arg.Any()) .Returns(x => { if (first) @@ -37,7 +37,7 @@ public async Task CancelBuildAsync() return Task.FromResult(task); }); env.ClearMLService - .GetTaskAsync("task1", Arg.Any()) + .GetTaskByIdAsync("task1", Arg.Any()) .Returns(Task.FromResult(task)); await env.Service.StartBuildAsync("engine1", "build1", Array.Empty()); await env.WaitForBuildToStartAsync(); From f8656f7a038996ffede18ae55a7b3e606d349ec2 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Wed, 12 Jul 2023 14:05:38 -0400 Subject: [PATCH 3/3] Update SIL.dataAccess --- .../src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj b/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj index cc2d36f2..dc159463 100644 --- a/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj +++ 
b/src/Machine/src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj @@ -33,7 +33,7 @@ - +