Skip to content

Commit

Permalink
Merge pull request #124 from nulltoken/ntk/different_deps_schedules
Browse files Browse the repository at this point in the history
Ensure multiples schedules of the same job with different chains of dependent jobs are properly processed
nulltoken authored Nov 3, 2024
2 parents e6e4053 + 8fba306 commit b906e8b
Showing 7 changed files with 185 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ All notable changes to **NCronJob** will be documented in this file. The project

### Fixes

- Ensure multiples schedules of the same job with different chains of dependent jobs are properly processed. Identified in [#108](https://github.com/NCronJob-Dev/NCronJob/issues/107), by [@nulltoken](https://github.com/nulltoken).

- Teach `IRuntimeJobRegistry.RemoveJob()` to clean up potential dependent jobs. Fixes [#107](https://github.com/NCronJob-Dev/NCronJob/issues/107), by [@nulltoken](https://github.com/nulltoken).

## [3.3.3] - 2024-10-31
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ namespace NCronJob;

internal sealed class DependentJobRegistryEntry
{
public required Type PrincipalType { get; init; }
public List<JobDefinition> RunWhenSuccess { get; init; } = [];
public List<JobDefinition> RunWhenFaulted { get; init; } = [];
}
38 changes: 24 additions & 14 deletions src/NCronJob/Configuration/Builder/NCronJobOptionBuilder.cs
Original file line number Diff line number Diff line change
@@ -40,8 +40,8 @@ internal NCronJobOptionBuilder(
public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
where T : class, IJob
{
var builder = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
@@ -59,8 +59,8 @@ public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
/// </example>
public IStartupStage<IJob> AddJob(Type jobType, Action<JobOptionBuilder>? options = null)
{
var builder = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
@@ -136,10 +136,14 @@ internal static CronExpression GetCronExpression(string expression)
: throw new InvalidOperationException("Invalid cron expression");
}

private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>? options)
private (JobOptionBuilder, IReadOnlyCollection<JobDefinition>) AddJobInternal(
Type jobType,
Action<JobOptionBuilder>? options)
{
ValidateConcurrencySetting(jobType);

var jobDefinitions = new List<JobDefinition>();

var builder = new JobOptionBuilder();
options?.Invoke(builder);

@@ -159,9 +163,10 @@ private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>?
UserDefinedCronExpression = option.CronExpression
};
jobRegistry.Add(entry);
jobDefinitions.Add(entry);
}

return builder;
return (builder, jobDefinitions);
}

private static bool DetermineAndValidatePrecision(string cronExpression)
@@ -190,13 +195,16 @@ internal class StartupStage<TJob> : IStartupStage<TJob> where TJob : class, IJob
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly JobOptionBuilder jobOptionBuilder;
private readonly IReadOnlyCollection<JobDefinition> jobDefinitions;

internal StartupStage(
IServiceCollection services,
IReadOnlyCollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry,
JobOptionBuilder jobOptionBuilder)
{
this.jobDefinitions = jobDefinitions;
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
@@ -210,20 +218,20 @@ public INotificationStage<TJob> RunAtStartup()

jobRegistry.UpdateJobDefinitionsToRunAtStartup<TJob>();

return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>() where TJobNotificationHandler : class, IJobNotificationHandler<TJob>
{
services.TryAddScoped<IJobNotificationHandler<TJob>, TJobNotificationHandler>();
return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null, Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
@@ -246,15 +254,18 @@ internal class NotificationStage<TJob> : INotificationStage<TJob> where TJob : c
private readonly IServiceCollection services;
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly IReadOnlyCollection<JobDefinition> jobDefinitions;

internal NotificationStage(
IServiceCollection services,
IReadOnlyCollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry)
{
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
this.jobDefinitions = jobDefinitions;
}

/// <inheritdoc />
@@ -269,7 +280,7 @@ public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>(
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null,
Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
@@ -355,6 +366,7 @@ internal static class ExecuteWhenHelper
{
public static void AddRegistration<TJob>(
JobRegistry jobRegistry,
IReadOnlyCollection<JobDefinition> parentJobDefinitions,
Action<DependencyBuilder<TJob>>? success,
Action<DependencyBuilder<TJob>>? faulted)
where TJob : IJob
@@ -366,11 +378,10 @@ public static void AddRegistration<TJob>(
var runWhenSuccess = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenSuccess = runWhenSuccess,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}

if (faulted is not null)
@@ -380,11 +391,10 @@ public static void AddRegistration<TJob>(
var runWhenFaulted = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenFaulted = runWhenFaulted,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}
}
}
4 changes: 2 additions & 2 deletions src/NCronJob/Execution/JobExecutor.cs
Original file line number Diff line number Diff line change
@@ -154,8 +154,8 @@ public void InformDependentJobs(JobExecutionContext context, bool success)

var jobRun = context.JobRun;
var dependencies = success
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition.Type)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition.Type);
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition);

if (dependencies.Count > 0)
jobRun.NotifyStateChange(JobStateType.WaitingForDependency);
10 changes: 7 additions & 3 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace NCronJob;

@@ -235,17 +236,20 @@ private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bo
{
using (logger.BeginScope("Triggering RunScheduledJob:"))
{
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);

if (!jobRegistry.IsJobRegistered<TJob>())
{
LogJobNotRegistered(typeof(TJob).Name);
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);
jobRegistry.Add(newJobDefinition);
}

var jobDefinition = jobRegistry.FindJobDefinition(typeof(TJob));

Debug.Assert(jobDefinition != null);

token.Register(() => LogCancellationRequested(parameter));

RunInternal(newJobDefinition, parameter, startDate, forceExecution, token);
RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
}
}

64 changes: 47 additions & 17 deletions src/NCronJob/Registry/JobRegistry.cs
Original file line number Diff line number Diff line change
@@ -4,9 +4,11 @@ namespace NCronJob;

internal sealed class JobRegistry
{
private readonly HashSet<JobDefinition> allJobs = new(JobDefinitionEqualityComparer.Instance);
private readonly HashSet<JobDefinition> allJobs
= new(JobDefinitionEqualityComparer.Instance);
public List<DynamicJobRegistration> DynamicJobRegistrations { get; } = [];
private readonly Dictionary<Type, List<DependentJobRegistryEntry>> dependentJobsPerPrincipalJobType = [];
private readonly Dictionary<JobDefinition, List<DependentJobRegistryEntry>> dependentJobsPerJobDefinition
= new(DependentJobDefinitionEqualityComparer.Instance);

public IReadOnlyCollection<JobDefinition> GetAllJobs() => [.. allJobs];

@@ -78,22 +80,25 @@ public IJob GetDynamicJobInstance(IServiceProvider serviceProvider, JobDefinitio
.Single(d => d.JobDefinition.JobFullName == jobDefinition.JobFullName)
.DynamicJobFactoryResolver(serviceProvider);

public void RegisterJobDependency(DependentJobRegistryEntry entry)
public void RegisterJobDependency(IReadOnlyCollection<JobDefinition> parentJobdefinitions, DependentJobRegistryEntry entry)
{
if (!dependentJobsPerPrincipalJobType.TryGetValue(entry.PrincipalType, out var entries))
foreach (var jobDefinition in parentJobdefinitions)
{
entries = [];
dependentJobsPerPrincipalJobType.Add(entry.PrincipalType, entries);
}
if (!dependentJobsPerJobDefinition.TryGetValue(jobDefinition, out var entries))
{
entries = [];
dependentJobsPerJobDefinition.Add(jobDefinition, entries);
}

entries.Add(entry);
entries.Add(entry);
}
}

public IReadOnlyCollection<JobDefinition> GetDependentSuccessJobTypes(Type principalType)
=> FilterByAndProject(principalType, v => v.SelectMany(p => p.RunWhenSuccess));
public IReadOnlyCollection<JobDefinition> GetDependentSuccessJobTypes(JobDefinition parentJobDefinition)
=> FilterByAndProject(parentJobDefinition, v => v.SelectMany(p => p.RunWhenSuccess));

public IReadOnlyCollection<JobDefinition> GetDependentFaultedJobTypes(Type principalType)
=> FilterByAndProject(principalType, v => v.SelectMany(p => p.RunWhenFaulted));
public IReadOnlyCollection<JobDefinition> GetDependentFaultedJobTypes(JobDefinition parentJobDefinition)
=> FilterByAndProject(parentJobDefinition, v => v.SelectMany(p => p.RunWhenFaulted));

private void AddDynamicJobRegistration(JobDefinition jobDefinition, Delegate jobDelegate)
=> DynamicJobRegistrations.Add(new DynamicJobRegistration(jobDefinition, sp => new DynamicJobFactory(sp, jobDelegate)));
@@ -112,10 +117,9 @@ public void UpdateJobDefinitionsToRunAtStartup<TJob>()
}

private JobDefinition[] FilterByAndProject(
Type principalJobType,
Func<IEnumerable<DependentJobRegistryEntry>, IEnumerable<JobDefinition>> transform)

=> !dependentJobsPerPrincipalJobType.TryGetValue(principalJobType, out var types)
JobDefinition parentJobDefinition,
Func<IEnumerable<DependentJobRegistryEntry>, IEnumerable<JobDefinition>> transform)
=> !dependentJobsPerJobDefinition.TryGetValue(parentJobDefinition, out var types)
? []
: transform(types).ToArray();

@@ -128,7 +132,7 @@ private void Remove(JobDefinition? jobDefinition)

allJobs.Remove(jobDefinition);

dependentJobsPerPrincipalJobType.Remove(jobDefinition.Type);
dependentJobsPerJobDefinition.Remove(jobDefinition);
}

private void AssertNoDuplicateJobNames(string? additionalJobName = null)
@@ -167,4 +171,30 @@ public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.CustomName,
obj.IsStartupJob);
}

private sealed class DependentJobDefinitionEqualityComparer : IEqualityComparer<JobDefinition>
{
// TODO: Maybe is the code conflating two different concepts.
// Dependent jobs may have a name, a type and a parameter, but that's the most of it.
// And the code currently uses the same type to hold the configuration of "lead" jobs
// and dependent jobs.
//
// Which brings this dependent job only comparer.
//
// Maybe should a DependentJobDefinition type spawn?

public static readonly DependentJobDefinitionEqualityComparer Instance = new();

public bool Equals(JobDefinition? x, JobDefinition? y) =>
(x is null && y is null) || (x is not null && y is not null
&& x.Type == y.Type && x.Type != typeof(DynamicJobFactory)
&& x.Parameter == y.Parameter
&& x.CustomName == y.CustomName);

public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.Type,
obj.Parameter,
obj.CustomName
);
}
}
Loading

0 comments on commit b906e8b

Please sign in to comment.