diff --git a/src/NCronJob/Execution/StartupJobManager.cs b/src/NCronJob/Execution/StartupJobManager.cs index bb500a0..708e1c5 100644 --- a/src/NCronJob/Execution/StartupJobManager.cs +++ b/src/NCronJob/Execution/StartupJobManager.cs @@ -27,7 +27,7 @@ public async Task ProcessStartupJobs(CancellationToken stopToken) { var jobRun = JobRun.Create( timeProvider, - observer.Report, definition); + observer.Report, definition, timeProvider.GetUtcNow()); jobRuns.Add(jobRun); return CreateExecutionTask(jobRun, stopToken); diff --git a/src/NCronJob/Registry/IInstantJobRegistry.cs b/src/NCronJob/Registry/IInstantJobRegistry.cs index 7f2c3f6..d2cf0b6 100644 --- a/src/NCronJob/Registry/IInstantJobRegistry.cs +++ b/src/NCronJob/Registry/IInstantJobRegistry.cs @@ -275,11 +275,11 @@ private Guid RunInternal( timeProvider, observer.Report, jobDefinition, + startDate, parameter, token); run.Priority = JobPriority.High; - run.RunAt = startDate; run.IsOneTimeJob = true; if (forceExecution) @@ -289,7 +289,7 @@ private Guid RunInternal( else { var jobQueue = jobQueueManager.GetOrAddQueue(run.JobDefinition.JobFullName); - jobQueue.EnqueueForDirectExecution(run, startDate); + jobQueue.EnqueueForDirectExecution(run); jobQueueManager.SignalJobQueue(run.JobDefinition.JobFullName); } diff --git a/src/NCronJob/Registry/JobRun.cs b/src/NCronJob/Registry/JobRun.cs index 668a094..cd2c11f 100644 --- a/src/NCronJob/Registry/JobRun.cs +++ b/src/NCronJob/Registry/JobRun.cs @@ -14,15 +14,17 @@ internal class JobRun private JobRun( TimeProvider timeProvider, JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, Action progressReporter) - : this(timeProvider, null, jobDefinition, parameter, progressReporter) + : this(timeProvider, null, jobDefinition, runAt, parameter, progressReporter) { } private JobRun( TimeProvider timeProvider, JobRun? parentJob, JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, Action progressReporter) { @@ -34,6 +36,7 @@ private JobRun( CorrelationId = parentJob is not null ? parentJob.CorrelationId : Guid.NewGuid(); this.timeProvider = timeProvider; JobDefinition = jobDefinition; + RunAt = runAt; Parameter = parameter ?? jobDefinition.Parameter; this.progressReporter = progressReporter; @@ -52,7 +55,7 @@ private JobRun( public Guid CorrelationId { get; } public bool IsOrchestrationRoot { get; } public CancellationToken CancellationToken { get; set; } - public DateTimeOffset? RunAt { get; set; } + public DateTimeOffset RunAt { get; } /// /// At the moment of processing, if the difference between the current time and the scheduled time exceeds the @@ -60,7 +63,7 @@ private JobRun( /// but it has been dequeued then essentially the job is dropped. /// public TimeSpan Expiry { get; set; } = TimeSpan.FromMinutes(10); - public bool IsExpired(TimeProvider timeProvider) => RunAt.HasValue && timeProvider.GetUtcNow() - RunAt.Value > Expiry; + public bool IsExpired => timeProvider.GetUtcNow() - RunAt > Expiry; public bool IsOneTimeJob { get; set; } public object? Parameter { get; } public object? ParentOutput { get; set; } @@ -70,16 +73,18 @@ private JobRun( public static JobRun Create( TimeProvider timeProvider, Action progressReporter, - JobDefinition jobDefinition) - => new(timeProvider, jobDefinition, jobDefinition.Parameter, progressReporter); + JobDefinition jobDefinition, + DateTimeOffset runAt) + => new(timeProvider, jobDefinition, runAt, jobDefinition.Parameter, progressReporter); public static JobRun Create( TimeProvider timeProvider, Action progressReporter, JobDefinition jobDefinition, + DateTimeOffset runAt, object? parameter, CancellationToken token) - => new(timeProvider, jobDefinition, parameter, progressReporter) + => new(timeProvider, jobDefinition, runAt, parameter, progressReporter) { CancellationToken = token, }; @@ -89,7 +94,7 @@ public JobRun CreateDependent( object? parameter, CancellationToken token) { - JobRun run = new(timeProvider, this, jobDefinition, parameter, progressReporter) + JobRun run = new(timeProvider, this, jobDefinition, timeProvider.GetUtcNow(), parameter, progressReporter) { CancellationToken = token, }; diff --git a/src/NCronJob/Scheduler/JobProcessor.cs b/src/NCronJob/Scheduler/JobProcessor.cs index 87c1e13..0e3c6e0 100644 --- a/src/NCronJob/Scheduler/JobProcessor.cs +++ b/src/NCronJob/Scheduler/JobProcessor.cs @@ -5,16 +5,13 @@ namespace NCronJob; internal sealed partial class JobProcessor { private readonly JobExecutor jobExecutor; - private readonly TimeProvider timeProvider; private readonly ILogger logger; public JobProcessor( JobExecutor jobExecutor, - TimeProvider timeProvider, ILogger logger) { this.jobExecutor = jobExecutor; - this.timeProvider = timeProvider; this.logger = logger; } @@ -22,7 +19,7 @@ public async Task ProcessJobAsync(JobRun jobRun, CancellationToken cancellationT { try { - if (jobRun.IsExpired(timeProvider)) + if (jobRun.IsExpired) { LogDequeuingExpiredJob(jobRun.JobDefinition.JobName); jobRun.NotifyStateChange(JobStateType.Expired); diff --git a/src/NCronJob/Scheduler/JobQueue.cs b/src/NCronJob/Scheduler/JobQueue.cs index c56c61c..d279ca9 100644 --- a/src/NCronJob/Scheduler/JobQueue.cs +++ b/src/NCronJob/Scheduler/JobQueue.cs @@ -5,13 +5,10 @@ namespace NCronJob; /// internal sealed class JobQueue : ObservablePriorityQueue { - private readonly TimeProvider timeProvider; - public string Name { get; set; } - public JobQueue(TimeProvider timeProvider, string name) : base(new JobQueueTupleComparer()) + public JobQueue(string name) : base(new JobQueueTupleComparer()) { - this.timeProvider = timeProvider; Name = name; } @@ -19,12 +16,9 @@ public JobQueue(TimeProvider timeProvider, string name) : base(new JobQueueTuple /// Adds a job entry to this instance. /// /// The job that will be added. - /// An optional object representing when the job should run. If null then it'll - /// fall back to the job.RunAt. If the job.RunAt is not defined then it runs immediately. - public void EnqueueForDirectExecution(JobRun job, DateTimeOffset? when = null) + public void EnqueueForDirectExecution(JobRun job) { - when ??= job.RunAt ?? timeProvider.GetUtcNow(); - Enqueue(job, (when.Value, (int)job.Priority)); + Enqueue(job, (job.RunAt, (int)job.Priority)); } public void RemoveByName(string jobName) => RemoveByPredicate(t => t.JobDefinition.CustomName == jobName); diff --git a/src/NCronJob/Scheduler/JobQueueManager.cs b/src/NCronJob/Scheduler/JobQueueManager.cs index 9821581..ae093e1 100644 --- a/src/NCronJob/Scheduler/JobQueueManager.cs +++ b/src/NCronJob/Scheduler/JobQueueManager.cs @@ -6,7 +6,6 @@ namespace NCronJob; internal sealed class JobQueueManager : IDisposable { - private readonly TimeProvider timeProvider; private readonly ConcurrentDictionary jobQueues = new(); private readonly ConcurrentDictionary semaphores = new(); private readonly ConcurrentDictionary jobCancellationTokens = new(); @@ -21,15 +20,13 @@ internal sealed class JobQueueManager : IDisposable public bool IsDisposed { get; private set; } - public JobQueueManager(TimeProvider timeProvider) => this.timeProvider = timeProvider; - public JobQueue GetOrAddQueue(string queueName) { var isCreating = false; var jobQueue = jobQueues.GetOrAdd(queueName, jt => { isCreating = true; - var queue = new JobQueue(timeProvider, jt); + var queue = new JobQueue(jt); queue.CollectionChanged += CallCollectionChanged; return queue; }); diff --git a/src/NCronJob/Scheduler/JobWorker.cs b/src/NCronJob/Scheduler/JobWorker.cs index 512db36..912e2d5 100644 --- a/src/NCronJob/Scheduler/JobWorker.cs +++ b/src/NCronJob/Scheduler/JobWorker.cs @@ -68,7 +68,7 @@ await DispatchJobForProcessing(nextJob, priorityTuple.NextRunTime, queueName, se public async Task InvokeJobWithSchedule(JobRun jobRun, CancellationToken cancellationToken) { jobRun.NotifyStateChange(JobStateType.Scheduled); - await WaitForNextExecution(jobRun.RunAt ?? timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + await WaitForNextExecution(jobRun.RunAt, cancellationToken).ConfigureAwait(false); await StartJobProcessingAsync(jobRun, cancellationToken).ConfigureAwait(false); } @@ -192,8 +192,7 @@ public void ScheduleJob(JobDefinition job) if (nextRunTime.HasValue) { LogNextJobRun(job.Type, nextRunTime.Value); // todo: log by subscribing to OnStateChanged => JobStateType.Scheduled - var run = JobRun.Create(timeProvider, observer.Report, job); - run.RunAt = nextRunTime; + var run = JobRun.Create(timeProvider, observer.Report, job, nextRunTime.Value); var jobQueue = jobQueueManager.GetOrAddQueue(job.JobFullName); jobQueue.Enqueue(run, (nextRunTime.Value, (int)run.Priority)); run.NotifyStateChange(JobStateType.Scheduled); diff --git a/tests/NCronJob.Tests/JobRunTests.cs b/tests/NCronJob.Tests/JobRunTests.cs index 391d208..3124bfd 100644 --- a/tests/NCronJob.Tests/JobRunTests.cs +++ b/tests/NCronJob.Tests/JobRunTests.cs @@ -34,10 +34,12 @@ public void TestValuesAreInSyncWithCurrentEnumValues() [ClassData(typeof(FinalJobStateTypeTestData))] internal void CompletedJobRunsCannotChangeTheirStateFurther(JobStateType value) { + var fakeTimeProvider = new FakeTimeProvider(); + bool hasBeenCalled = false; JobDefinition jd = new JobDefinition(typeof(DummyJob), null, null, null); - var jobRun = JobRun.Create(new FakeTimeProvider(), (jr) => { }, jd); + var jobRun = JobRun.Create(fakeTimeProvider, (jr) => { }, jd, fakeTimeProvider.GetUtcNow()); Assert.Equal(JobStateType.NotStarted, jobRun.CurrentState.Type); diff --git a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs index 79bd805..6b6d1ad 100644 --- a/tests/NCronJob.Tests/NCronJobIntegrationTests.cs +++ b/tests/NCronJob.Tests/NCronJobIntegrationTests.cs @@ -216,7 +216,7 @@ await Assert.ThrowsAsync(async () => using var serviceScope = provider.CreateScope(); using var executor = serviceScope.ServiceProvider.GetRequiredService(); var jobDefinition = new JobDefinition(typeof(JobWithDependency), null, null, null); - await executor.RunJob(JobRun.Create(FakeTimer, (jr) => { }, jobDefinition), CancellationToken.None); + await executor.RunJob(JobRun.Create(FakeTimer, (jr) => { }, jobDefinition, FakeTimer.GetUtcNow()), CancellationToken.None); }); }