Skip to content

Commit

Permalink
Make JobRun.RunAt immutable
Browse files Browse the repository at this point in the history
  • Loading branch information
nulltoken committed Jan 25, 2025
1 parent 5b93731 commit 8972628
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/NCronJob/Execution/StartupJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task ProcessStartupJobs(CancellationToken stopToken)
{
var jobRun = JobRun.Create(
timeProvider,
observer.Report, definition);
observer.Report, definition, timeProvider.GetUtcNow());

jobRuns.Add(jobRun);
return CreateExecutionTask(jobRun, stopToken);
Expand Down
4 changes: 2 additions & 2 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ private Guid RunInternal(
timeProvider,
observer.Report,
jobDefinition,
startDate,
parameter,
token);

run.Priority = JobPriority.High;
run.RunAt = startDate;
run.IsOneTimeJob = true;

if (forceExecution)
Expand All @@ -289,7 +289,7 @@ private Guid RunInternal(
else
{
var jobQueue = jobQueueManager.GetOrAddQueue(run.JobDefinition.JobFullName);
jobQueue.EnqueueForDirectExecution(run, startDate);
jobQueue.EnqueueForDirectExecution(run);
jobQueueManager.SignalJobQueue(run.JobDefinition.JobFullName);
}

Expand Down
19 changes: 12 additions & 7 deletions src/NCronJob/Registry/JobRun.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ internal class JobRun
private JobRun(
TimeProvider timeProvider,
JobDefinition jobDefinition,
DateTimeOffset runAt,
object? parameter,
Action<JobRun> progressReporter)
: this(timeProvider, null, jobDefinition, parameter, progressReporter)
: this(timeProvider, null, jobDefinition, runAt, parameter, progressReporter)
{ }

private JobRun(
TimeProvider timeProvider,
JobRun? parentJob,
JobDefinition jobDefinition,
DateTimeOffset runAt,
object? parameter,
Action<JobRun> progressReporter)
{
Expand All @@ -34,6 +36,7 @@ private JobRun(
CorrelationId = parentJob is not null ? parentJob.CorrelationId : Guid.NewGuid();
this.timeProvider = timeProvider;
JobDefinition = jobDefinition;
RunAt = runAt;
Parameter = parameter ?? jobDefinition.Parameter;

this.progressReporter = progressReporter;
Expand All @@ -52,15 +55,15 @@ private JobRun(
public Guid CorrelationId { get; }
public bool IsOrchestrationRoot { get; }
public CancellationToken CancellationToken { get; set; }
public DateTimeOffset? RunAt { get; set; }
public DateTimeOffset RunAt { get; }

/// <summary>
/// At the moment of processing, if the difference between the current time and the scheduled time exceeds the
/// expiration period (grace period), the job is considered expired and should not be processed. Because the job is not processed,
/// but it has been dequeued then essentially the job is dropped.
/// </summary>
public TimeSpan Expiry { get; set; } = TimeSpan.FromMinutes(10);
public bool IsExpired(TimeProvider timeProvider) => RunAt.HasValue && timeProvider.GetUtcNow() - RunAt.Value > Expiry;
public bool IsExpired => timeProvider.GetUtcNow() - RunAt > Expiry;
public bool IsOneTimeJob { get; set; }
public object? Parameter { get; }
public object? ParentOutput { get; set; }
Expand All @@ -70,16 +73,18 @@ private JobRun(
public static JobRun Create(
TimeProvider timeProvider,
Action<JobRun> progressReporter,
JobDefinition jobDefinition)
=> new(timeProvider, jobDefinition, jobDefinition.Parameter, progressReporter);
JobDefinition jobDefinition,
DateTimeOffset runAt)
=> new(timeProvider, jobDefinition, runAt, jobDefinition.Parameter, progressReporter);

public static JobRun Create(
TimeProvider timeProvider,
Action<JobRun> progressReporter,
JobDefinition jobDefinition,
DateTimeOffset runAt,
object? parameter,
CancellationToken token)
=> new(timeProvider, jobDefinition, parameter, progressReporter)
=> new(timeProvider, jobDefinition, runAt, parameter, progressReporter)
{
CancellationToken = token,
};
Expand All @@ -89,7 +94,7 @@ public JobRun CreateDependent(
object? parameter,
CancellationToken token)
{
JobRun run = new(timeProvider, this, jobDefinition, parameter, progressReporter)
JobRun run = new(timeProvider, this, jobDefinition, timeProvider.GetUtcNow(), parameter, progressReporter)
{
CancellationToken = token,
};
Expand Down
5 changes: 1 addition & 4 deletions src/NCronJob/Scheduler/JobProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@ namespace NCronJob;
internal sealed partial class JobProcessor
{
private readonly JobExecutor jobExecutor;
private readonly TimeProvider timeProvider;
private readonly ILogger<JobProcessor> logger;

public JobProcessor(
JobExecutor jobExecutor,
TimeProvider timeProvider,
ILogger<JobProcessor> logger)
{
this.jobExecutor = jobExecutor;
this.timeProvider = timeProvider;
this.logger = logger;
}

public async Task ProcessJobAsync(JobRun jobRun, CancellationToken cancellationToken)
{
try
{
if (jobRun.IsExpired(timeProvider))
if (jobRun.IsExpired)
{
LogDequeuingExpiredJob(jobRun.JobDefinition.JobName);
jobRun.NotifyStateChange(JobStateType.Expired);
Expand Down
12 changes: 3 additions & 9 deletions src/NCronJob/Scheduler/JobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,20 @@ namespace NCronJob;
/// </summary>
internal sealed class JobQueue : ObservablePriorityQueue<JobRun>
{
private readonly TimeProvider timeProvider;

public string Name { get; set; }

public JobQueue(TimeProvider timeProvider, string name) : base(new JobQueueTupleComparer())
public JobQueue(string name) : base(new JobQueueTupleComparer())
{
this.timeProvider = timeProvider;
Name = name;
}

/// <summary>
/// Adds a job entry to this instance.
/// </summary>
/// <param name="job">The job that will be added.</param>
/// <param name="when">An optional <see cref="DateTimeOffset"/> object representing when the job should run. If <c>null</c> then it'll
/// fall back to the job.RunAt. If the job.RunAt is not defined then it runs immediately.</param>
public void EnqueueForDirectExecution(JobRun job, DateTimeOffset? when = null)
public void EnqueueForDirectExecution(JobRun job)
{
when ??= job.RunAt ?? timeProvider.GetUtcNow();
Enqueue(job, (when.Value, (int)job.Priority));
Enqueue(job, (job.RunAt, (int)job.Priority));
}

public void RemoveByName(string jobName) => RemoveByPredicate(t => t.JobDefinition.CustomName == jobName);
Expand Down
5 changes: 1 addition & 4 deletions src/NCronJob/Scheduler/JobQueueManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace NCronJob;

internal sealed class JobQueueManager : IDisposable
{
private readonly TimeProvider timeProvider;
private readonly ConcurrentDictionary<string, JobQueue> jobQueues = new();
private readonly ConcurrentDictionary<string, SemaphoreSlim> semaphores = new();
private readonly ConcurrentDictionary<string, CancellationTokenSource> jobCancellationTokens = new();
Expand All @@ -21,15 +20,13 @@ internal sealed class JobQueueManager : IDisposable

public bool IsDisposed { get; private set; }

public JobQueueManager(TimeProvider timeProvider) => this.timeProvider = timeProvider;

public JobQueue GetOrAddQueue(string queueName)
{
var isCreating = false;
var jobQueue = jobQueues.GetOrAdd(queueName, jt =>
{
isCreating = true;
var queue = new JobQueue(timeProvider, jt);
var queue = new JobQueue(jt);
queue.CollectionChanged += CallCollectionChanged;
return queue;
});
Expand Down
5 changes: 2 additions & 3 deletions src/NCronJob/Scheduler/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ await DispatchJobForProcessing(nextJob, priorityTuple.NextRunTime, queueName, se
public async Task InvokeJobWithSchedule(JobRun jobRun, CancellationToken cancellationToken)
{
jobRun.NotifyStateChange(JobStateType.Scheduled);
await WaitForNextExecution(jobRun.RunAt ?? timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
await WaitForNextExecution(jobRun.RunAt, cancellationToken).ConfigureAwait(false);
await StartJobProcessingAsync(jobRun, cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -192,8 +192,7 @@ public void ScheduleJob(JobDefinition job)
if (nextRunTime.HasValue)
{
LogNextJobRun(job.Type, nextRunTime.Value); // todo: log by subscribing to OnStateChanged => JobStateType.Scheduled
var run = JobRun.Create(timeProvider, observer.Report, job);
run.RunAt = nextRunTime;
var run = JobRun.Create(timeProvider, observer.Report, job, nextRunTime.Value);
var jobQueue = jobQueueManager.GetOrAddQueue(job.JobFullName);
jobQueue.Enqueue(run, (nextRunTime.Value, (int)run.Priority));
run.NotifyStateChange(JobStateType.Scheduled);
Expand Down
4 changes: 3 additions & 1 deletion tests/NCronJob.Tests/JobRunTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public void TestValuesAreInSyncWithCurrentEnumValues()
[ClassData(typeof(FinalJobStateTypeTestData))]
internal void CompletedJobRunsCannotChangeTheirStateFurther(JobStateType value)
{
var fakeTimeProvider = new FakeTimeProvider();

bool hasBeenCalled = false;

JobDefinition jd = new JobDefinition(typeof(DummyJob), null, null, null);
var jobRun = JobRun.Create(new FakeTimeProvider(), (jr) => { }, jd);
var jobRun = JobRun.Create(fakeTimeProvider, (jr) => { }, jd, fakeTimeProvider.GetUtcNow());

Assert.Equal(JobStateType.NotStarted, jobRun.CurrentState.Type);

Expand Down
2 changes: 1 addition & 1 deletion tests/NCronJob.Tests/NCronJobIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ await Assert.ThrowsAsync<InvalidOperationException>(async () =>
using var serviceScope = provider.CreateScope();
using var executor = serviceScope.ServiceProvider.GetRequiredService<JobExecutor>();
var jobDefinition = new JobDefinition(typeof(JobWithDependency), null, null, null);
await executor.RunJob(JobRun.Create(FakeTimer, (jr) => { }, jobDefinition), CancellationToken.None);
await executor.RunJob(JobRun.Create(FakeTimer, (jr) => { }, jobDefinition, FakeTimer.GetUtcNow()), CancellationToken.None);
});
}

Expand Down

0 comments on commit 8972628

Please sign in to comment.