Skip to content

Commit

Permalink
Add a background service for delivering webhook messages (#1699)
Browse files Browse the repository at this point in the history
  • Loading branch information
gunndabad authored Nov 22, 2024
1 parent d8aebd5 commit 2e78108
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,5 @@
"Username": "admin",
"Password": "test"
},
"StorageConnectionString": "UseDevelopmentStorage=true",
"Webhooks": {
"CanonicalDomain": "https://localhost:5001"
}
"StorageConnectionString": "UseDevelopmentStorage=true"
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@ public static IHostApplicationBuilder AddWebhookOptions(this IHostApplicationBui

return builder;
}

public static IHostApplicationBuilder AddWebhookDeliveryService(this IHostApplicationBuilder builder)
{
AddWebhookOptions(builder);

builder.Services.AddSingleton<IWebhookSender, WebhookSender>();
WebhookSender.AddHttpClient(builder.Services);

builder.Services.AddSingleton<IHostedService, WebhookDeliveryService>();

return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using TeachingRecordSystem.Core.DataStore.Postgres.Models;

namespace TeachingRecordSystem.Core.Services.Webhooks;

public interface IWebhookSender
{
Task SendMessageAsync(WebhookMessage message, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly;
using TeachingRecordSystem.Core.DataStore.Postgres;

namespace TeachingRecordSystem.Core.Services.Webhooks;

public class WebhookDeliveryService(
IWebhookSender webhookSender,
IDbContextFactory<TrsDbContext> dbContextFactory,
IClock clock,
ILogger<WebhookDeliveryService> logger) : BackgroundService
{
public const int BatchSize = 20;

private static readonly TimeSpan _pollInterval = TimeSpan.FromMinutes(1);

private static readonly ResiliencePipeline _resiliencePipeline = new ResiliencePipelineBuilder()
.AddRetry(new Polly.Retry.RetryStrategyOptions()
{
BackoffType = DelayBackoffType.Linear,
Delay = TimeSpan.FromSeconds(30),
MaxRetryAttempts = 10
})
.Build();

public static TimeSpan[] RetryInvervals { get; } =
[
TimeSpan.FromSeconds(5),
TimeSpan.FromMinutes(5),
TimeSpan.FromMinutes(30),
TimeSpan.FromHours(2),
TimeSpan.FromHours(5),
TimeSpan.FromHours(10),
TimeSpan.FromHours(14),
TimeSpan.FromHours(20),
TimeSpan.FromHours(24),
];

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var timer = new PeriodicTimer(_pollInterval);

do
{
try
{
await _resiliencePipeline.ExecuteAsync(
async (_, ct) =>
{
SendMessagesResult result;
do
{
result = await SendMessagesAsync(ct);
}
while (result.MoreRecords);
},
stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
}
while (await timer.WaitForNextTickAsync(stoppingToken));
}

public async Task<SendMessagesResult> SendMessagesAsync(CancellationToken cancellationToken = default)
{
var startedAt = clock.UtcNow;

await using var dbContext = await dbContextFactory.CreateDbContextAsync();
var txn = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted);

// Get the first batch of messages that are due to be sent.
// Constrain the batch to `batchSize`, but return one more record so we know if there are more that need to be processed.
var messages = await dbContext.WebhookMessages
.FromSql($"""
select * from webhook_messages
where next_delivery_attempt <= {clock.UtcNow}
order by next_delivery_attempt
limit {BatchSize + 1}
for update skip locked
""")
.Include(m => m.WebhookEndpoint)
.ToArrayAsync();

var moreRecords = messages.Length > BatchSize;

await Parallel.ForEachAsync(
messages.Take(BatchSize),
cancellationToken,
async (message, ct) =>
{
ct.ThrowIfCancellationRequested();

var now = clock.UtcNow;
message.DeliveryAttempts.Add(now);

try
{
await webhookSender.SendMessageAsync(message);

message.Delivered = now;
message.NextDeliveryAttempt = null;
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed delivering webhook message.");

message.DeliveryErrors.Add(ex.Message);

if (message.DeliveryAttempts.Count <= RetryInvervals.Length)
{
var nextRetryInterval = RetryInvervals[message.DeliveryAttempts.Count - 1];
message.NextDeliveryAttempt = now.Add(nextRetryInterval);

// If next retry is due before we'll next be polling then ensure we return 'true' for MoreRecords.
// (That ensures we won't have to wait for the timer to fire again before this message is retried.)
var nextRun = startedAt.Add(_pollInterval);
if (message.NextDeliveryAttempt < nextRun)
{
moreRecords = true;
}
}
else
{
message.NextDeliveryAttempt = null;
}
}
});

await dbContext.SaveChangesAsync();
await txn.CommitAsync();

return new(moreRecords);
}

public record SendMessagesResult(bool MoreRecords);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace TeachingRecordSystem.Core.Services.Webhooks;

public class WebhookSender(HttpClient httpClient, IOptions<WebhookOptions> optionsAccessor)
public class WebhookSender(HttpClient httpClient, IOptions<WebhookOptions> optionsAccessor) : IWebhookSender
{
public const string TagName = "trs-webhooks";
private const string DataContentType = "application/json; charset=utf-8";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using TeachingRecordSystem.Core.Services.Notify;
using TeachingRecordSystem.Core.Services.TrnGenerationApi;
using TeachingRecordSystem.Core.Services.TrsDataSync;
using TeachingRecordSystem.Core.Services.Webhooks;
using TeachingRecordSystem.Core.Services.WorkforceData;
using TeachingRecordSystem.Worker.Infrastructure.Logging;

Expand Down Expand Up @@ -42,7 +43,8 @@
.AddEmail()
.AddIdentityApi()
.AddNameSynonyms()
.AddDqtOutboxMessageProcessorService();
.AddDqtOutboxMessageProcessorService()
.AddWebhookDeliveryService();

var crmServiceClient = new ServiceClient(builder.Configuration.GetRequiredValue("ConnectionStrings:Crm"))
{
Expand Down
Loading

0 comments on commit 2e78108

Please sign in to comment.