diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index bde9deec..3c176e3a 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -17,6 +17,7 @@ jobs:
build:
runs-on: ${{ matrix.os }}
+ timeout-minutes: 10
strategy:
matrix:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37ef1b2e..aa0b97ad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ All notable changes to **NCronJob** will be documented in this file. The project
## [Unreleased]
+### Added
+
+- Expose an **experimental** basic job execution progress reporting hook. Added in [#157](https://github.com/NCronJob-Dev/NCronJob/issues/157), by [@nulltoken](https://github.com/nulltoken).
+
## [v4.1.0] - 2025-01-02
### Added
diff --git a/Directory.Build.props b/Directory.Build.props
index e2b707dd..2aa60575 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -16,6 +16,8 @@
true
latest
true
+
+ $(NoWarn);NCRONJOB_OBSERVER
diff --git a/docs/advanced/dynamic-job-control.md b/docs/advanced/dynamic-job-control.md
index 78609a13..ba333f33 100644
--- a/docs/advanced/dynamic-job-control.md
+++ b/docs/advanced/dynamic-job-control.md
@@ -1,5 +1,5 @@
# Dynamic Job Control
-**NCronJob** allows you to dynmically add or remove CRON jobs from the scheduler. This is useful when you want to add jobs at runtime or remove jobs based on some condition without restarting the scheduler.
+**NCronJob** allows you to dynamically add or remove CRON jobs from the scheduler. This is useful when you want to add jobs at runtime or remove jobs based on some condition without restarting the scheduler.
## Defining job names
The core idea is to define an unique job name for each job that might be mutated during runtime. The job name is an optional parameter:
diff --git a/docs/advanced/global-concurrency.md b/docs/advanced/global-concurrency.md
index f27a61e6..9705cb2c 100644
--- a/docs/advanced/global-concurrency.md
+++ b/docs/advanced/global-concurrency.md
@@ -1,7 +1,7 @@
# Global Concurrency
-**NCronJob** utilisies a priority queue with a maximum amount of entries. That is, the queue will only hold and execute a maximum amount of jobs at any given time. This is to prevent the system from being overloaded with jobs.
+**NCronJob** relies on a priority queue with a maximum amount of entries. That is, the queue will only hold and execute a maximum amount of jobs at any given time. This is to prevent the system from being overloaded with jobs.
-## The Maxmimum
+## The Maximum
The global maximum of concurrent jobs is calculated as:
```csharp
diff --git a/docs/advanced/observing-job-progress.md b/docs/advanced/observing-job-progress.md
new file mode 100644
index 00000000..5e4f5326
--- /dev/null
+++ b/docs/advanced/observing-job-progress.md
@@ -0,0 +1,231 @@
+# Observing job progress
+
+Every time one schedules a job (or triggers it as an instant job), a virtual orchestration is spawned.
+
+An orchestration can be as simple as a unique job, or as complex as a root job and the whole hierarchy of its direct and indirect dependent jobs (see [*"Model Dependencies"*](../features/model-dependencies.md)).
+
+An orchestration is uniquely identifed by an identifier. All jobs belonging to an orchestration share this same `CorrelationId` (See [*"Tracing requests of dependencies via `CorrelationId`"*](../features/model-dependencies.md#tracing-requests-of-dependencies-via-correlationid)).
+
+From a timeline perspective, an orchestration starts before the root job that initiated it and completes when **all** of its leaf jobs have reached a final state.
+
+## Subscribing to the executions of jobs
+
+### Forewords
+
+!!! warning
+
+ This is an **experimental** feature subject to breaking changes independently of the standard semver lifecycle release of **NCronJob**.
+
+ While reporting feedback or bugs about it, please do not forget to
+ mention in the issue which version of NCronJob you're using.
+
+Would you decide to give it an early try, in order to suppress the warnings that comes with the [.NET Experimental attribute](https://learn.microsoft.com/en-us/dotnet/fundamentals/apicompat/preview-apis#experimentalattribute), update your `.csproj` with a `` project setting:
+
+```xml
+
+ ...
+ $(NoWarn);NCRONJOB_OBSERVER
+
+```
+
+### Registering a notifier callback
+
+**NCronJob** exposes the capability to notify whenever jobs change states. One can
+suscribe to this by leveraging the `IJobExecutionProgressReporter` service.
+
+This is done through the following exposed method
+
+```csharp
+IDisposable Register(Action callback);
+```
+
+!!! info
+
+ The registration returns the subscriber as a `IDisposable` object.
+ In order to stop the subscriber from receiving notifications anymore, invoke the `Dispose()` method of it.
+
+Subscribers to the reporting service will receive an immutable instance of the `ExecutionProgress`. This type will expose every meaningful change to any job or orchestration handled by NCronJob.
+
+### Sample usage
+
+Considering the following orchestration
+
+```text
+
+A ─┬─ (successful) ──> B
+ └─ (successful) ──> C ─── (successful) ──> D
+```
+
+Below a very simple approach to schedule it every minute and register a subscriber.
+
+```csharp
+using NCronJob;
+
+public class A : IJob
+{
+ public A(ILogger logger) => Logger = logger;
+
+ public ILogger Logger { get; }
+
+ public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
+ {
+ Logger.LogInformation("[A]: Starting processing...");
+
+ await Task.Delay(TimeSpan.FromSeconds(1), token);
+
+ Logger.LogInformation("[A]: Processing is done.");
+ }
+}
+
+public class B : IJob
+{
+ public B(ILogger logger) => Logger = logger;
+
+ public ILogger Logger { get; }
+
+ public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
+ {
+ Logger.LogInformation("[B]: Starting processing...");
+
+ await Task.Delay(TimeSpan.FromSeconds(6), token);
+
+ Logger.LogInformation("[B]: Processing is done.");
+ }
+}
+
+public class C : IJob
+{
+ public C(ILogger logger) => Logger = logger;
+
+ public ILogger Logger { get; }
+
+ public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
+ {
+ Logger.LogInformation("[C]: Starting processing...");
+
+ await Task.Delay(TimeSpan.FromSeconds(1), token);
+
+ Logger.LogInformation("[C]: Processing is done.");
+ }
+}
+
+public class D : IJob
+{
+ public D(ILogger logger) => Logger = logger;
+
+ public ILogger Logger { get; }
+
+ public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
+ {
+ Logger.LogInformation("[D]: Starting processing...");
+
+ await Task.Delay(TimeSpan.FromSeconds(1), token);
+
+ Logger.LogInformation("[D]: Processing is done.");
+ }
+}
+
+public class Program
+{
+ private static async Task Main(string[] args)
+ {
+ var builder = Host.CreateApplicationBuilder(args);
+
+ builder.Services.AddNCronJob(n =>
+ {
+ n.AddJob();
+
+ n.AddJob()
+ .ExecuteWhen(success: s => s.RunJob());
+
+ n.AddJob();
+
+ n.AddJob(o => o.WithCronExpression("* * * * *"))
+ .ExecuteWhen(success: s => s.RunJob())
+ .ExecuteWhen(success: s => s.RunJob());
+ });
+
+ var app = builder.Build();
+
+ await app.UseNCronJobAsync();
+
+ var logger = app.Services.GetRequiredService>();
+
+ // Retrieve the observer service from the DI container...
+ var reporter = app.Services.GetRequiredService();
+
+ // ...enlist a new subscriber to it...
+ IDisposable subscriber = reporter.Register(Subscriber);
+
+ await app.RunAsync();
+
+ // ...and when you're done with it, unhook the subscription.
+ subscriber.Dispose();
+
+ void Subscriber(ExecutionProgress progress)
+ {
+ if (progress.RunId is null)
+ {
+ logger.LogWarning(
+ "Orchestration {CorrelationId} - {Status}",
+ progress.CorrelationId,
+ progress.State);
+
+ return;
+ }
+
+ logger.LogWarning("Job {JobRunId} - {Status}",
+ progress.RunId,
+ progress.State);
+ }
+ }
+
+}
+```
+
+Given the orchestration defined above, with jobs of varying durations, the generated output log may look like this:
+
+```text
+10:46:47 warn: Program[0] Orchestration d36e2b62-6997-44c5-a9f9-de442b8a1807 - OrchestrationStarted
+10:46:50 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - NotStarted
+10:46:50 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Scheduled
+10:47:00 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Initializing
+10:47:00 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Running
+10:47:00 info: A[0] [A]: Starting processing...
+10:47:01 info: A[0] [A]: Processing is done.
+10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Completing
+10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - WaitingForDependency
+10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - NotStarted
+10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - NotStarted
+10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Completed
+10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Initializing
+10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Running
+10:47:01 info: B[0] [B]: Starting processing...
+10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Initializing
+10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Running
+10:47:01 info: C[0] [C]: Starting processing...
+10:47:02 info: C[0] [C]: Processing is done.
+10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Completing
+10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - WaitingForDependency
+10:47:02 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - NotStarted
+10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Completed
+10:47:03 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Initializing
+10:47:03 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Running
+10:47:03 info: D[0] [D]: Starting processing...
+10:47:04 info: D[0] [D]: Processing is done.
+10:47:04 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Completing
+10:47:04 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Completed
+10:47:07 info: B[0] [B]: Processing is done.
+10:47:07 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Completing
+10:47:07 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Completed
+10:47:07 warn: Program[0] Orchestration d36e2b62-6997-44c5-a9f9-de442b8a1807 - OrchestrationCompleted
+```
+
+## Known limitations
+
+As global NCronJob observability is still under development, it's not feature complete yet.
+
+Below are know missing parts of it. Would you find other releated areas of interest that may be worth investigating, please submit a request in the issue tracker.
+
+- [Report removed jobs as `Cancelled`](https://github.com/NCronJob-Dev/NCronJob/issues/161)
+- [Report skipped child jobs as `Skipped`](https://github.com/NCronJob-Dev/NCronJob/issues/160)
diff --git a/docs/features/instant-jobs.md b/docs/features/instant-jobs.md
index 356db0af..289dfe76 100644
--- a/docs/features/instant-jobs.md
+++ b/docs/features/instant-jobs.md
@@ -46,7 +46,7 @@ app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
The `RunInstantJob` method takes the job type and the parameters as arguments. Optionally you can pass in a `CancellationToken` as well. The job will be executed immediately.
## Starting a job with a delay
-If you find the need of delaying the execution of an instant job, you can use the `RunScheduledJob` method with a `TimeSpan` as a delay. The same as `RunInstantJob` applies here, the job has to be registered in the `AddNCronJob` method.
+If you find the need to delay the execution of an instant job, you can use the `RunScheduledJob` method with a `TimeSpan` as a delay. The same as `RunInstantJob` applies here, the job has to be registered in the `AddNCronJob` method.
```csharp
app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
diff --git a/docs/features/model-dependencies.md b/docs/features/model-dependencies.md
index 2e46ac14..d9cca2cb 100644
--- a/docs/features/model-dependencies.md
+++ b/docs/features/model-dependencies.md
@@ -29,7 +29,7 @@ Services.AddNCronJob(options =>
```
## Accessing the state of the parent job
-The `JobExecutionContext` object passed to the dependent job contains the output of the parent job. This allows access to the state of the parent job. This can be helpful if information should flow from parent to the child job.
+The `JobExecutionContext` object passed to the dependent job contains the output of the parent job. This allows access to the state of the parent job. This can be helpful if information should flow from the parent to the child job.
```csharp
public class JobA : IJob
@@ -185,4 +185,4 @@ Services.AddNCronJob(options =>
// Register JobC into the container to avoid warnings
options.AddJob();
});
-```
\ No newline at end of file
+```
diff --git a/docs/features/parameters.md b/docs/features/parameters.md
index 165ed962..ce4f01d7 100644
--- a/docs/features/parameters.md
+++ b/docs/features/parameters.md
@@ -1,6 +1,6 @@
# Passing parameters to a job
-Often times a job needs some kind of configuration or parameter to run. Imagine you have a job that generates a report and can run daily, weekly or monthly. You could create three different jobs for each frequency, but that would be a lot of duplicated code. Instead, you can pass in parameters to the job.
+Often a job needs some kind of configuration or parameter to run. Imagine you have a job that generates a report and can run daily, weekly or monthly. You could create three different jobs for each frequency, but that would be a lot of duplicated code. Instead, you can pass in parameters to the job.
```csharp
Services.AddNCronJob(options =>
@@ -44,7 +44,7 @@ public class ReportJob : IJob
```
## Parameters are not immutable
-Passed in parameters are not immutable by default or cloned through out the job execution. This means that if you change the parameter in the job, it will also change in the next execution. If you need to keep the parameter unchanged, you should clone it in the job.
+Passed in parameters are not immutable by default or cloned throughout the job execution. This means that if you change the parameter in the job, it will also change in the next execution. If you need to keep the parameter unchanged, you should clone it in the job.
```csharp
public class MyParameter
diff --git a/mkdocs.yml b/mkdocs.yml
index 64c52b8c..8ac42c87 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -49,6 +49,7 @@ nav:
- Controlling the log level: advanced/log-level.md
- Global Concurrency: advanced/global-concurrency.md
- Dynamic Job Control: advanced/dynamic-job-control.md
+ - Observing progress of jobs: advanced/observing-job-progress.md
extra:
generator: false
diff --git a/src/NCronJob/Execution/StartupJobManager.cs b/src/NCronJob/Execution/StartupJobManager.cs
index fac3e926..4b7ad9b1 100644
--- a/src/NCronJob/Execution/StartupJobManager.cs
+++ b/src/NCronJob/Execution/StartupJobManager.cs
@@ -1,13 +1,16 @@
namespace NCronJob;
-internal class StartupJobManager(JobRegistry jobRegistry, JobProcessor jobProcessor)
+internal class StartupJobManager(
+ JobRegistry jobRegistry,
+ JobProcessor jobProcessor,
+ JobExecutionProgressObserver observer)
{
private readonly AsyncManualResetEvent startupJobsCompleted = new();
public async Task ProcessStartupJobs(CancellationToken stopToken)
{
var startupJobs = jobRegistry.GetAllOneTimeJobs();
- var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(definition), stopToken)).ToList();
+ var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(observer.Report, definition), stopToken)).ToList();
if (startupTasks.Count > 0)
{
diff --git a/src/NCronJob/NCronJobExtensions.cs b/src/NCronJob/NCronJobExtensions.cs
index 0a30e16f..87fcf26f 100644
--- a/src/NCronJob/NCronJobExtensions.cs
+++ b/src/NCronJob/NCronJobExtensions.cs
@@ -50,6 +50,11 @@ public static IServiceCollection AddNCronJob(
sp.GetRequiredService(),
sp.GetRequiredService(),
sp.GetRequiredService()));
+ services.TryAddSingleton();
+ services.TryAddSingleton((sp) =>
+ {
+ return sp.GetRequiredService();
+ });
services.TryAddSingleton(TimeProvider.System);
services.TryAddSingleton();
services.TryAddSingleton();
diff --git a/src/NCronJob/Observer/ExecutionProgress.cs b/src/NCronJob/Observer/ExecutionProgress.cs
new file mode 100644
index 00000000..eccf0c5d
--- /dev/null
+++ b/src/NCronJob/Observer/ExecutionProgress.cs
@@ -0,0 +1,65 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace NCronJob;
+
+///
+/// The snapshot of a state of the execution of a job instance.
+///
+[Experimental("NCRONJOB_OBSERVER")]
+public record ExecutionProgress
+{
+ internal ExecutionProgress(JobRun run)
+ {
+ RunId = run.JobRunId;
+ ParentRunId = run.ParentJobRunId;
+ CorrelationId = run.CorrelationId;
+ State = MapFrom(run.CurrentState.Type);
+ }
+
+ ///
+ /// The identifier of a job run within an orchestration.
+ ///
+ /// Will be null when the relates to the start or completion of an orchestration.
+ public Guid? RunId { get; init; }
+
+ ///
+ /// The identifier of the parent job run.
+ ///
+ /// Will be null when the reported instance is the root job of an orchestration,
+ /// or when the relates to the start or completion of an orchestration.
+ public Guid? ParentRunId { get; init; }
+
+ ///
+ /// The correlation identifier of an orchestration run. Will decorate every reported progress of the root job and all of its dependencies.
+ ///
+ public Guid CorrelationId { get; }
+
+ ///
+ /// The reported state. Will either relate to an orchestration, describing its start or completion or to a job belonging to an orchestration.
+ ///
+ public ExecutionState State { get; init; }
+
+ ///
+ /// The instant this was created./>
+ ///
+ public DateTimeOffset Timestamp { get; } = DateTimeOffset.Now;
+
+ private static ExecutionState MapFrom(JobStateType currentState)
+ {
+ return currentState switch
+ {
+ JobStateType.NotStarted => ExecutionState.NotStarted,
+ JobStateType.Scheduled => ExecutionState.Scheduled,
+ JobStateType.Initializing => ExecutionState.Initializing,
+ JobStateType.Running => ExecutionState.Running,
+ JobStateType.Retrying => ExecutionState.Retrying,
+ JobStateType.Completing => ExecutionState.Completing,
+ JobStateType.WaitingForDependency => ExecutionState.WaitingForDependency,
+ JobStateType.Completed => ExecutionState.Completed,
+ JobStateType.Faulted => ExecutionState.Faulted,
+ JobStateType.Cancelled => ExecutionState.Cancelled,
+ JobStateType.Expired => ExecutionState.Expired,
+ _ => ExecutionState.Undetermined,
+ };
+ }
+}
diff --git a/src/NCronJob/Observer/ExecutionState.cs b/src/NCronJob/Observer/ExecutionState.cs
new file mode 100644
index 00000000..2cea8766
--- /dev/null
+++ b/src/NCronJob/Observer/ExecutionState.cs
@@ -0,0 +1,80 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace NCronJob;
+
+///
+/// The state of a reported orchestration or job.
+///
+[Experimental("NCRONJOB_OBSERVER")]
+public enum ExecutionState
+{
+ ///
+ /// Fallback state.
+ ///
+ Undetermined = 0,
+
+ ///
+ /// The job has been registered.
+ ///
+ NotStarted,
+
+ ///
+ /// The job has been scheduled.
+ ///
+ Scheduled,
+
+ ///
+ /// The job is about to run.
+ ///
+ Initializing,
+
+ ///
+ /// The job is running.
+ ///
+ Running,
+
+ ///
+ /// The previous instance of the job encountered an issue. The job is retrying.
+ ///
+ Retrying,
+
+ ///
+ /// The job is finalizing its run.
+ ///
+ Completing,
+
+ ///
+ /// The job is identifying its dependent jobs to be triggered.
+ ///
+ WaitingForDependency,
+
+ ///
+ /// The job has completed.
+ ///
+ Completed,
+
+ ///
+ /// The job has crashed.
+ ///
+ Faulted,
+
+ ///
+ /// The job has been cancelled.
+ ///
+ Cancelled,
+
+ ///
+ /// The job expired.
+ ///
+ Expired,
+
+ ///
+ /// The orchestration has started.
+ ///
+ OrchestrationStarted,
+
+ ///
+ /// The orchestration has completed.
+ ///
+ OrchestrationCompleted,
+}
diff --git a/src/NCronJob/Observer/IJobExecutionProgressReporter.cs b/src/NCronJob/Observer/IJobExecutionProgressReporter.cs
new file mode 100644
index 00000000..c91f952b
--- /dev/null
+++ b/src/NCronJob/Observer/IJobExecutionProgressReporter.cs
@@ -0,0 +1,17 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace NCronJob;
+
+///
+/// Provides a way to be notified of job execution lifecycle.
+///
+[Experimental("NCRONJOB_OBSERVER")]
+public interface IJobExecutionProgressReporter
+{
+ ///
+ /// Enlist a new callback hook that will be triggered on each job state change.
+ ///
+ /// The action that will be invoked.
+ /// An object representing the subscription. Once disposed, the callback hook won't be invoked anymore.
+ IDisposable Register(Action callback);
+}
diff --git a/src/NCronJob/Observer/JobExecutionProgressObserver.cs b/src/NCronJob/Observer/JobExecutionProgressObserver.cs
new file mode 100644
index 00000000..e48507fb
--- /dev/null
+++ b/src/NCronJob/Observer/JobExecutionProgressObserver.cs
@@ -0,0 +1,98 @@
+using System.Collections.Immutable;
+using System.Diagnostics.CodeAnalysis;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace NCronJob;
+
+internal sealed class JobExecutionProgressObserver : IJobExecutionProgressReporter
+{
+ private readonly List> callbacks = [];
+
+#if NET9_0_OR_GREATER
+ private readonly Lock callbacksLock = new();
+#else
+ private readonly object callbacksLock = new();
+#endif
+
+ public IDisposable Register(Action callback)
+ {
+ ArgumentNullException.ThrowIfNull(callback);
+
+ lock (callbacksLock)
+ {
+ callbacks.Add(callback);
+ }
+
+ return new ActionDisposer(() =>
+ {
+ lock (callbacksLock)
+ {
+ callbacks.Remove(callback);
+ }
+ });
+ }
+
+ internal void Report(JobRun run)
+ {
+ List progresses = [];
+
+ var progress = new ExecutionProgress(run);
+ progresses.Add(progress);
+
+ if (run.IsOrchestrationRoot && progress.State == ExecutionState.NotStarted)
+ {
+ var orchestrationStarted = progress
+ with
+ {
+ State = ExecutionState.OrchestrationStarted,
+ RunId = null,
+ ParentRunId = null,
+ };
+
+ progresses.Insert(0, orchestrationStarted);
+ }
+ else if (run.IsCompleted && !run.RootJobHasPendingDependentJobs)
+ {
+ var orchestrationCompleted = progress
+ with
+ {
+ State = ExecutionState.OrchestrationCompleted,
+ RunId = null,
+ ParentRunId = null,
+ };
+
+ progresses.Add(orchestrationCompleted);
+ }
+
+ foreach (var callback in callbacks)
+ {
+ foreach (var entry in progresses)
+ {
+ callback(entry);
+ }
+ }
+ }
+
+ internal sealed class ActionDisposer : IDisposable
+ {
+ private bool disposed;
+ private readonly Action disposer;
+
+ public ActionDisposer(Action disposer)
+ {
+ this.disposer = disposer;
+ }
+
+ public void Dispose()
+ {
+ if (disposed)
+ return;
+
+ disposer();
+
+ disposed = true;
+ }
+ }
+}
diff --git a/src/NCronJob/Registry/IInstantJobRegistry.cs b/src/NCronJob/Registry/IInstantJobRegistry.cs
index 87c4bb85..f2bcdaf7 100644
--- a/src/NCronJob/Registry/IInstantJobRegistry.cs
+++ b/src/NCronJob/Registry/IInstantJobRegistry.cs
@@ -161,6 +161,7 @@ internal sealed partial class InstantJobRegistry : IInstantJobRegistry
private readonly JobQueueManager jobQueueManager;
private readonly JobRegistry jobRegistry;
private readonly JobWorker jobWorker;
+ private readonly JobExecutionProgressObserver observer;
private readonly ILogger logger;
public InstantJobRegistry(
@@ -168,12 +169,14 @@ public InstantJobRegistry(
JobQueueManager jobQueueManager,
JobRegistry jobRegistry,
JobWorker jobWorker,
+ JobExecutionProgressObserver observer,
ILogger logger)
{
this.timeProvider = timeProvider;
this.jobQueueManager = jobQueueManager;
this.jobRegistry = jobRegistry;
this.jobWorker = jobWorker;
+ this.observer = observer;
this.logger = logger;
}
@@ -268,7 +271,12 @@ private Guid RunInternal(
bool forceExecution,
CancellationToken token)
{
- var run = JobRun.Create(jobDefinition, parameter, token);
+ var run = JobRun.Create(
+ observer.Report,
+ jobDefinition,
+ parameter,
+ token);
+
run.Priority = JobPriority.High;
run.RunAt = startDate;
run.IsOneTimeJob = true;
diff --git a/src/NCronJob/Registry/JobRun.cs b/src/NCronJob/Registry/JobRun.cs
index 0db81fde..4adf070b 100644
--- a/src/NCronJob/Registry/JobRun.cs
+++ b/src/NCronJob/Registry/JobRun.cs
@@ -1,36 +1,50 @@
+using System.Collections.Concurrent;
+
namespace NCronJob;
internal class JobRun
{
+ private readonly JobRun rootJob;
private int jobExecutionCount;
+ private readonly Action progressReporter;
+ private readonly ConcurrentBag pendingDependents = [];
private JobRun(
JobDefinition jobDefinition,
- object? parameter)
- : this(null, jobDefinition, parameter)
+ object? parameter,
+ Action progressReporter)
+ : this(null, jobDefinition, parameter, progressReporter)
{ }
private JobRun(
JobRun? parentJob,
JobDefinition jobDefinition,
- object? parameter)
+ object? parameter,
+ Action progressReporter)
{
var jobRunId = Guid.NewGuid();
JobRunId = jobRunId;
+ ParentJobRunId = parentJob is not null ? parentJob.JobRunId : null;
+ IsOrchestrationRoot = parentJob is null;
CorrelationId = parentJob is not null ? parentJob.CorrelationId : Guid.NewGuid();
JobDefinition = jobDefinition;
Parameter = parameter ?? jobDefinition.Parameter;
+ this.progressReporter = progressReporter;
+ rootJob = parentJob is not null ? parentJob.rootJob : this;
+
Initialize();
}
internal JobPriority Priority { get; set; } = JobPriority.Normal;
public Guid JobRunId { get; }
+ public Guid? ParentJobRunId { get; }
public JobDefinition JobDefinition { get; }
public Guid CorrelationId { get; }
+ public bool IsOrchestrationRoot { get; }
public CancellationToken CancellationToken { get; set; }
public DateTimeOffset? RunAt { get; set; }
@@ -48,14 +62,16 @@ private JobRun(
public void IncrementJobExecutionCount() => Interlocked.Increment(ref jobExecutionCount);
public static JobRun Create(
+ Action progressReporter,
JobDefinition jobDefinition)
- => new(jobDefinition, jobDefinition.Parameter);
+ => new(jobDefinition, jobDefinition.Parameter, progressReporter);
public static JobRun Create(
+ Action progressReporter,
JobDefinition jobDefinition,
object? parameter,
CancellationToken token)
- => new(jobDefinition, parameter)
+ => new(jobDefinition, parameter, progressReporter)
{
CancellationToken = token,
};
@@ -65,11 +81,13 @@ public JobRun CreateDependent(
object? parameter,
CancellationToken token)
{
- JobRun run = new(this, jobDefinition, parameter)
+ JobRun run = new(this, jobDefinition, parameter, progressReporter)
{
CancellationToken = token,
};
+ pendingDependents.Add(run);
+
return run;
}
@@ -101,11 +119,15 @@ private void Initialize()
default:
throw new ArgumentOutOfRangeException(nameof(state), state.Type, "Unexpected JobStateType value");
}
+
+ progressReporter(jr);
};
AddState(new JobState(JobStateType.NotStarted));
}
+ public bool RootJobHasPendingDependentJobs => rootJob.HasPendingDependentJobs();
+
// State change logic
public bool IsCompleted => States.Exists(s => IsFinalState(s.Type));
public JobState CurrentState => States.LastOrDefault();
@@ -120,7 +142,7 @@ public void AddState(JobState state)
public void NotifyStateChange(JobStateType type, string message = "")
{
- if (CurrentState.Type == type)
+ if (CurrentState.Type == type || IsCompleted)
return;
var state = new JobState(type, message);
@@ -128,5 +150,15 @@ public void NotifyStateChange(JobStateType type, string message = "")
}
private static bool IsFinalState(JobStateType stateType) =>
- stateType is JobStateType.Completed or JobStateType.Cancelled or JobStateType.Faulted or JobStateType.Crashed;
+ stateType is
+ JobStateType.Completed or
+ JobStateType.Cancelled or
+ JobStateType.Faulted or
+ JobStateType.Crashed or
+ JobStateType.Expired;
+
+ private bool HasPendingDependentJobs()
+ {
+ return !pendingDependents.IsEmpty && pendingDependents.Any(j => !j.IsCompleted || j.HasPendingDependentJobs());
+ }
}
diff --git a/src/NCronJob/Scheduler/JobQueueManager.cs b/src/NCronJob/Scheduler/JobQueueManager.cs
index dde47be0..2df2d433 100644
--- a/src/NCronJob/Scheduler/JobQueueManager.cs
+++ b/src/NCronJob/Scheduler/JobQueueManager.cs
@@ -47,6 +47,8 @@ public void RemoveQueue(string queueName)
{
if (jobQueues.TryRemove(queueName, out var jobQueue))
{
+ // TODO: Should we first cancel the potentially existing JobRuns here?
+
jobQueue.Clear();
jobQueue.CollectionChanged -= CallCollectionChanged;
semaphores.Clear();
diff --git a/src/NCronJob/Scheduler/JobWorker.cs b/src/NCronJob/Scheduler/JobWorker.cs
index 4b7bf99f..8f90a077 100644
--- a/src/NCronJob/Scheduler/JobWorker.cs
+++ b/src/NCronJob/Scheduler/JobWorker.cs
@@ -9,6 +9,7 @@ internal sealed partial class JobWorker
private readonly JobProcessor jobProcessor;
private readonly JobRegistry registry;
private readonly TimeProvider timeProvider;
+ private readonly JobExecutionProgressObserver observer;
private readonly ILogger logger;
private readonly int globalConcurrencyLimit;
private readonly ConcurrentDictionary runningJobCounts = [];
@@ -22,12 +23,14 @@ public JobWorker(
JobRegistry registry,
TimeProvider timeProvider,
ConcurrencySettings concurrencySettings,
+ JobExecutionProgressObserver observer,
ILogger logger)
{
this.jobQueueManager = jobQueueManager;
this.jobProcessor = jobProcessor;
this.registry = registry;
this.timeProvider = timeProvider;
+ this.observer = observer;
this.logger = logger;
globalConcurrencyLimit = concurrencySettings.MaxDegreeOfParallelism;
@@ -187,7 +190,7 @@ public void ScheduleJob(JobDefinition job)
if (nextRunTime.HasValue)
{
LogNextJobRun(job.Type, nextRunTime.Value.LocalDateTime); // todo: log by subscribing to OnStateChanged => JobStateType.Scheduled
- var run = JobRun.Create(job);
+ var run = JobRun.Create(observer.Report, job);
run.RunAt = nextRunTime;
var jobQueue = jobQueueManager.GetOrAddQueue(job.JobFullName);
jobQueue.Enqueue(run, (nextRunTime.Value, (int)run.Priority));
diff --git a/tests/NCronJob.Tests/.editorconfig b/tests/NCronJob.Tests/.editorconfig
new file mode 100644
index 00000000..1621d529
--- /dev/null
+++ b/tests/NCronJob.Tests/.editorconfig
@@ -0,0 +1,3 @@
+[*.cs]
+
+dotnet_diagnostic.IDISP017.severity = none # IDISP017: Prefer using
diff --git a/tests/NCronJob.Tests/NCronJob.Tests.csproj b/tests/NCronJob.Tests/NCronJob.Tests.csproj
index 323a5a97..f37b1c35 100644
--- a/tests/NCronJob.Tests/NCronJob.Tests.csproj
+++ b/tests/NCronJob.Tests/NCronJob.Tests.csproj
@@ -8,7 +8,7 @@
true
Exe
- NU1903
+ $(NoWarn);NU1903
diff --git a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs
index 2a86a9e8..a57091b8 100644
--- a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs
+++ b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs
@@ -3,6 +3,7 @@
using System.Threading.Channels;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
using Shouldly;
namespace NCronJob.Tests;
@@ -215,7 +216,7 @@ await Assert.ThrowsAsync(async () =>
using var serviceScope = provider.CreateScope();
using var executor = serviceScope.ServiceProvider.GetRequiredService();
var jobDefinition = new JobDefinition(typeof(JobWithDependency), null, null, null);
- await executor.RunJob(JobRun.Create(jobDefinition), CancellationToken.None);
+ await executor.RunJob(JobRun.Create((jr) => { }, jobDefinition), CancellationToken.None);
});
}
@@ -249,6 +250,36 @@ public async Task ExecuteAScheduledJobWithDateTimeOffset()
jobFinished.ShouldBeTrue();
}
+ [Fact]
+ public async Task ExecuteAScheduledJobWithDateTimeOffsetInThePast()
+ {
+ ServiceCollection.AddNCronJob(n => n.AddJob());
+ var provider = CreateServiceProvider();
+ var runDate = FakeTimer.GetUtcNow().AddDays(-1);
+
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(provider);
+
+ await provider.GetRequiredService().StartAsync(CancellationToken);
+
+ Guid orchestrationId = provider.GetRequiredService().RunScheduledJob(runDate, token: CancellationToken);
+
+ await Task.Delay(10, CancellationToken);
+ FakeTimer.Advance(TimeSpan.FromMinutes(1));
+ var jobFinished = await WaitForJobsOrTimeout(1);
+ jobFinished.ShouldBeFalse();
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ Assert.All(events, e => Assert.Equal(orchestrationId, e.CorrelationId));
+ Assert.Equal(ExecutionState.OrchestrationStarted, events[0].State);
+ Assert.Equal(ExecutionState.NotStarted, events[1].State);
+ Assert.Equal(ExecutionState.Expired, events[2].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, events[3].State);
+ Assert.Equal(4, events.Count);
+ }
+
[Fact]
public async Task WhileAwaitingJobTriggeringInstantJobShouldAnywayTriggerCronJob()
{
diff --git a/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs b/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs
index 4d171136..d2cc7d77 100644
--- a/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs
+++ b/tests/NCronJob.Tests/NCronJobNotificationHandlerTests.cs
@@ -30,13 +30,33 @@ public async Task ShouldPassDownExceptionToNotificationHandler()
.AddJob(p => p.WithCronExpression("* * * * *"))
.AddNotificationHandler()
);
+
var provider = CreateServiceProvider();
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(provider);
+
await provider.GetRequiredService().StartAsync(CancellationToken);
FakeTimer.Advance(TimeSpan.FromMinutes(1));
var message = await CommunicationChannel.Reader.ReadAsync(CancellationToken);
message.ShouldBeOfType();
+
+ Guid orchestrationId = events.First().CorrelationId;
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ var filteredEvents = events.Where((e) => e.CorrelationId == orchestrationId).ToList();
+
+ Assert.Equal(ExecutionState.OrchestrationStarted, filteredEvents[0].State);
+ Assert.Equal(ExecutionState.NotStarted, filteredEvents[1].State);
+ Assert.Equal(ExecutionState.Scheduled, filteredEvents[2].State);
+ Assert.Equal(ExecutionState.Initializing, filteredEvents[3].State);
+ Assert.Equal(ExecutionState.Running, filteredEvents[4].State);
+ Assert.Equal(ExecutionState.Faulted, filteredEvents[5].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, filteredEvents[6].State);
+ Assert.Equal(7, filteredEvents.Count);
}
[Fact]
diff --git a/tests/NCronJob.Tests/NCronJobRetryTests.cs b/tests/NCronJob.Tests/NCronJobRetryTests.cs
index 4ca3bfb4..ff29a266 100644
--- a/tests/NCronJob.Tests/NCronJobRetryTests.cs
+++ b/tests/NCronJob.Tests/NCronJobRetryTests.cs
@@ -94,6 +94,8 @@ public async Task CancelledJobIsStillAValidExecution()
var jobQueueManager = provider.GetRequiredService();
var jobQueue = jobQueueManager.GetOrAddQueue(typeof(CancelRetryingJob2).FullName!);
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(provider);
+
JobRun? nextJob = null;
var tcs = new TaskCompletionSource();
@@ -129,6 +131,24 @@ public async Task CancelledJobIsStillAValidExecution()
await Task.Delay(10, CancellationToken);
nextJob!.CurrentState.Type.ShouldBe(JobStateType.Cancelled);
nextJob!.JobExecutionCount.ShouldBe(1);
+
+ Guid orchestrationId = events.First().CorrelationId;
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ var filteredEvents = events.Where((e) => e.CorrelationId == orchestrationId).ToList();
+
+ Assert.Equal(ExecutionState.OrchestrationStarted, filteredEvents[0].State);
+ Assert.Equal(ExecutionState.NotStarted, filteredEvents[1].State);
+ Assert.Equal(ExecutionState.Scheduled, filteredEvents[2].State);
+ Assert.Equal(ExecutionState.Initializing, filteredEvents[3].State);
+ Assert.Equal(ExecutionState.Running, filteredEvents[4].State);
+ Assert.Equal(ExecutionState.Retrying, filteredEvents[5].State);
+ Assert.Equal(ExecutionState.Cancelled, filteredEvents[6].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, filteredEvents[7].State);
+ Assert.Equal(8, filteredEvents.Count);
}
[Fact]
diff --git a/tests/NCronJob.Tests/RunAtStartupJobTests.cs b/tests/NCronJob.Tests/RunAtStartupJobTests.cs
index d56c9cb3..530c41fb 100644
--- a/tests/NCronJob.Tests/RunAtStartupJobTests.cs
+++ b/tests/NCronJob.Tests/RunAtStartupJobTests.cs
@@ -56,14 +56,33 @@ public async Task ShouldStartStartupJobsBeforeApplicationIsSpunUp()
services.AddSingleton(_ => storage);
services.AddHostedService();
});
+
using var app = builder.Build();
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(app.Services);
+
await app.UseNCronJobAsync();
await RunApp(app);
storage.Content.Count.ShouldBe(2);
storage.Content[0].ShouldBe("SimpleJob");
storage.Content[1].ShouldBe("StartingService");
+
+ Guid orchestrationId = events.First().CorrelationId;
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ Assert.All(events, e => Assert.Equal(orchestrationId, e.CorrelationId));
+ Assert.Equal(ExecutionState.OrchestrationStarted, events[0].State);
+ Assert.Equal(ExecutionState.NotStarted, events[1].State);
+ Assert.Equal(ExecutionState.Initializing, events[2].State);
+ Assert.Equal(ExecutionState.Running, events[3].State);
+ Assert.Equal(ExecutionState.Completing, events[4].State);
+ Assert.Equal(ExecutionState.Completed, events[5].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, events[6].State);
+ Assert.Equal(7, events.Count);
}
[Fact]
@@ -80,13 +99,31 @@ public async Task StartupJobThatThrowsShouldNotPreventHostFromStarting()
});
services.AddSingleton(_ => storage);
});
+
using var app = builder.Build();
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(app.Services);
+
await app.UseNCronJobAsync();
await RunApp(app);
storage.Content.Count.ShouldBe(1);
storage.Content[0].ShouldBe("ExceptionHandler");
+
+ Guid orchestrationId = events.First().CorrelationId;
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ Assert.All(events, e => Assert.Equal(orchestrationId, e.CorrelationId));
+ Assert.Equal(ExecutionState.OrchestrationStarted, events[0].State);
+ Assert.Equal(ExecutionState.NotStarted, events[1].State);
+ Assert.Equal(ExecutionState.Initializing, events[2].State);
+ Assert.Equal(ExecutionState.Running, events[3].State);
+ Assert.Equal(ExecutionState.Faulted, events[4].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, events[5].State);
+ Assert.Equal(6, events.Count);
}
[SuppressMessage("Major Code Smell", "S108:Nested blocks of code should not be left empty", Justification = "On purpose")]
diff --git a/tests/NCronJob.Tests/RunDependentJobTests.cs b/tests/NCronJob.Tests/RunDependentJobTests.cs
index a50695d2..b7a93fcd 100644
--- a/tests/NCronJob.Tests/RunDependentJobTests.cs
+++ b/tests/NCronJob.Tests/RunDependentJobTests.cs
@@ -167,9 +167,12 @@ public async Task CanTriggerAChainOfDependentJobs()
});
var provider = CreateServiceProvider();
+
+ (IDisposable subscriber, IList events) = RegisterAnExecutionProgressSubscriber(provider);
+
await provider.GetRequiredService().StartAsync(CancellationToken);
- provider.GetRequiredService().ForceRunInstantJob(true, token: CancellationToken);
+ Guid orchestrationId = provider.GetRequiredService().ForceRunInstantJob(true, token: CancellationToken);
List results = [];
results.Add(await CommunicationChannel.Reader.ReadAsync(CancellationToken) as string);
@@ -179,6 +182,16 @@ public async Task CanTriggerAChainOfDependentJobs()
results.ShouldContain("PrincipalJob: Success");
results.ShouldContain("DependentJob: Parent: Success");
results.ShouldContain("Dependent job did run");
+
+ await WaitForOrchestrationCompletion(events, orchestrationId);
+
+ subscriber.Dispose();
+
+ Assert.All(events, e => Assert.Equal(orchestrationId, e.CorrelationId));
+ Assert.Equal(ExecutionState.OrchestrationStarted, events[0].State);
+ Assert.Equal(ExecutionState.Completed, events[18].State);
+ Assert.Equal(ExecutionState.OrchestrationCompleted, events[19].State);
+ Assert.Equal(20, events.Count);
}
[Fact]
diff --git a/tests/NCronJob.Tests/TestHelper.cs b/tests/NCronJob.Tests/TestHelper.cs
index 27f15a72..b107cf29 100644
--- a/tests/NCronJob.Tests/TestHelper.cs
+++ b/tests/NCronJob.Tests/TestHelper.cs
@@ -107,6 +107,51 @@ protected IEnumerable GetCompletionJobs(int expectedJobCount, Cancellation
}
}
+ protected static async Task WaitForOrchestrationCompletion(
+ IList events,
+ Guid orchestrationId)
+ {
+ // Note: Although this function could seem a bit over-engineered, it's sadly necessary.
+ // Indeed, events may actually be updated upstream while it's being enumerated
+ // in here (which leads to a "Collection was modified; enumeration operation may not
+ // execute." error message would we using any enumerating based (eg. Linq) traversal.
+
+ int index = 0;
+
+ while (true)
+ {
+ int count = events.Count;
+
+ while (index < count)
+ {
+ ExecutionProgress @event = events[index];
+
+ if (@event.CorrelationId == orchestrationId && @event.State == ExecutionState.OrchestrationCompleted)
+ {
+ return;
+ }
+
+ index++;
+ }
+
+ await Task.Delay(TimeSpan.FromMicroseconds(100));
+ }
+ }
+
+ protected static (IDisposable subscriber, IList events) RegisterAnExecutionProgressSubscriber(IServiceProvider serviceProvider)
+ {
+ List events = [];
+
+ void Subscriber(ExecutionProgress progress)
+ {
+ events.Add(progress);
+ }
+
+ var progressReporter = serviceProvider.GetRequiredService();
+
+ return (progressReporter.Register(Subscriber), events);
+ }
+
private async IAsyncEnumerable