Skip to content

Commit

Permalink
broken, but mostly there.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnml1135 committed May 20, 2024
1 parent 5aac7fa commit 7b7b390
Show file tree
Hide file tree
Showing 27 changed files with 164 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ public class BuildJobOptions
{
public const string Key = "BuildJob";

public IList<ClearMLBuildJobOptions> ClearML { get; set; } = new List<ClearMLBuildJobOptions>();
public IList<ClearMLBuildQueue> ClearML { get; set; } = new List<ClearMLBuildQueue>();
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace SIL.Machine.AspNetCore.Configuration;

public class ClearMLBuildJobOptions
public class ClearMLBuildQueue
{
public const string Key = "ClearMLBuildJob";

public TranslationEngineType TranslationEngineType { get; set; }
public string ModelType { get; set; } = "";
public string Queue { get; set; } = "default";
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ await c.Indexes.CreateOrUpdateAsync(
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<TranslationEngine>(
Builders<TranslationEngine>.IndexKeys.Ascending(e => e.CurrentBuild!.JobRunner)
Builders<TranslationEngine>.IndexKeys.Ascending(e => e.CurrentBuild!.BuildJobRunner)
)
);
}
Expand Down Expand Up @@ -358,14 +358,14 @@ public static IMachineBuilder AddServalTranslationEngineService(

public static IMachineBuilder AddBuildJobService(this IMachineBuilder builder)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();

if (builder.Configuration is not null)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();

builder.Services.AddScoped<IBuildJobRunner, ClearMLBuildJobRunner>();
builder.Services.AddScoped<IClearMLBuildJobFactory, NmtClearMLBuildJobFactory>();
builder.Services.AddScoped<IClearMLBuildJobFactory, SmtTransferClearMLBuildJobFactory>();
builder.Services.AddSingleton<ClearMLMonitorService>();
builder.Services.AddSingleton<IClearMLQueueService, ClearMLMonitorService>();
builder.Services.AddHostedService(p => p.GetRequiredService<ClearMLMonitorService>());

builder.Services.AddScoped<IBuildJobRunner, HangfireBuildJobRunner>();
Expand Down
4 changes: 2 additions & 2 deletions src/SIL.Machine.AspNetCore/Models/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public enum BuildJobState
Canceling
}

public enum JobRunnerType
public enum BuildJobRunnerType
{
Hangfire,
ClearML
Expand All @@ -26,7 +26,7 @@ public record Build
public required string BuildId { get; init; }
public required BuildJobState JobState { get; init; }
public required string JobId { get; init; }
public required JobRunnerType JobRunner { get; init; }
public required BuildJobRunnerType BuildJobRunner { get; init; }
public required BuildStage Stage { get; init; }
public string? Options { get; set; }
}
2 changes: 1 addition & 1 deletion src/SIL.Machine.AspNetCore/Models/TranslationEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public record TranslationEngine : IEntity
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string EngineId { get; init; }
public required TranslationEngineType Type { get; init; } = TranslationEngineType.Nmt;
public required TranslationEngineType Type { get; init; }
public required string SourceLanguage { get; init; }
public required string TargetLanguage { get; init; }
public required bool IsModelPersisted { get; init; }
Expand Down
35 changes: 16 additions & 19 deletions src/SIL.Machine.AspNetCore/Services/BuildJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public class BuildJobService(IEnumerable<IBuildJobRunner> runners, IRepository<TranslationEngine> engines)
: IBuildJobService
{
private readonly Dictionary<JobRunnerType, IBuildJobRunner> _runners = runners.ToDictionary(r => r.Type);
private readonly Dictionary<BuildJobRunnerType, IBuildJobRunner> _runners = runners.ToDictionary(r => r.Type);
private readonly IRepository<TranslationEngine> _engines = engines;

public Task<bool> IsEngineBuilding(string engineId, CancellationToken cancellationToken = default)
Expand All @@ -12,12 +12,12 @@ public Task<bool> IsEngineBuilding(string engineId, CancellationToken cancellati
}

public Task<IReadOnlyList<TranslationEngine>> GetBuildingEnginesAsync(
JobRunnerType runner,
BuildJobRunnerType runner,
CancellationToken cancellationToken = default
)
{
return _engines.GetAllAsync(
e => e.CurrentBuild != null && e.CurrentBuild.JobRunner == runner,
e => e.CurrentBuild != null && e.CurrentBuild.BuildJobRunner == runner,
cancellationToken
);
}
Expand All @@ -41,7 +41,7 @@ public async Task CreateEngineAsync(
CancellationToken cancellationToken = default
)
{
foreach (JobRunnerType runnerType in _runners.Keys)
foreach (BuildJobRunnerType runnerType in _runners.Keys)
{
IBuildJobRunner runner = _runners[runnerType];
await runner.CreateEngineAsync(engineId, name, cancellationToken);
Expand All @@ -50,15 +50,15 @@ public async Task CreateEngineAsync(

public async Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default)
{
foreach (JobRunnerType runnerType in _runners.Keys)
foreach (BuildJobRunnerType runnerType in _runners.Keys)
{
IBuildJobRunner runner = _runners[runnerType];
await runner.DeleteEngineAsync(engineId, cancellationToken);
}
}

public async Task<bool> StartBuildJobAsync(
JobRunnerType runnerType,
BuildJobRunnerType runnerType,
string engineId,
string buildId,
BuildStage stage,
Expand All @@ -67,18 +67,15 @@ public async Task<bool> StartBuildJobAsync(
CancellationToken cancellationToken = default
)
{
if (
!await _engines.ExistsAsync(
e =>
e.EngineId == engineId
&& (e.CurrentBuild == null || e.CurrentBuild.JobState != BuildJobState.Canceling),
cancellationToken
)
)
{
TranslationEngine? engine = await _engines.GetAsync(
e =>
e.EngineId == engineId
&& (e.CurrentBuild == null || e.CurrentBuild.JobState != BuildJobState.Canceling),
cancellationToken
);
if (engine is null)
return false;
}
TranslationEngine engine = (await _engines.GetAsync(e => e.EngineId == engineId, cancellationToken))!;

IBuildJobRunner runner = _runners[runnerType];
string jobId = await runner.CreateJobAsync(
engine.Type,
Expand All @@ -100,7 +97,7 @@ await _engines.UpdateAsync(
{
BuildId = buildId,
JobId = jobId,
JobRunner = runner.Type,
BuildJobRunner = runner.Type,
Stage = stage,
JobState = BuildJobState.Pending,
Options = buildOptions
Expand Down Expand Up @@ -130,7 +127,7 @@ await _engines.UpdateAsync(
if (engine is null || engine.CurrentBuild is null)
return (null, BuildJobState.None);

IBuildJobRunner runner = _runners[engine.CurrentBuild.JobRunner];
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];

if (engine.CurrentBuild.JobState is BuildJobState.Pending)
{
Expand Down
4 changes: 2 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/ClearMLBuildJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ IOptionsMonitor<BuildJobOptions> options
private readonly Dictionary<TranslationEngineType, IClearMLBuildJobFactory> _buildJobFactories =
buildJobFactories.ToDictionary(f => f.EngineType);

private readonly Dictionary<TranslationEngineType, ClearMLBuildJobOptions> _options =
private readonly Dictionary<TranslationEngineType, ClearMLBuildQueue> _options =
options.CurrentValue.ClearML.ToDictionary(o => o.TranslationEngineType);

public JobRunnerType Type => JobRunnerType.ClearML;
public BuildJobRunnerType Type => BuildJobRunnerType.ClearML;

public async Task CreateEngineAsync(
string engineId,
Expand Down
14 changes: 8 additions & 6 deletions src/SIL.Machine.AspNetCore/Services/ClearMLHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Immutable;

namespace SIL.Machine.AspNetCore.Services;

public class ClearMLHealthCheck(
Expand All @@ -8,10 +10,10 @@ IOptionsMonitor<BuildJobOptions> buildJobOptions
{
private readonly HttpClient _httpClient = httpClientFactory.CreateClient("ClearML-NoRetry");
private readonly IClearMLAuthenticationService _clearMLAuthenticationService = clearMLAuthenticationService;
private readonly IReadOnlyList<string> _queuesMonitored = buildJobOptions
private readonly ISet<string> _queuesMonitored = buildJobOptions
.CurrentValue.ClearML.Select(x => x.Queue)
.Distinct()
.ToList();
.ToHashSet();

private int _numConsecutiveFailures = 0;
private readonly AsyncLock _lock = new AsyncLock();
Expand All @@ -25,8 +27,8 @@ public async Task<HealthCheckResult> CheckHealthAsync(
{
if (!await PingAsync(cancellationToken))
return HealthCheckResult.Unhealthy("ClearML is unresponsive");
IEnumerable<string> queuesWithoutWorkers = await QueuesWithoutWorkers(cancellationToken);
if (queuesWithoutWorkers.Any())
IReadOnlySet<string> queuesWithoutWorkers = await QueuesWithoutWorkers(cancellationToken);
if (queuesWithoutWorkers.Count > 0)
{
return HealthCheckResult.Unhealthy(
$"No ClearML agents are available for configured queues: {string.Join(", ", queuesWithoutWorkers)}"
Expand Down Expand Up @@ -75,7 +77,7 @@ public async Task<bool> PingAsync(CancellationToken cancellationToken = default)
return result is not null;
}

public async Task<IEnumerable<string>> QueuesWithoutWorkers(CancellationToken cancellationToken = default)
public async Task<IReadOnlySet<string>> QueuesWithoutWorkers(CancellationToken cancellationToken = default)
{
ISet<string> queuesWithoutWorkers = _queuesMonitored.ToHashSet();
JsonObject? result = await CallAsync("workers", "get_all", new JsonObject(), cancellationToken);
Expand All @@ -96,6 +98,6 @@ public async Task<IEnumerable<string>> QueuesWithoutWorkers(CancellationToken ca
queuesWithoutWorkers.Remove(currentQueueName);
}
}
return queuesWithoutWorkers;
return queuesWithoutWorkers.ToImmutableHashSet();
}
}
19 changes: 12 additions & 7 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ ILogger<ClearMLMonitorService> logger
clearMLOptions.CurrentValue.BuildPollingTimeout,
logger,
clearMLOptions.CurrentValue.BuildPollingEnabled
)
),
IClearMLQueueService
{
private static readonly string EvalMetric = CreateMD5("eval");
private static readonly string BleuVariant = CreateMD5("bleu");
Expand All @@ -25,22 +26,24 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly ILogger<ClearMLMonitorService> _logger = logger;
private readonly ILogger<IClearMLQueueService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

private readonly IReadOnlyDictionary<TranslationEngineType, string> _queuePerEngineType =
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.TranslationEngineType, x => x.Queue);

public IDictionary<TranslationEngineType, int> QueueSizePerEngineType { get; private set; } =
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.TranslationEngineType, x => 0);
new ConcurrentDictionary<TranslationEngineType, int>(
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.TranslationEngineType, x => 0)
);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
try
{
var buildJobService = scope.ServiceProvider.GetRequiredService<IBuildJobService>();
IReadOnlyList<TranslationEngine> trainingEngines = await buildJobService.GetBuildingEnginesAsync(
JobRunnerType.ClearML,
BuildJobRunnerType.ClearML,
cancellationToken
);
if (trainingEngines.Count == 0)
Expand All @@ -63,15 +66,17 @@ await _clearMLService.GetTasksForQueueAsync(_queuePerEngineType[engineType], can
)
.ToDictionary(t => t.Id);
// add new keys to dictionary
tasksPerEngineType.ToList().ForEach(x => tasks.TryAdd(x.Key, x.Value));
foreach (string key in tasksPerEngineType.Keys.Except(tasks.Keys))
tasks.Add(key, tasksPerEngineType[key]);

Dictionary<string, int> queuePositionsPerEngineType = tasksPerEngineType
.Values.Where(t => t.Status is ClearMLTaskStatus.Queued or ClearMLTaskStatus.Created)
.OrderBy(t => t.Created)
.Select((t, i) => (Position: i, Task: t))
.ToDictionary(e => e.Task.Name, e => e.Position);
// add new keys to dictionary
queuePositionsPerEngineType.ToList().ForEach(x => queuePositions.TryAdd(x.Key, x.Value));
foreach (string key in queuePositionsPerEngineType.Keys.Except(queuePositions.Keys))
queuePositions.Add(key, queuePositionsPerEngineType[key]);

QueueSizePerEngineType[engineType] = queuePositionsPerEngineType.Count;
}
Expand Down Expand Up @@ -238,7 +243,7 @@ CancellationToken cancellationToken
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
return await buildJobService.StartBuildJobAsync(
JobRunnerType.Hangfire,
BuildJobRunnerType.Hangfire,
engineId,
buildId,
BuildStage.Postprocess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static Job CreateJob<TJob>(string engineId, string buildId, string queue,
private readonly Dictionary<TranslationEngineType, IHangfireBuildJobFactory> _buildJobFactories =
buildJobFactories.ToDictionary(f => f.EngineType);

public JobRunnerType Type => JobRunnerType.Hangfire;
public BuildJobRunnerType Type => BuildJobRunnerType.Hangfire;

public Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default)
{
Expand Down
2 changes: 1 addition & 1 deletion src/SIL.Machine.AspNetCore/Services/IBuildJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public interface IBuildJobRunner
{
JobRunnerType Type { get; }
BuildJobRunnerType Type { get; }

Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default);
Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default);
Expand Down
4 changes: 2 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/IBuildJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public interface IBuildJobService
{
Task<IReadOnlyList<TranslationEngine>> GetBuildingEnginesAsync(
JobRunnerType runner,
BuildJobRunnerType runner,
CancellationToken cancellationToken = default
);

Expand All @@ -14,7 +14,7 @@ Task<IReadOnlyList<TranslationEngine>> GetBuildingEnginesAsync(
Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default);

Task<bool> StartBuildJobAsync(
JobRunnerType jobType,
BuildJobRunnerType jobType,
string engineId,
string buildId,
BuildStage stage,
Expand Down
6 changes: 6 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IClearMLQueueService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SIL.Machine.AspNetCore.Services;

/// <summary>
/// Exposes the size of the ClearML build queue for each translation engine type.
/// </summary>
public interface IClearMLQueueService
{
    /// <summary>
    /// Gets the queue sizes, keyed by translation engine type.
    /// </summary>
    // No explicit access modifier: interface members are implicitly public, and the
    // other service interfaces in this namespace declare their members the same way.
    IDictionary<TranslationEngineType, int> QueueSizePerEngineType { get; }
}
8 changes: 4 additions & 4 deletions src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class NmtEngineService(
IRepository<TranslationEngine> engines,
IBuildJobService buildJobService,
ILanguageTagService languageTagService,
ClearMLMonitorService clearMLMonitorService,
IClearMLQueueService clearMLQueueService,
ISharedFileService sharedFileService
) : ITranslationEngineService
{
Expand All @@ -16,7 +16,7 @@ ISharedFileService sharedFileService
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly IRepository<TranslationEngine> _engines = engines;
private readonly IBuildJobService _buildJobService = buildJobService;
private readonly ClearMLMonitorService _clearMLMonitorService = clearMLMonitorService;
private readonly IClearMLQueueService _clearMLQueueService = clearMLQueueService;
private readonly ILanguageTagService _languageTagService = languageTagService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
public const string ModelDirectory = "models/";
Expand Down Expand Up @@ -88,7 +88,7 @@ public async Task StartBuildAsync(
throw new InvalidOperationException("The engine is already building or in the process of canceling.");

await _buildJobService.StartBuildJobAsync(
JobRunnerType.Hangfire,
BuildJobRunnerType.Hangfire,
engineId,
buildId,
BuildStage.Preprocess,
Expand Down Expand Up @@ -171,7 +171,7 @@ public Task TrainSegmentPairAsync(

public Task<int> GetQueueSizeAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(_clearMLMonitorService.QueueSizePerEngineType[Type]);
return Task.FromResult(_clearMLQueueService.QueueSizePerEngineType[Type]);
}

public bool IsLanguageNativeToModel(string language, out string internalCode)
Expand Down
Loading

0 comments on commit 7b7b390

Please sign in to comment.