From 88ff3edeb3fb437ed7d089ad66f916eac6fd37cc Mon Sep 17 00:00:00 2001 From: nulltoken Date: Sat, 25 Jan 2025 13:49:59 +0100 Subject: [PATCH] Make JobRun.RunAt immutable --- src/NCronJob/Registry/IInstantJobRegistry.cs | 4 ++-- src/NCronJob/Registry/JobRun.cs | 23 +++++++++++++++----- src/NCronJob/Scheduler/JobProcessor.cs | 5 +---- src/NCronJob/Scheduler/JobQueue.cs | 12 +++------- src/NCronJob/Scheduler/JobQueueManager.cs | 5 +---- src/NCronJob/Scheduler/JobWorker.cs | 5 ++--- 6 files changed, 26 insertions(+), 28 deletions(-) diff --git a/src/NCronJob/Registry/IInstantJobRegistry.cs b/src/NCronJob/Registry/IInstantJobRegistry.cs index 7f2c3f60..d2cf0b6c 100644 --- a/src/NCronJob/Registry/IInstantJobRegistry.cs +++ b/src/NCronJob/Registry/IInstantJobRegistry.cs @@ -275,11 +275,11 @@ private Guid RunInternal( timeProvider, observer.Report, jobDefinition, + startDate, parameter, token); run.Priority = JobPriority.High; - run.RunAt = startDate; run.IsOneTimeJob = true; if (forceExecution) @@ -289,7 +289,7 @@ private Guid RunInternal( else { var jobQueue = jobQueueManager.GetOrAddQueue(run.JobDefinition.JobFullName); - jobQueue.EnqueueForDirectExecution(run, startDate); + jobQueue.EnqueueForDirectExecution(run); jobQueueManager.SignalJobQueue(run.JobDefinition.JobFullName); } diff --git a/src/NCronJob/Registry/JobRun.cs b/src/NCronJob/Registry/JobRun.cs index 668a0945..5eba3a41 100644 --- a/src/NCronJob/Registry/JobRun.cs +++ b/src/NCronJob/Registry/JobRun.cs @@ -14,15 +14,17 @@ internal class JobRun private JobRun( TimeProvider timeProvider, JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, Action progressReporter) - : this(timeProvider, null, jobDefinition, parameter, progressReporter) + : this(timeProvider, null, jobDefinition, runAt, parameter, progressReporter) { } private JobRun( TimeProvider timeProvider, JobRun? parentJob, JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, Action progressReporter) { @@ -34,6 +36,7 @@ private JobRun( CorrelationId = parentJob is not null ? parentJob.CorrelationId : Guid.NewGuid(); this.timeProvider = timeProvider; JobDefinition = jobDefinition; + RunAt = runAt; Parameter = parameter ?? jobDefinition.Parameter; this.progressReporter = progressReporter; @@ -52,7 +55,7 @@ private JobRun( public Guid CorrelationId { get; } public bool IsOrchestrationRoot { get; } public CancellationToken CancellationToken { get; set; } - public DateTimeOffset? RunAt { get; set; } + public DateTimeOffset RunAt { get; } /// /// At the moment of processing, if the difference between the current time and the scheduled time exceeds the @@ -60,7 +63,7 @@ private JobRun( /// but it has been dequeued then essentially the job is dropped. /// public TimeSpan Expiry { get; set; } = TimeSpan.FromMinutes(10); - public bool IsExpired(TimeProvider timeProvider) => RunAt.HasValue && timeProvider.GetUtcNow() - RunAt.Value > Expiry; + public bool IsExpired => timeProvider.GetUtcNow() - RunAt > Expiry; public bool IsOneTimeJob { get; set; } public object? Parameter { get; } public object? ParentOutput { get; set; } @@ -71,15 +74,23 @@ public static JobRun Create( TimeProvider timeProvider, Action progressReporter, JobDefinition jobDefinition) - => new(timeProvider, jobDefinition, jobDefinition.Parameter, progressReporter); + => new(timeProvider, jobDefinition, timeProvider.GetUtcNow(), jobDefinition.Parameter, progressReporter); public static JobRun Create( TimeProvider timeProvider, Action progressReporter, JobDefinition jobDefinition, + DateTimeOffset runAt) + => new(timeProvider, jobDefinition, runAt, jobDefinition.Parameter, progressReporter); + + public static JobRun Create( + TimeProvider timeProvider, + Action progressReporter, + JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, CancellationToken token) - => new(timeProvider, jobDefinition, parameter, progressReporter) + => new(timeProvider, jobDefinition, runAt, parameter, progressReporter) { CancellationToken = token, }; @@ -89,7 +100,7 @@ public JobRun CreateDependent( object? parameter, CancellationToken token) { - JobRun run = new(timeProvider, this, jobDefinition, parameter, progressReporter) + JobRun run = new(timeProvider, this, jobDefinition, timeProvider.GetUtcNow(), parameter, progressReporter) { CancellationToken = token, }; diff --git a/src/NCronJob/Scheduler/JobProcessor.cs b/src/NCronJob/Scheduler/JobProcessor.cs index 87c1e130..0e3c6e09 100644 --- a/src/NCronJob/Scheduler/JobProcessor.cs +++ b/src/NCronJob/Scheduler/JobProcessor.cs @@ -5,16 +5,13 @@ namespace NCronJob; internal sealed partial class JobProcessor { private readonly JobExecutor jobExecutor; - private readonly TimeProvider timeProvider; private readonly ILogger logger; public JobProcessor( JobExecutor jobExecutor, - TimeProvider timeProvider, ILogger logger) { this.jobExecutor = jobExecutor; - this.timeProvider = timeProvider; this.logger = logger; } @@ -22,7 +19,7 @@ public async Task ProcessJobAsync(JobRun jobRun, CancellationToken cancellationT { try { - if (jobRun.IsExpired(timeProvider)) + if (jobRun.IsExpired) { LogDequeuingExpiredJob(jobRun.JobDefinition.JobName); jobRun.NotifyStateChange(JobStateType.Expired); diff --git a/src/NCronJob/Scheduler/JobQueue.cs b/src/NCronJob/Scheduler/JobQueue.cs index c56c61c5..d279ca90 100644 --- a/src/NCronJob/Scheduler/JobQueue.cs +++ b/src/NCronJob/Scheduler/JobQueue.cs @@ -5,13 +5,10 @@ namespace NCronJob; /// internal sealed class JobQueue : ObservablePriorityQueue { - private readonly TimeProvider timeProvider; - public string Name { get; set; } - public JobQueue(TimeProvider timeProvider, string name) : base(new JobQueueTupleComparer()) + public JobQueue(string name) : base(new JobQueueTupleComparer()) { - this.timeProvider = timeProvider; Name = name; } @@ -19,12 +16,9 @@ public JobQueue(TimeProvider timeProvider, string name) : base(new JobQueueTuple /// Adds a job entry to this instance. /// /// The job that will be added. - /// An optional object representing when the job should run. If null then it'll - /// fall back to the job.RunAt. If the job.RunAt is not defined then it runs immediately. - public void EnqueueForDirectExecution(JobRun job, DateTimeOffset? when = null) + public void EnqueueForDirectExecution(JobRun job) { - when ??= job.RunAt ?? timeProvider.GetUtcNow(); - Enqueue(job, (when.Value, (int)job.Priority)); + Enqueue(job, (job.RunAt, (int)job.Priority)); } public void RemoveByName(string jobName) => RemoveByPredicate(t => t.JobDefinition.CustomName == jobName); diff --git a/src/NCronJob/Scheduler/JobQueueManager.cs b/src/NCronJob/Scheduler/JobQueueManager.cs index 98215818..ae093e16 100644 --- a/src/NCronJob/Scheduler/JobQueueManager.cs +++ b/src/NCronJob/Scheduler/JobQueueManager.cs @@ -6,7 +6,6 @@ namespace NCronJob; internal sealed class JobQueueManager : IDisposable { - private readonly TimeProvider timeProvider; private readonly ConcurrentDictionary jobQueues = new(); private readonly ConcurrentDictionary semaphores = new(); private readonly ConcurrentDictionary jobCancellationTokens = new(); @@ -21,15 +20,13 @@ internal sealed class JobQueueManager : IDisposable public bool IsDisposed { get; private set; } - public JobQueueManager(TimeProvider timeProvider) => this.timeProvider = timeProvider; - public JobQueue GetOrAddQueue(string queueName) { var isCreating = false; var jobQueue = jobQueues.GetOrAdd(queueName, jt => { isCreating = true; - var queue = new JobQueue(timeProvider, jt); + var queue = new JobQueue(jt); queue.CollectionChanged += CallCollectionChanged; return queue; }); diff --git a/src/NCronJob/Scheduler/JobWorker.cs b/src/NCronJob/Scheduler/JobWorker.cs index 512db368..912e2d5d 100644 --- a/src/NCronJob/Scheduler/JobWorker.cs +++ b/src/NCronJob/Scheduler/JobWorker.cs @@ -68,7 +68,7 @@ await DispatchJobForProcessing(nextJob, priorityTuple.NextRunTime, queueName, se public async Task InvokeJobWithSchedule(JobRun jobRun, CancellationToken cancellationToken) { jobRun.NotifyStateChange(JobStateType.Scheduled); - await WaitForNextExecution(jobRun.RunAt ?? timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + await WaitForNextExecution(jobRun.RunAt, cancellationToken).ConfigureAwait(false); await StartJobProcessingAsync(jobRun, cancellationToken).ConfigureAwait(false); } @@ -192,8 +192,7 @@ public void ScheduleJob(JobDefinition job) if (nextRunTime.HasValue) { LogNextJobRun(job.Type, nextRunTime.Value); // todo: log by subscribing to OnStateChanged => JobStateType.Scheduled - var run = JobRun.Create(timeProvider, observer.Report, job); - run.RunAt = nextRunTime; + var run = JobRun.Create(timeProvider, observer.Report, job, nextRunTime.Value); var jobQueue = jobQueueManager.GetOrAddQueue(job.JobFullName); jobQueue.Enqueue(run, (nextRunTime.Value, (int)run.Priority)); run.NotifyStateChange(JobStateType.Scheduled);