Skip to content

Commit

Permalink
Add IJobWithOptions to pass options. Fix job name in various places.
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Oct 31, 2024
1 parent dd08d28 commit 81b70a8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/Foundatio.MessagePack/Foundatio.MessagePack.csproj
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="MessagePack" Version="2.5.140" />
<PackageReference Include="MessagePack" Version="2.5.187" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Foundatio\Foundatio.csproj" />
</ItemGroup>
</Project>
</Project>
43 changes: 30 additions & 13 deletions src/Foundatio/Jobs/IJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public interface IJob
Task<JobResult> RunAsync(CancellationToken cancellationToken = default);
}

public interface IJobWithOptions : IJob
{
JobOptions Options { get; set; }
}

public static class JobExtensions
{
public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken cancellationToken = default)
Expand All @@ -35,41 +40,53 @@ public static async Task<JobResult> TryRunAsync(this IJob job, CancellationToken
/// Runs the job continuously until the cancellation token is set or the iteration limit is reached.
/// </summary>
/// <returns>Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully.</returns>
public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
public static Task<int> RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1,
CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
var options = JobOptions.GetDefaults(job);
options.Interval = interval;
options.IterationLimit = iterationLimit;
return RunContinuousAsync(job, options, cancellationToken, continuationCallback);
}

/// <summary>
/// Runs the job continuously until the cancellation token is set or the iteration limit is reached.
/// </summary>
/// <returns>Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully.</returns>
public static async Task<int> RunContinuousAsync(this IJob job, JobOptions options, CancellationToken cancellationToken = default, Func<Task<bool>> continuationCallback = null)
{
int iterations = 0;
string jobName = job.GetType().Name;
var logger = job.GetLogger();
bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information);

int queueItemsProcessed = 0;
bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>));

using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", jobName } });
using var _ = logger.BeginScope(new Dictionary<string, object> { { "job", options.Name } });
if (isInformationLogLevelEnabled)
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName);
logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", options.Name, Environment.MachineName);

while (!cancellationToken.IsCancellationRequested)
{
using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + jobName);
using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + options.Name);

var result = await job.TryRunAsync(cancellationToken).AnyContext();
logger.LogJobResult(result, jobName);
logger.LogJobResult(result, options.Name);

iterations++;
if (isQueueJob && result.IsSuccess)
queueItemsProcessed++;

if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations))
if (cancellationToken.IsCancellationRequested || (options.IterationLimit > -1 && options.IterationLimit <= iterations))
break;

if (result.Error != null)
{
await job.GetTimeProvider().SafeDelay(TimeSpan.FromMilliseconds(Math.Max((int)(interval?.TotalMilliseconds ?? 0), 100)), cancellationToken).AnyContext();
await job.GetTimeProvider().SafeDelay(TimeSpan.FromMilliseconds(Math.Max((int)(options.Interval?.TotalMilliseconds ?? 0), 100)), cancellationToken).AnyContext();
}
else if (interval.HasValue && interval.Value > TimeSpan.Zero)
else if (options.Interval.HasValue && options.Interval.Value > TimeSpan.Zero)
{
await job.GetTimeProvider().SafeDelay(interval.Value, cancellationToken).AnyContext();
await job.GetTimeProvider().SafeDelay(options.Interval.Value, cancellationToken).AnyContext();
}

// needed to yield back a task for jobs that aren't async
Expand Down Expand Up @@ -97,17 +114,17 @@ public static async Task<int> RunContinuousAsync(this IJob job, TimeSpan? interv

if (isInformationLogLevelEnabled)
{
if (iterationLimit > 0)
if (options.IterationLimit > 0)
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})",
jobName, Environment.MachineName, iterationLimit, iterations);
options.Name, Environment.MachineName, options.IterationLimit, iterations);
}
else
{
logger.LogInformation(
"Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times",
jobName, Environment.MachineName, iterations);
options.Name, Environment.MachineName, iterations);
}
}

Expand Down
28 changes: 15 additions & 13 deletions src/Foundatio/Jobs/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class JobRunner
{
private readonly TimeProvider _timeProvider;
private readonly ILogger _logger;
private string _jobName;
private readonly JobOptions _options;
private readonly IServiceProvider _serviceProvider;

Expand Down Expand Up @@ -86,7 +85,7 @@ public async Task<int> RunInConsoleAsync()
catch (Exception e)
{
if (_logger.IsEnabled(LogLevel.Error))
_logger.LogError(e, "Job {JobName} error: {Message}", _jobName, e.GetMessage());
_logger.LogError(e, "Job {JobName} error: {Message}", _options.Name, e.GetMessage());

if (Debugger.IsAttached)
Console.ReadKey();
Expand Down Expand Up @@ -136,6 +135,8 @@ public async Task<bool> RunAsync(CancellationToken cancellationToken = default)
try
{
job = _options.JobFactory(_serviceProvider);
if (job is IJobWithOptions jobWithOptions)
jobWithOptions.Options = _options;
}
catch (Exception ex)
{
Expand All @@ -149,20 +150,19 @@ public async Task<bool> RunAsync(CancellationToken cancellationToken = default)
return false;
}

_jobName = TypeHelper.GetTypeDisplayName(job.GetType());
using (_logger.BeginScope(s => s.Property("job", _jobName)))
using (_logger.BeginScope(s => s.Property("job", _options.Name)))
{
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("Starting job type {JobName} on machine {MachineName}...", _jobName, Environment.MachineName);
_logger.LogInformation("Starting job type {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName);

var jobLifetime = job as IAsyncLifetime;
if (jobLifetime != null)
{
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("Initializing job lifetime {JobName} on machine {MachineName}...", _jobName, Environment.MachineName);
_logger.LogInformation("Initializing job lifetime {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName);
await jobLifetime.InitializeAsync().AnyContext();
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("Done initializing job lifetime {JobName} on machine {MachineName}.", _jobName, Environment.MachineName);
_logger.LogInformation("Done initializing job lifetime {JobName} on machine {MachineName}.", _options.Name, Environment.MachineName);
}

try
Expand All @@ -183,8 +183,10 @@ public async Task<bool> RunAsync(CancellationToken cancellationToken = default)
{
await using var scope = _serviceProvider.CreateAsyncScope();
var jobInstance = _options.JobFactory(scope.ServiceProvider);
await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit,
cancellationToken).AnyContext();
if (jobInstance is IJobWithOptions jobWithOptions)
jobWithOptions.Options = _options;
await jobInstance.RunContinuousAsync(_options, cancellationToken).AnyContext();
}
catch (TaskCanceledException)
{
Expand All @@ -210,10 +212,10 @@ await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit,
}
else
{
using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + _jobName);
using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + _options.Name);

var result = await job.TryRunAsync(cancellationToken).AnyContext();
_logger.LogJobResult(result, _jobName);
_logger.LogJobResult(result, _options.Name);

return result.IsSuccess;
}
Expand All @@ -224,10 +226,10 @@ await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit,
if (jobDisposable != null)
{
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("Disposing job lifetime {JobName} on machine {MachineName}...", _jobName, Environment.MachineName);
_logger.LogInformation("Disposing job lifetime {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName);
await jobDisposable.DisposeAsync().AnyContext();
if (_logger.IsEnabled(LogLevel.Information))
_logger.LogInformation("Done disposing job lifetime {JobName} on machine {MachineName}.", _jobName, Environment.MachineName);
_logger.LogInformation("Done disposing job lifetime {JobName} on machine {MachineName}.", _options.Name, Environment.MachineName);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/Foundatio/Jobs/JobWithLockBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Foundatio.Jobs;

public abstract class JobWithLockBase : IJob, IHaveLogger, IHaveTimeProvider
public abstract class JobWithLockBase : IJobWithOptions, IHaveLogger, IHaveTimeProvider
{
protected readonly ILogger _logger;
private readonly TimeProvider _timeProvider;
Expand All @@ -30,11 +30,12 @@ public JobWithLockBase(TimeProvider timeProvider, ILoggerFactory loggerFactory =
public string JobId { get; } = Guid.NewGuid().ToString("N").Substring(0, 10);
ILogger IHaveLogger.Logger => _logger;
TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider;
public JobOptions Options { get; set; }

public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToken = default)
{
ILock lockValue;
using (var lockActivity = FoundatioDiagnostics.ActivitySource.StartActivity("Job Lock: " + _jobName))
using (var lockActivity = FoundatioDiagnostics.ActivitySource.StartActivity("Job Lock: " + Options?.Name ?? _jobName))
{
lockActivity?.AddTag("job.id", JobId);

Expand Down

0 comments on commit 81b70a8

Please sign in to comment.