Skip to content

Commit

Permalink
Harden scheduled job runner
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Nov 6, 2024
1 parent b32e2b6 commit fc7a27b
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 41 deletions.
2 changes: 1 addition & 1 deletion samples/Foundatio.HostingSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

// shutdown the host if no jobs are running
builder.Services.AddJobLifetimeService();
builder.Services.AddSingleton<ICacheClient>(_ => new InMemoryCacheClient());
builder.Services.AddSingleton<ICacheClient>(sp => new InMemoryCacheClient(o => o.LoggerFactory(sp.GetService<ILoggerFactory>())));

// inserts a startup action that does not complete until the critical health checks are healthy
// it is inserted as the first startup action so that no other startup action runs until the critical resources are available
Expand Down
111 changes: 81 additions & 30 deletions src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ internal class ScheduledJobRunner
private readonly ILogger _logger;
private readonly DateTime _baseDate = new(2010, 1, 1);
private DateTime _lastStatusUpdate = DateTime.MinValue;
private string _cacheKey;

public ScheduledJobRunner(ScheduledJobOptions jobOptions, IServiceProvider serviceProvider, ICacheClient cacheClient, ILoggerFactory loggerFactory = null)
{
_jobOptions = jobOptions;
_jobOptions.Name ??= Guid.NewGuid().ToString("N").Substring(0, 10);
_cacheKey = _jobOptions.Name.ToLower().Replace(' ', '_');
_serviceProvider = serviceProvider;
_timeProvider = serviceProvider.GetService<TimeProvider>() ?? TimeProvider.System;
_cacheClient = new ScopedCacheClient(cacheClient, "jobs");
Expand Down Expand Up @@ -76,22 +78,29 @@ public async ValueTask<bool> ShouldRunAsync()
{
if (_timeProvider.GetUtcNow().UtcDateTime.Subtract(_lastStatusUpdate).TotalSeconds > 15)
{
var lastRun = await _cacheClient.GetAsync<DateTime>("lastrun:" + Options.Name).AnyContext();
if (lastRun.HasValue)
try
{
LastRun = lastRun.Value;
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value);
}
var lastRun = await _cacheClient.GetAsync<DateTime>("lastrun:" + Options.Name).AnyContext();
if (lastRun.HasValue)
{
LastRun = lastRun.Value;
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value);
}

var lastSuccess = await _cacheClient.GetAsync<DateTime>("lastsuccess:" + Options.Name).AnyContext();
if (lastSuccess.HasValue)
LastSuccess = lastSuccess.Value;
var lastSuccess = await _cacheClient.GetAsync<DateTime>("lastsuccess:" + Options.Name).AnyContext();
if (lastSuccess.HasValue)
LastSuccess = lastSuccess.Value;

var lastError = await _cacheClient.GetAsync<string>("lasterror:" + Options.Name).AnyContext();
if (lastError.HasValue)
LastErrorMessage = lastError.Value;
var lastError = await _cacheClient.GetAsync<string>("lasterror:" + Options.Name).AnyContext();
if (lastError.HasValue)
LastErrorMessage = lastError.Value;

_lastStatusUpdate = _timeProvider.GetUtcNow().UtcDateTime;
_lastStatusUpdate = _timeProvider.GetUtcNow().UtcDateTime;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting job ({JobName}) status", Options.Name);
}
}

if (!NextRun.HasValue)
Expand All @@ -114,21 +123,35 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
if (Options.IsDistributed)
{
// using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates
l = await _lockProvider.AcquireAsync(GetLockKey(NextRun.Value), TimeSpan.FromMinutes(60), TimeSpan.Zero).AnyContext();
if (l == null)
try
{
// if we didn't get the lock, update the last run time
var lastRun = await _cacheClient.GetAsync<DateTime>("lastrun:" + Options.Name).AnyContext();
if (lastRun.HasValue)
LastRun = lastRun.Value;

var lastSuccess = await _cacheClient.GetAsync<DateTime>("lastsuccess:" + Options.Name).AnyContext();
if (lastSuccess.HasValue)
LastSuccess = lastSuccess.Value;
l = await _lockProvider.AcquireAsync(GetLockKey(NextRun.Value), TimeSpan.FromMinutes(60), TimeSpan.Zero).AnyContext();
} catch (Exception ex)
{
_logger.LogError(ex, "Error acquiring lock for job ({JobName})", Options.Name);
}

var lastError = await _cacheClient.GetAsync<string>("lasterror:" + Options.Name).AnyContext();
if (lastError.HasValue)
LastErrorMessage = lastError.Value;
if (l == null)
{
try
{
// if we didn't get the lock (it was unavailable or acquiring it failed), refresh the last run, last success and last error values from the cache
var lastRun = await _cacheClient.GetAsync<DateTime>("lastrun:" + Options.Name).AnyContext();
if (lastRun.HasValue)
LastRun = lastRun.Value;

var lastSuccess = await _cacheClient.GetAsync<DateTime>("lastsuccess:" + Options.Name).AnyContext();
if (lastSuccess.HasValue)
LastSuccess = lastSuccess.Value;

var lastError = await _cacheClient.GetAsync<string>("lasterror:" + Options.Name).AnyContext();
if (lastError.HasValue)
LastErrorMessage = lastError.Value;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting job ({JobName}) status", Options.Name);
}

return;
}
Expand All @@ -150,12 +173,26 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
if (result.IsSuccess)
{
LastSuccess = _timeProvider.GetUtcNow().UtcDateTime;
await _cacheClient.SetAsync("lastsuccess:" + Options.Name, LastSuccess.Value).AnyContext();
try
{
await _cacheClient.SetAsync("lastsuccess:" + Options.Name, LastSuccess.Value).AnyContext();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating last success time for job ({JobName})", Options.Name);
}
}
else
{
LastErrorMessage = result.Message;
await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext();
try
{
await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating last error message for job ({JobName})", Options.Name);
}
}
}
catch (TaskCanceledException)
Expand All @@ -164,12 +201,26 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
catch (Exception ex)
{
LastErrorMessage = ex.Message;
await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext();
try
{
await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext();
}
catch
{
// ignored
}
}
}, cancellationToken).Unwrap();

LastRun = _timeProvider.GetUtcNow().UtcDateTime;
await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext();
try
{
await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext();
}
catch
{
// ignored
}
NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value);
}
}
Expand All @@ -178,6 +229,6 @@ private string GetLockKey(DateTime date)
{
long minute = (long)date.Subtract(_baseDate).TotalMinutes;

return Options.Name + ":" + minute;
return _cacheKey + ":" + minute;
}
}
6 changes: 5 additions & 1 deletion src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Foundatio.Utility;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Foundatio.Extensions.Hosting.Jobs;

Expand All @@ -14,12 +16,14 @@ public class ScheduledJobService : BackgroundService
private readonly IServiceProvider _serviceProvider;
private readonly JobManager _jobManager;
private readonly TimeProvider _timeProvider;
private readonly ILogger _logger;

/// <summary>
/// Initializes the service with its dependencies.
/// </summary>
/// <param name="serviceProvider">Stored on the instance and used here to resolve the optional <see cref="TimeProvider"/> and logger registrations.</param>
/// <param name="jobManager">Stored on the instance.</param>
public ScheduledJobService(IServiceProvider serviceProvider, JobManager jobManager)
{
    _serviceProvider = serviceProvider;
    _jobManager = jobManager;

    // Both lookups are optional: fall back to the system clock and a no-op logger when nothing is registered.
    _timeProvider = serviceProvider.GetService<TimeProvider>() ?? TimeProvider.System;
    _logger = serviceProvider.GetService<ILogger<ScheduledJobService>>() ?? NullLogger<ScheduledJobService>.Instance;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -30,7 +34,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var result = await startupContext.WaitForStartupAsync(stoppingToken).AnyContext();
if (!result.Success)
{
throw new ApplicationException("Failed to wait for startup actions to complete");
throw new StartupActionsException("Failed to wait for startup actions to complete");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ public async Task<RunStartupActionsResult> WaitForStartupAsync(CancellationToken
return new RunStartupActionsResult { Success = false, ErrorMessage = $"Timed out waiting for startup actions to be completed after {DateTime.UtcNow.Subtract(startTime):mm\\:ss}" };
}
}

/// <summary>
/// Exception raised when waiting for startup actions to complete does not succeed.
/// </summary>
public class StartupActionsException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public StartupActionsException() { }

    /// <summary>Creates the exception with the given message.</summary>
    /// <param name="message">Description of the failure.</param>
    public StartupActionsException(string message) : base(message) { }

    /// <summary>Creates the exception with the given message and the exception that caused it.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The underlying cause, exposed through <see cref="Exception.InnerException"/>.</param>
    public StartupActionsException(string message, Exception innerException) : base(message, innerException) { }
}
4 changes: 1 addition & 3 deletions src/Foundatio/Jobs/JobResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ public static void LogJobResult(this ILogger logger, JobResult result, string jo
{
if (result == null)
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError("Null job run result for {JobName}", jobName);

logger.LogError("Null job run result for {JobName}", jobName);
return;
}

Expand Down
3 changes: 1 addition & 2 deletions src/Foundatio/Jobs/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ public async Task<bool> RunAsync(CancellationToken cancellationToken = default)
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogLevel.Error))
_logger.LogError(ex, "Error running job instance: {Message}", ex.Message);
_logger.LogError(ex, "Error running job instance: {Message}", ex.Message);
throw;
}
}, cancellationToken));
Expand Down
2 changes: 1 addition & 1 deletion src/Foundatio/Lock/CacheLockProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public CacheLockProvider(ICacheClient cacheClient, IMessageBus messageBus, ILogg
public CacheLockProvider(ICacheClient cacheClient, IMessageBus messageBus, TimeProvider timeProvider, ILoggerFactory loggerFactory = null)
{
_timeProvider = timeProvider ?? cacheClient.GetTimeProvider();
_logger = loggerFactory?.CreateLogger<CacheLockProvider>() ?? NullLogger<CacheLockProvider>.Instance;
_logger = loggerFactory?.CreateLogger<CacheLockProvider>() ?? cacheClient.GetLogger() ?? NullLogger<CacheLockProvider>.Instance;
_cacheClient = new ScopedCacheClient(cacheClient, "lock");
_messageBus = messageBus;

Expand Down
6 changes: 3 additions & 3 deletions src/Foundatio/Lock/ThrottlingLockProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -20,7 +20,7 @@ public class ThrottlingLockProvider : ILockProvider, IHaveLogger, IHaveTimeProvi
public ThrottlingLockProvider(ICacheClient cacheClient, int maxHitsPerPeriod = 100, TimeSpan? throttlingPeriod = null, TimeProvider timeProvider = null, ILoggerFactory loggerFactory = null)
{
_timeProvider = timeProvider ?? cacheClient.GetTimeProvider();
_logger = loggerFactory?.CreateLogger<ThrottlingLockProvider>() ?? NullLogger<ThrottlingLockProvider>.Instance;
_logger = loggerFactory?.CreateLogger<ThrottlingLockProvider>() ?? cacheClient.GetLogger() ?? NullLogger<ThrottlingLockProvider>.Instance;
_cacheClient = new ScopedCacheClient(cacheClient, "lock:throttled");
_maxHitsPerPeriod = maxHitsPerPeriod;

Expand Down Expand Up @@ -95,7 +95,7 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
}
catch (Exception ex)
{
_logger.LogError(ex, "Error acquiring throttled lock: name={Resource} message={Message}", resource, ex.Message);
_logger.LogError(ex, "Error acquiring throttled lock ({Resource}): {Message}", resource, ex.Message);
errors++;
if (errors >= 3)
break;
Expand Down
1 change: 1 addition & 0 deletions src/Foundatio/Queues/QueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected QueueBase(TOptions options) : base(options?.TimeProvider, options?.Log
{
try
{
using var _ = FoundatioDiagnostics.ActivitySource.StartActivity("Queue Stats: " + _options.Name);
var stats = GetMetricsQueueStats();
return (stats.Queued, stats.Working, stats.Deadletter);
}
Expand Down

0 comments on commit fc7a27b

Please sign in to comment.