Skip to content

Commit

Permalink
Limit concurrency on audit queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gunndabad committed Dec 31, 2024
1 parent 643cbc6 commit ea95ccd
Showing 1 changed file with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,30 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
IEnumerable<Guid> ids,
CancellationToken cancellationToken)
{
return (IsFakeXrm ? new Dictionary<Guid, AuditDetailCollection>() : (await Task.WhenAll(ids
if (IsFakeXrm)
{
return new Dictionary<Guid, AuditDetailCollection>();
}

// Throttle the amount of concurrent requests
using var requestThrottle = new SemaphoreSlim(20, 20);

// Keep track of the last seen 'retry-after' value
var retryDelayUpdateLock = new object();
var retryDelay = Task.Delay(0, cancellationToken);

void UpdateRetryDelay(TimeSpan ts)
{
Task oldDelay;
lock (retryDelayUpdateLock)
{
oldDelay = retryDelay;
retryDelay = Task.Delay(ts, cancellationToken);
}
oldDelay.Dispose();
}

var audits = (await Task.WhenAll(ids
.Chunk(MaxAuditRequestsPerBatch)
.Select(async chunk =>
{
Expand All @@ -842,16 +865,22 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
ExecuteMultipleResponse response;
while (true)
{
await retryDelay;
await requestThrottle.WaitAsync(cancellationToken);
try
{
response = (ExecuteMultipleResponse)await organizationService.ExecuteAsync(request, cancellationToken);
}
catch (FaultException fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
logger.LogWarning("Hit CRM service limits getting {entityLogicalName} audit records; Fault exception. Retrying after {retryAfter} seconds.", entityLogicalName, retryAfter.TotalSeconds);
await Task.Delay(retryAfter, cancellationToken);
UpdateRetryDelay(retryAfter);
continue;
}
finally
{
requestThrottle.Release();
}

if (response.IsFaulted)
{
Expand All @@ -860,13 +889,13 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
if (firstFault.IsCrmRateLimitFault(out var retryAfter))
{
logger.LogWarning("Hit CRM service limits getting {entityLogicalName} audit records; CRM rate limit fault. Retrying after {retryAfter} seconds.", entityLogicalName, retryAfter.TotalSeconds);
await Task.Delay(retryAfter, cancellationToken);
UpdateRetryDelay(retryAfter);
continue;
}
else if (firstFault.Message.Contains("The HTTP status code of the response was not expected (429)"))
{
logger.LogWarning("Hit CRM service limits getting {entityLogicalName} audit records; 429 too many requests", entityLogicalName);
await Task.Delay(TimeSpan.FromMinutes(2), cancellationToken);
UpdateRetryDelay(TimeSpan.FromMinutes(2));
continue;
}

Expand All @@ -881,7 +910,11 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
(r, e) => (Id: e, ((RetrieveRecordChangeHistoryResponse)r.Response).AuditDetailCollection));
})))
.SelectMany(b => b)
.ToDictionary(t => t.Id, t => t.AuditDetailCollection));
.ToDictionary(t => t.Id, t => t.AuditDetailCollection);

retryDelay.Dispose();

return audits;
}

private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRecordsFromAuditRepositoryAsync(
Expand Down

0 comments on commit ea95ccd

Please sign in to comment.