Skip to content

Commit

Permalink
Ensure different dependent schedules are honored
Browse files Browse the repository at this point in the history
  • Loading branch information
nulltoken committed Nov 3, 2024
1 parent abf6f28 commit 95f8209
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ namespace NCronJob;

internal sealed class DependentJobRegistryEntry
{
public required Type PrincipalType { get; init; }
public List<JobDefinition> RunWhenSuccess { get; init; } = [];
public List<JobDefinition> RunWhenFaulted { get; init; } = [];
}
38 changes: 24 additions & 14 deletions src/NCronJob/Configuration/Builder/NCronJobOptionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ internal NCronJobOptionBuilder(
public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
where T : class, IJob
{
var builder = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
Expand All @@ -59,8 +59,8 @@ public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
/// </example>
public IStartupStage<IJob> AddJob(Type jobType, Action<JobOptionBuilder>? options = null)
{
var builder = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
Expand Down Expand Up @@ -136,10 +136,14 @@ internal static CronExpression GetCronExpression(string expression)
: throw new InvalidOperationException("Invalid cron expression");
}

private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>? options)
private (JobOptionBuilder, ICollection<JobDefinition>) AddJobInternal(
Type jobType,
Action<JobOptionBuilder>? options)
{
ValidateConcurrencySetting(jobType);

var jobDefinitions = new List<JobDefinition>();

var builder = new JobOptionBuilder();
options?.Invoke(builder);

Expand All @@ -159,9 +163,10 @@ private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>?
UserDefinedCronExpression = option.CronExpression
};
jobRegistry.Add(entry);
jobDefinitions.Add(entry);
}

return builder;
return (builder, jobDefinitions);
}

private static bool DetermineAndValidatePrecision(string cronExpression)
Expand Down Expand Up @@ -190,13 +195,16 @@ internal class StartupStage<TJob> : IStartupStage<TJob> where TJob : class, IJob
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly JobOptionBuilder jobOptionBuilder;
private readonly ICollection<JobDefinition> jobDefinitions;

internal StartupStage(
IServiceCollection services,
ICollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry,
JobOptionBuilder jobOptionBuilder)
{
this.jobDefinitions = jobDefinitions;
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
Expand All @@ -210,20 +218,20 @@ public INotificationStage<TJob> RunAtStartup()

jobRegistry.UpdateJobDefinitionsToRunAtStartup<TJob>();

return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>() where TJobNotificationHandler : class, IJobNotificationHandler<TJob>
{
services.TryAddScoped<IJobNotificationHandler<TJob>, TJobNotificationHandler>();
return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null, Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
Expand All @@ -246,15 +254,18 @@ internal class NotificationStage<TJob> : INotificationStage<TJob> where TJob : c
private readonly IServiceCollection services;
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly ICollection<JobDefinition> jobDefinitions;

internal NotificationStage(
IServiceCollection services,
ICollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry)
{
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
this.jobDefinitions = jobDefinitions;
}

/// <inheritdoc />
Expand All @@ -269,7 +280,7 @@ public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>(
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null,
Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
Expand Down Expand Up @@ -358,6 +369,7 @@ internal static class ExecuteWhenHelper
{
public static void AddRegistration<TJob>(
JobRegistry jobRegistry,
ICollection<JobDefinition> parentJobDefinitions,
Action<DependencyBuilder<TJob>>? success,
Action<DependencyBuilder<TJob>>? faulted)
where TJob : IJob
Expand All @@ -369,11 +381,10 @@ public static void AddRegistration<TJob>(
var runWhenSuccess = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenSuccess = runWhenSuccess,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}

if (faulted is not null)
Expand All @@ -383,11 +394,10 @@ public static void AddRegistration<TJob>(
var runWhenFaulted = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenFaulted = runWhenFaulted,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}
}
}
4 changes: 2 additions & 2 deletions src/NCronJob/Execution/JobExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void InformDependentJobs(JobExecutionContext context, bool success)

var jobRun = context.JobRun;
var dependencies = success
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition.Type)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition.Type);
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition);

if (dependencies.Count > 0)
jobRun.NotifyStateChange(JobStateType.WaitingForDependency);
Expand Down
10 changes: 7 additions & 3 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace NCronJob;

Expand Down Expand Up @@ -235,17 +236,20 @@ private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bo
{
using (logger.BeginScope("Triggering RunScheduledJob:"))
{
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);

if (!jobRegistry.IsJobRegistered<TJob>())
{
LogJobNotRegistered(typeof(TJob).Name);
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);
jobRegistry.Add(newJobDefinition);
}

var jobDefinition = jobRegistry.FindJobDefinition(typeof(TJob));

Debug.Assert(jobDefinition != null);

token.Register(() => LogCancellationRequested(parameter));

RunInternal(newJobDefinition, parameter, startDate, forceExecution, token);
RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
}
}

Expand Down
71 changes: 54 additions & 17 deletions src/NCronJob/Registry/JobRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ namespace NCronJob;

internal sealed class JobRegistry
{
private readonly HashSet<JobDefinition> allJobs = new(JobDefinitionEqualityComparer.Instance);
private readonly HashSet<JobDefinition> allJobs
= new(JobDefinitionEqualityComparer.Instance);
public List<DynamicJobRegistration> DynamicJobRegistrations { get; } = [];
private readonly Dictionary<Type, List<DependentJobRegistryEntry>> dependentJobsPerPrincipalJobType = [];
private readonly Dictionary<JobDefinition, List<DependentJobRegistryEntry>> dependentJobsPerJobDefinition
= new(DependentJobDefinitionEqualityComparer.Instance);

public IReadOnlyCollection<JobDefinition> GetAllJobs() => [.. allJobs];

Expand Down Expand Up @@ -78,22 +80,25 @@ public IJob GetDynamicJobInstance(IServiceProvider serviceProvider, JobDefinitio
.Single(d => d.JobDefinition.JobFullName == jobDefinition.JobFullName)
.DynamicJobFactoryResolver(serviceProvider);

public void RegisterJobDependency(DependentJobRegistryEntry entry)
public void RegisterJobDependency(ICollection<JobDefinition> parentJobdefinitions, DependentJobRegistryEntry entry)
{
if (!dependentJobsPerPrincipalJobType.TryGetValue(entry.PrincipalType, out var entries))
foreach (var jobDefinition in parentJobdefinitions)
{
entries = [];
dependentJobsPerPrincipalJobType.Add(entry.PrincipalType, entries);
}
if (!dependentJobsPerJobDefinition.TryGetValue(jobDefinition, out var entries))
{
entries = [];
dependentJobsPerJobDefinition.Add(jobDefinition, entries);
}

entries.Add(entry);
entries.Add(entry);
}
}

public IReadOnlyCollection<JobDefinition> GetDependentSuccessJobTypes(Type principalType)
=> FilterByAndProject(principalType, v => v.SelectMany(p => p.RunWhenSuccess));
public IReadOnlyCollection<JobDefinition> GetDependentSuccessJobTypes(JobDefinition parentJobDefinition)
=> FilterByAndProject(parentJobDefinition, v => v.SelectMany(p => p.RunWhenSuccess));

public IReadOnlyCollection<JobDefinition> GetDependentFaultedJobTypes(Type principalType)
=> FilterByAndProject(principalType, v => v.SelectMany(p => p.RunWhenFaulted));
public IReadOnlyCollection<JobDefinition> GetDependentFaultedJobTypes(JobDefinition parentJobDefinition)
=> FilterByAndProject(parentJobDefinition, v => v.SelectMany(p => p.RunWhenFaulted));

private void AddDynamicJobRegistration(JobDefinition jobDefinition, Delegate jobDelegate)
=> DynamicJobRegistrations.Add(new DynamicJobRegistration(jobDefinition, sp => new DynamicJobFactory(sp, jobDelegate)));
Expand All @@ -112,10 +117,9 @@ public void UpdateJobDefinitionsToRunAtStartup<TJob>()
}

private JobDefinition[] FilterByAndProject(
Type principalJobType,
Func<IEnumerable<DependentJobRegistryEntry>, IEnumerable<JobDefinition>> transform)

=> !dependentJobsPerPrincipalJobType.TryGetValue(principalJobType, out var types)
JobDefinition parentJobDefinition,
Func<IEnumerable<DependentJobRegistryEntry>, IEnumerable<JobDefinition>> transform)
=> !dependentJobsPerJobDefinition.TryGetValue(parentJobDefinition, out var types)
? []
: transform(types).ToArray();

Expand All @@ -128,7 +132,7 @@ private void Remove(JobDefinition? jobDefinition)

allJobs.Remove(jobDefinition);

dependentJobsPerPrincipalJobType.Remove(jobDefinition.Type);
dependentJobsPerJobDefinition.Remove(jobDefinition);
}

private void AssertNoDuplicateJobNames(string? additionalJobName = null)
Expand Down Expand Up @@ -167,4 +171,37 @@ public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.CustomName,
obj.IsStartupJob);
}

private sealed class DependentJobDefinitionEqualityComparer : IEqualityComparer<JobDefinition>
{
// TODO: Maybe is the code conflating two different concepts.
// Dependent jobs may have a name, a type and a parameter, but that's the most of it.
// And the code currently uses the same type to hold the configuration of "lead" jobs
// and dependent jobs.
//
// Which brings this dependent job only comparer.
//
// Maybe should a DependentJobDefinition type spawn?

public static readonly DependentJobDefinitionEqualityComparer Instance = new();

public bool Equals(JobDefinition? x, JobDefinition? y) =>
(x is null && y is null) || (x is not null && y is not null
&& x.Type == y.Type && x.Type != typeof(DynamicJobFactory)
&& x.Parameter == y.Parameter
//&& x.CronExpression == y.CronExpression
//&& x.TimeZone == y.TimeZone
&& x.CustomName == y.CustomName
//&& x.IsStartupJob == y.IsStartupJob
);

public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.Type,
obj.Parameter,
//obj.CronExpression,
//obj.TimeZone,
obj.CustomName
//obj.IsStartupJob
);
}
}
Loading

0 comments on commit 95f8209

Please sign in to comment.