Skip to content

Commit

Permalink
Merge pull request #157 from NCronJob-Dev/ntk/monitor
Browse files Browse the repository at this point in the history
Implement experimental basic job execution progress reporting hook
  • Loading branch information
nulltoken authored Jan 13, 2025
2 parents 6fc61e0 + 9e6d0d7 commit b814db8
Show file tree
Hide file tree
Showing 28 changed files with 744 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
build:

runs-on: ${{ matrix.os }}
timeout-minutes: 10

strategy:
matrix:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ All notable changes to **NCronJob** will be documented in this file. The project

## [Unreleased]

### Added

- Expose an **experimental** basic job execution progress reporting hook. Added in [#157](https://github.com/NCronJob-Dev/NCronJob/issues/157), by [@nulltoken](https://github.com/nulltoken).

## [v4.1.0] - 2025-01-02

### Added
Expand Down
2 changes: 2 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>

<NoWarn>$(NoWarn);NCRONJOB_OBSERVER</NoWarn>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release'">
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced/dynamic-job-control.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Dynamic Job Control
**NCronJob** allows you to dynmically add or remove CRON jobs from the scheduler. This is useful when you want to add jobs at runtime or remove jobs based on some condition without restarting the scheduler.
**NCronJob** allows you to dynamically add or remove CRON jobs from the scheduler. This is useful when you want to add jobs at runtime or remove jobs based on some condition without restarting the scheduler.

## Defining job names
The core idea is to define an unique job name for each job that might be mutated during runtime. The job name is an optional parameter:
Expand Down
4 changes: 2 additions & 2 deletions docs/advanced/global-concurrency.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Global Concurrency
**NCronJob** utilisies a priority queue with a maximum amount of entries. That is, the queue will only hold and execute a maximum amount of jobs at any given time. This is to prevent the system from being overloaded with jobs.
**NCronJob** relies on a priority queue with a maximum amount of entries. That is, the queue will only hold and execute a maximum amount of jobs at any given time. This is to prevent the system from being overloaded with jobs.

## The Maxmimum
## The Maximum
The global maximum of concurrent jobs is calculated as:

```csharp
Expand Down
231 changes: 231 additions & 0 deletions docs/advanced/observing-job-progress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
# Observing job progress

Every time one schedules a job (or triggers it as an instant job), a virtual orchestration is spawned.

An orchestration can be as simple as a unique job, or as complex as a root job and the whole hierarchy of its direct and indirect dependent jobs (see [*"Model Dependencies"*](../features/model-dependencies.md)).

An orchestration is uniquely identifed by an identifier. All jobs belonging to an orchestration share this same `CorrelationId` (See [*"Tracing requests of dependencies via `CorrelationId`"*](../features/model-dependencies.md#tracing-requests-of-dependencies-via-correlationid)).

From a timeline perspective, an orchestration starts before the root job that initiated it and completes when **all** of its leaf jobs have reached a final state.

## Subscribing to the executions of jobs

### Forewords

!!! warning

This is an **experimental** feature subject to breaking changes independently of the standard semver lifecycle release of **NCronJob**.

While reporting feedback or bugs about it, please do not forget to
mention in the issue which version of NCronJob you're using.

Would you decide to give it an early try, in order to suppress the warnings that comes with the [.NET Experimental attribute](https://learn.microsoft.com/en-us/dotnet/fundamentals/apicompat/preview-apis#experimentalattribute), update your `.csproj` with a `<NoWarn>` project setting:

```xml
<PropertyGroup>
...
<NoWarn>$(NoWarn);NCRONJOB_OBSERVER</NoWarn>
</PropertyGroup>
```

### Registering a notifier callback

**NCronJob** exposes the capability to notify whenever jobs change states. One can
suscribe to this by leveraging the `IJobExecutionProgressReporter` service.

This is done through the following exposed method

```csharp
IDisposable Register(Action<ExecutionProgress> callback);
```

!!! info

The registration returns the subscriber as a `IDisposable` object.
In order to stop the subscriber from receiving notifications anymore, invoke the `Dispose()` method of it.

Subscribers to the reporting service will receive an immutable instance of the `ExecutionProgress`. This type will expose every meaningful change to any job or orchestration handled by NCronJob.

### Sample usage

Considering the following orchestration

```text
A ─┬─ (successful) ──> B
└─ (successful) ──> C ─── (successful) ──> D
```

Below a very simple approach to schedule it every minute and register a subscriber.

```csharp
using NCronJob;

public class A : IJob
{
public A(ILogger<A> logger) => Logger = logger;

public ILogger<A> Logger { get; }

public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
{
Logger.LogInformation("[A]: Starting processing...");

await Task.Delay(TimeSpan.FromSeconds(1), token);

Logger.LogInformation("[A]: Processing is done.");
}
}

public class B : IJob
{
public B(ILogger<B> logger) => Logger = logger;

public ILogger<B> Logger { get; }

public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
{
Logger.LogInformation("[B]: Starting processing...");

await Task.Delay(TimeSpan.FromSeconds(6), token);

Logger.LogInformation("[B]: Processing is done.");
}
}

public class C : IJob
{
public C(ILogger<C> logger) => Logger = logger;

public ILogger<C> Logger { get; }

public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
{
Logger.LogInformation("[C]: Starting processing...");

await Task.Delay(TimeSpan.FromSeconds(1), token);

Logger.LogInformation("[C]: Processing is done.");
}
}

public class D : IJob
{
public D(ILogger<D> logger) => Logger = logger;

public ILogger<D> Logger { get; }

public async Task RunAsync(IJobExecutionContext context, CancellationToken token)
{
Logger.LogInformation("[D]: Starting processing...");

await Task.Delay(TimeSpan.FromSeconds(1), token);

Logger.LogInformation("[D]: Processing is done.");
}
}

public class Program
{
private static async Task Main(string[] args)
{
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddNCronJob(n =>
{
n.AddJob<D>();

n.AddJob<C>()
.ExecuteWhen(success: s => s.RunJob<D>());

n.AddJob<B>();

n.AddJob<A>(o => o.WithCronExpression("* * * * *"))
.ExecuteWhen(success: s => s.RunJob<B>())
.ExecuteWhen(success: s => s.RunJob<C>());
});

var app = builder.Build();

await app.UseNCronJobAsync();

var logger = app.Services.GetRequiredService<ILogger<Program>>();

// Retrieve the observer service from the DI container...
var reporter = app.Services.GetRequiredService<IJobExecutionProgressReporter>();

// ...enlist a new subscriber to it...
IDisposable subscriber = reporter.Register(Subscriber);

await app.RunAsync();

// ...and when you're done with it, unhook the subscription.
subscriber.Dispose();

void Subscriber(ExecutionProgress progress)
{
if (progress.RunId is null)
{
logger.LogWarning(
"Orchestration {CorrelationId} - {Status}",
progress.CorrelationId,
progress.State);

return;
}

logger.LogWarning("Job {JobRunId} - {Status}",
progress.RunId,
progress.State);
}
}

}
```

Given the orchestration defined above, with jobs of varying durations, the generated output log may look like this:

```text
10:46:47 warn: Program[0] Orchestration d36e2b62-6997-44c5-a9f9-de442b8a1807 - OrchestrationStarted
10:46:50 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - NotStarted
10:46:50 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Scheduled
10:47:00 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Initializing
10:47:00 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Running
10:47:00 info: A[0] [A]: Starting processing...
10:47:01 info: A[0] [A]: Processing is done.
10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Completing
10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - WaitingForDependency
10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - NotStarted
10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - NotStarted
10:47:01 warn: Program[0] Job d751f2eb-9f8d-46e3-b863-6dadc6498468 - Completed
10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Initializing
10:47:01 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Running
10:47:01 info: B[0] [B]: Starting processing...
10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Initializing
10:47:01 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Running
10:47:01 info: C[0] [C]: Starting processing...
10:47:02 info: C[0] [C]: Processing is done.
10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Completing
10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - WaitingForDependency
10:47:02 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - NotStarted
10:47:02 warn: Program[0] Job f28acd5e-cd3e-445e-979c-59a160035ef2 - Completed
10:47:03 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Initializing
10:47:03 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Running
10:47:03 info: D[0] [D]: Starting processing...
10:47:04 info: D[0] [D]: Processing is done.
10:47:04 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Completing
10:47:04 warn: Program[0] Job f4a8ef3b-4848-4363-a3b7-f847562598b3 - Completed
10:47:07 info: B[0] [B]: Processing is done.
10:47:07 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Completing
10:47:07 warn: Program[0] Job 27509f28-d84b-4d50-8f9d-e5500bbc17fa - Completed
10:47:07 warn: Program[0] Orchestration d36e2b62-6997-44c5-a9f9-de442b8a1807 - OrchestrationCompleted
```

## Known limitations

As global NCronJob observability is still under development, it's not feature complete yet.

Below are know missing parts of it. Would you find other releated areas of interest that may be worth investigating, please submit a request in the issue tracker.

- [Report removed jobs as `Cancelled`](https://github.com/NCronJob-Dev/NCronJob/issues/161)
- [Report skipped child jobs as `Skipped`](https://github.com/NCronJob-Dev/NCronJob/issues/160)
2 changes: 1 addition & 1 deletion docs/features/instant-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
The `RunInstantJob` method takes the job type and the parameters as arguments. Optionally you can pass in a `CancellationToken` as well. The job will be executed immediately.

## Starting a job with a delay
If you find the need of delaying the execution of an instant job, you can use the `RunScheduledJob` method with a `TimeSpan` as a delay. The same as `RunInstantJob` applies here, the job has to be registered in the `AddNCronJob` method.
If you find the need to delay the execution of an instant job, you can use the `RunScheduledJob` method with a `TimeSpan` as a delay. The same as `RunInstantJob` applies here, the job has to be registered in the `AddNCronJob` method.

```csharp
app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
Expand Down
4 changes: 2 additions & 2 deletions docs/features/model-dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Services.AddNCronJob(options =>
```

## Accessing the state of the parent job
The `JobExecutionContext` object passed to the dependent job contains the output of the parent job. This allows access to the state of the parent job. This can be helpful if information should flow from parent to the child job.
The `JobExecutionContext` object passed to the dependent job contains the output of the parent job. This allows access to the state of the parent job. This can be helpful if information should flow from the parent to the child job.

```csharp
public class JobA : IJob
Expand Down Expand Up @@ -185,4 +185,4 @@ Services.AddNCronJob(options =>
// Register JobC into the container to avoid warnings
options.AddJob<JobC>();
});
```
```
4 changes: 2 additions & 2 deletions docs/features/parameters.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Passing parameters to a job

Often times a job needs some kind of configuration or parameter to run. Imagine you have a job that generates a report and can run daily, weekly or monthly. You could create three different jobs for each frequency, but that would be a lot of duplicated code. Instead, you can pass in parameters to the job.
Often a job needs some kind of configuration or parameter to run. Imagine you have a job that generates a report and can run daily, weekly or monthly. You could create three different jobs for each frequency, but that would be a lot of duplicated code. Instead, you can pass in parameters to the job.

```csharp
Services.AddNCronJob(options =>
Expand Down Expand Up @@ -44,7 +44,7 @@ public class ReportJob : IJob
```

## Parameters are not immutable
Passed in parameters are not immutable by default or cloned through out the job execution. This means that if you change the parameter in the job, it will also change in the next execution. If you need to keep the parameter unchanged, you should clone it in the job.
Passed in parameters are not immutable by default or cloned throughout the job execution. This means that if you change the parameter in the job, it will also change in the next execution. If you need to keep the parameter unchanged, you should clone it in the job.

```csharp
public class MyParameter
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nav:
- Controlling the log level: advanced/log-level.md
- Global Concurrency: advanced/global-concurrency.md
- Dynamic Job Control: advanced/dynamic-job-control.md
- Observing progress of jobs: advanced/observing-job-progress.md

extra:
generator: false
Expand Down
7 changes: 5 additions & 2 deletions src/NCronJob/Execution/StartupJobManager.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
namespace NCronJob;

internal class StartupJobManager(JobRegistry jobRegistry, JobProcessor jobProcessor)
internal class StartupJobManager(
JobRegistry jobRegistry,
JobProcessor jobProcessor,
JobExecutionProgressObserver observer)
{
private readonly AsyncManualResetEvent startupJobsCompleted = new();

public async Task ProcessStartupJobs(CancellationToken stopToken)
{
var startupJobs = jobRegistry.GetAllOneTimeJobs();
var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(definition), stopToken)).ToList();
var startupTasks = startupJobs.Select(definition => CreateExecutionTask(JobRun.Create(observer.Report, definition), stopToken)).ToList();

if (startupTasks.Count > 0)
{
Expand Down
5 changes: 5 additions & 0 deletions src/NCronJob/NCronJobExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public static IServiceCollection AddNCronJob(
sp.GetRequiredService<JobWorker>(),
sp.GetRequiredService<JobQueueManager>(),
sp.GetRequiredService<ConcurrencySettings>()));
services.TryAddSingleton<JobExecutionProgressObserver>();
services.TryAddSingleton<IJobExecutionProgressReporter, JobExecutionProgressObserver>((sp) =>
{
return sp.GetRequiredService<JobExecutionProgressObserver>();
});
services.TryAddSingleton(TimeProvider.System);
services.TryAddSingleton<StartupJobManager>();
services.TryAddSingleton<MissingMethodCalledHandler>();
Expand Down
Loading

0 comments on commit b814db8

Please sign in to comment.