Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nulltoken committed Jan 5, 2025
1 parent 575bfe4 commit 018d842
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>

<NoWarn>$(NoWarn);NCRONJOB_OBSERVER</NoWarn>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release'">
Expand Down
9 changes: 7 additions & 2 deletions src/NCronJob/Execution/StartupJobManager.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
using NCronJob.Observer;

namespace NCronJob;

internal class StartupJobManager(JobRegistry jobRegistry, JobProcessor jobProcessor)
internal class StartupJobManager(
JobRegistry jobRegistry,
JobProcessor jobProcessor,
JobExecutionProgressObserver observer)
{
private readonly AsyncManualResetEvent startupJobsCompleted = new();

public async Task ProcessStartupJobs(CancellationToken stopToken)
{
var startupJobs = jobRegistry.GetAllOneTimeJobs();
var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(definition), stopToken)).ToList();
var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(observer.Report, definition), stopToken)).ToList();

if (startupTasks.Count > 0)
{
Expand Down
6 changes: 6 additions & 0 deletions src/NCronJob/NCronJobExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using NCronJob.Observer;

namespace NCronJob;

Expand Down Expand Up @@ -50,6 +51,11 @@ public static IServiceCollection AddNCronJob(
sp.GetRequiredService<JobWorker>(),
sp.GetRequiredService<JobQueueManager>(),
sp.GetRequiredService<ConcurrencySettings>()));
services.TryAddSingleton<JobExecutionProgressObserver>();
services.TryAddSingleton<IJobExecutionProgressReporter_Unstable, JobExecutionProgressObserver>((sp) =>
{
return sp.GetRequiredService<JobExecutionProgressObserver>();
});
services.TryAddSingleton(TimeProvider.System);
services.TryAddSingleton<StartupJobManager>();
services.TryAddSingleton<MissingMethodCalledHandler>();
Expand Down
44 changes: 44 additions & 0 deletions src/NCronJob/Observer/ExecutionProgress_Unstable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Diagnostics.CodeAnalysis;

namespace NCronJob.Observer;

#pragma warning disable CA1707 // Identifiers should not contain underscores
#pragma warning disable S101 // Types should be named in PascalCase
[Experimental("NCRONJOB_OBSERVER")]
public record ExecutionProgress_Unstable
#pragma warning restore S101 // Types should be named in PascalCase
#pragma warning restore CA1707 // Identifiers should not contain underscores
{
internal ExecutionProgress_Unstable(JobRun run)
{
RunId = run.JobRunId;
ParentRunId = run.ParentJobRunId;
CorrelationId = run.CorrelationId;
State = MapFrom(run.CurrentState.Type);
}

public Guid? RunId { get; init; }
public Guid? ParentRunId { get; init; }
public Guid CorrelationId { get; }
public ExecutionState_Unstable State { get; init; }
public DateTimeOffset Timestamp { get; } = DateTimeOffset.Now;

private static ExecutionState_Unstable MapFrom(JobStateType currentState)
{
return currentState switch
{
JobStateType.NotStarted => ExecutionState_Unstable.NotStarted,
JobStateType.Scheduled => ExecutionState_Unstable.Scheduled,
JobStateType.Initializing => ExecutionState_Unstable.Initializing,
JobStateType.Running => ExecutionState_Unstable.Running,
JobStateType.Retrying => ExecutionState_Unstable.Retrying,
JobStateType.Completing => ExecutionState_Unstable.Completing,
JobStateType.WaitingForDependency => ExecutionState_Unstable.WaitingForDependency,
JobStateType.Completed => ExecutionState_Unstable.Completed,
JobStateType.Faulted => ExecutionState_Unstable.Faulted,
JobStateType.Cancelled => ExecutionState_Unstable.Cancelled,
JobStateType.Expired => ExecutionState_Unstable.Expired,
_ => ExecutionState_Unstable.Undetermined,
};
}
}
24 changes: 24 additions & 0 deletions src/NCronJob/Observer/ExecutionState_Unstable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Diagnostics.CodeAnalysis;

namespace NCronJob.Observer;

#pragma warning disable CA1707 // Identifiers should not contain underscores
[Experimental("NCRONJOB_OBSERVER")]
public enum ExecutionState_Unstable
#pragma warning restore CA1707 // Identifiers should not contain underscores
{
Undetermined = 0,
NotStarted,
Scheduled,
Initializing,
Running,
Retrying,
Completing,
WaitingForDependency,
Completed,
Faulted,
Cancelled,
Expired,
OrchestrationStarted,
OrchestrationCompleted,
}
13 changes: 13 additions & 0 deletions src/NCronJob/Observer/IJobExecutionProgressReporter_Unstable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Diagnostics.CodeAnalysis;

namespace NCronJob.Observer;

#pragma warning disable CA1707 // Identifiers should not contain underscores
#pragma warning disable S101 // Types should be named in PascalCase
[Experimental("NCRONJOB_OBSERVER")]
public interface IJobExecutionProgressReporter_Unstable
#pragma warning restore S101 // Types should be named in PascalCase
#pragma warning restore CA1707 // Identifiers should not contain underscores
{
IDisposable Register(Action<ExecutionProgress_Unstable> callback);
}
98 changes: 98 additions & 0 deletions src/NCronJob/Observer/JobExecutionProgressObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace NCronJob.Observer;

internal sealed class JobExecutionProgressObserver : IJobExecutionProgressReporter_Unstable
{
private readonly List<Action<ExecutionProgress_Unstable>> callbacks = [];

#if NET9_0_OR_GREATER
private readonly Lock callbacksLock = new();
#else
private readonly object callbacksLock = new();
#endif

public IDisposable Register(Action<ExecutionProgress_Unstable> callback)
{
ArgumentNullException.ThrowIfNull(callback);

lock (callbacksLock)
{
callbacks.Add(callback);
}

return new ActionDisposer(() =>
{
lock (callbacksLock)
{
callbacks.Remove(callback);
}
});
}

internal void Report(JobRun run)
{
List<ExecutionProgress_Unstable> progresses = [];

var progress = new ExecutionProgress_Unstable(run);
progresses.Add(progress);

if (run.IsOrchestrationRoot && progress.State == ExecutionState_Unstable.NotStarted)
{
var orchestrationStarted = progress
with
{
State = ExecutionState_Unstable.OrchestrationStarted,
RunId = null,
ParentRunId = null,
};

progresses.Insert(0, orchestrationStarted);
}
else if (run.IsCompleted && !run.RootJobHasPendingDependentJobs)
{
var orchestrationCompleted = progress
with
{
State = ExecutionState_Unstable.OrchestrationCompleted,
RunId = null,
ParentRunId = null,
};

progresses.Add(orchestrationCompleted);
}

foreach (var callback in callbacks)
{
foreach (var entry in progresses)
{
callback(entry);
}
}
}

internal sealed class ActionDisposer : IDisposable
{
private bool disposed;
private readonly Action disposer;

public ActionDisposer(Action disposer)
{
this.disposer = disposer;
}

public void Dispose()
{
if (disposed)
return;

disposer();

disposed = true;
}
}
}
11 changes: 10 additions & 1 deletion src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NCronJob.Observer;
using System.Diagnostics;

namespace NCronJob;
Expand Down Expand Up @@ -161,19 +162,22 @@ internal sealed partial class InstantJobRegistry : IInstantJobRegistry
private readonly JobQueueManager jobQueueManager;
private readonly JobRegistry jobRegistry;
private readonly JobWorker jobWorker;
private readonly JobExecutionProgressObserver observer;
private readonly ILogger<InstantJobRegistry> logger;

public InstantJobRegistry(
TimeProvider timeProvider,
JobQueueManager jobQueueManager,
JobRegistry jobRegistry,
JobWorker jobWorker,
JobExecutionProgressObserver observer,
ILogger<InstantJobRegistry> logger)
{
this.timeProvider = timeProvider;
this.jobQueueManager = jobQueueManager;
this.jobRegistry = jobRegistry;
this.jobWorker = jobWorker;
this.observer = observer;
this.logger = logger;
}

Expand Down Expand Up @@ -268,7 +272,12 @@ private Guid RunInternal(
bool forceExecution,
CancellationToken token)
{
var run = JobRun.Create(jobDefinition, parameter, token);
var run = JobRun.Create(
observer.Report,
jobDefinition,
parameter,
token);

run.Priority = JobPriority.High;
run.RunAt = startDate;
run.IsOneTimeJob = true;
Expand Down
38 changes: 32 additions & 6 deletions src/NCronJob/Registry/JobRun.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,50 @@

using System.Collections.Concurrent;

namespace NCronJob;

internal class JobRun
{
private readonly JobRun rootJob;
private int jobExecutionCount;
private readonly Action<JobRun> progressReporter;
private readonly ConcurrentBag<JobRun> pendingDependents = [];

private JobRun(
JobDefinition jobDefinition,
object? parameter)
: this(null, jobDefinition, parameter)
object? parameter,
Action<JobRun> progressReporter)
: this(null, jobDefinition, parameter, progressReporter)
{ }

private JobRun(
JobRun? parentJob,
JobDefinition jobDefinition,
object? parameter)
object? parameter,
Action<JobRun> progressReporter)
{
var jobRunId = Guid.NewGuid();

JobRunId = jobRunId;
ParentJobRunId = parentJob is not null ? parentJob.JobRunId : jobRunId;
IsOrchestrationRoot = parentJob is null;
CorrelationId = parentJob is not null ? parentJob.CorrelationId : Guid.NewGuid();
JobDefinition = jobDefinition;
Parameter = parameter ?? jobDefinition.Parameter;

this.progressReporter = progressReporter;
rootJob = parentJob is not null ? parentJob.rootJob : this;

Initialize();
}

internal JobPriority Priority { get; set; } = JobPriority.Normal;

public Guid JobRunId { get; }
public Guid ParentJobRunId { get; }
public JobDefinition JobDefinition { get; }
public Guid CorrelationId { get; }
public bool IsOrchestrationRoot { get; }
public CancellationToken CancellationToken { get; set; }
public DateTimeOffset? RunAt { get; set; }

Expand All @@ -48,14 +62,16 @@ private JobRun(
public void IncrementJobExecutionCount() => Interlocked.Increment(ref jobExecutionCount);

public static JobRun Create(
Action<JobRun> progressReporter,
JobDefinition jobDefinition)
=> new(jobDefinition, jobDefinition.Parameter);
=> new(jobDefinition, jobDefinition.Parameter, progressReporter);

public static JobRun Create(
Action<JobRun> progressReporter,
JobDefinition jobDefinition,
object? parameter,
CancellationToken token)
=> new(jobDefinition, parameter)
=> new(jobDefinition, parameter, progressReporter)
{
CancellationToken = token,
};
Expand All @@ -65,11 +81,13 @@ public JobRun CreateDependent(
object? parameter,
CancellationToken token)
{
JobRun run = new(this, jobDefinition, parameter)
JobRun run = new(this, jobDefinition, parameter, progressReporter)
{
CancellationToken = token,
};

pendingDependents.Add(run);

return run;
}

Expand Down Expand Up @@ -103,9 +121,17 @@ private void Initialize()
}
};

OnStateChanged += (jr, state) =>
{
progressReporter(jr);
};

AddState(new JobState(JobStateType.NotStarted));
}

public bool RootJobHasPendingDependentJobs => rootJob.HasPendingDependentJobs;
public bool HasPendingDependentJobs => !pendingDependents.IsEmpty && pendingDependents.Any(j => !j.IsCompleted || j.HasPendingDependentJobs);

// State change logic
public bool IsCompleted => States.Exists(s => IsFinalState(s.Type));
public JobState CurrentState => States.LastOrDefault();
Expand Down
Loading

0 comments on commit 018d842

Please sign in to comment.