Skip to content

Commit

Permalink
Added logging to sync helper to highlight when we're getting CRM rate…
Browse files Browse the repository at this point in the history
… limiting
  • Loading branch information
hortha committed Dec 17, 2024
1 parent 568e7eb commit f09c226
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 51 deletions.
10 changes: 10 additions & 0 deletions TeachingRecordSystem/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ RUN apk --no-cache add msttcorefonts-installer fontconfig && \
update-ms-fonts && \
fc-cache -f

# Install SQL Server tools needed to be able to query the reporting DB to help debugging
RUN apk add curl

RUN curl -O https://download.microsoft.com/download/b/9/f/b9f3cce4-3925-46d4-9f46-da08869c6486/msodbcsql18_18.0.1.1-1_amd64.apk && \
curl -O https://download.microsoft.com/download/b/9/f/b9f3cce4-3925-46d4-9f46-da08869c6486/mssql-tools18_18.0.1.1-1_amd64.apk

RUN apk add --allow-untrusted msodbcsql18_18.0.1.1-1_amd64.apk && \
apk add --allow-untrusted mssql-tools18_18.0.1.1-1_amd64.apk && \
rm -f msodbcsql18_18.0.1.1-1_amd64.apk mssql-tools18_18.0.1.1-1_amd64.apk

ENV PATH="${PATH}:/Apps/TrsCli"
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.ServiceModel;
using Hangfire;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Query;
Expand All @@ -14,15 +15,18 @@ public class SyncAllInductionsFromCrmJob
private readonly ICrmServiceClientProvider _crmServiceClientProvider;
private readonly TrsDataSyncHelper _trsDataSyncHelper;
private readonly IOptions<TrsDataSyncServiceOptions> _syncOptionsAccessor;
private readonly ILogger<SyncAllInductionsFromCrmJob> _logger;

public SyncAllInductionsFromCrmJob(
ICrmServiceClientProvider crmServiceClientProvider,
TrsDataSyncHelper trsDataSyncHelper,
IOptions<TrsDataSyncServiceOptions> syncOptionsAccessor)
IOptions<TrsDataSyncServiceOptions> syncOptionsAccessor,
ILoggerFactory loggerFactory)
{
_crmServiceClientProvider = crmServiceClientProvider;
_trsDataSyncHelper = trsDataSyncHelper;
_syncOptionsAccessor = syncOptionsAccessor;
_logger = loggerFactory.CreateLogger<SyncAllInductionsFromCrmJob>();
}

public async Task ExecuteAsync(bool createMigratedEvent, bool dryRun, CancellationToken cancellationToken)
Expand Down Expand Up @@ -57,6 +61,7 @@ public async Task ExecuteAsync(bool createMigratedEvent, bool dryRun, Cancellati
}
catch (FaultException<OrganizationServiceFault> fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; error code: {ErrorCode}", fex.Detail.ErrorCode);
await Task.Delay(retryAfter, cancellationToken);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.ServiceModel;
using Microsoft.Crm.Sdk.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
Expand All @@ -16,11 +17,7 @@

namespace TeachingRecordSystem.Core.Services.TrsDataSync;

public class TrsDataSyncHelper(
NpgsqlDataSource trsDbDataSource,
[FromKeyedServices(TrsDataSyncService.CrmClientName)] IOrganizationServiceAsync2 organizationService,
ReferenceDataCache referenceDataCache,
IClock clock)
public class TrsDataSyncHelper
{
private delegate Task SyncEntitiesHandler(IReadOnlyCollection<Entity> entities, bool ignoreInvalid, bool dryRun, CancellationToken cancellationToken);

Expand All @@ -37,10 +34,41 @@ public class TrsDataSyncHelper(
};

private readonly ISubject<object[]> _syncedEntitiesSubject = new Subject<object[]>();
private readonly NpgsqlDataSource _trsDbDataSource;
private readonly IOrganizationServiceAsync2 _organizationService;
private readonly ReferenceDataCache _referenceDataCache;
private readonly IClock _clock;
private readonly ILogger<TrsDataSyncHelper> _logger;
private bool? isFakeXrm;

public TrsDataSyncHelper(
NpgsqlDataSource trsDbDataSource,
[FromKeyedServices(TrsDataSyncService.CrmClientName)] IOrganizationServiceAsync2 organizationService,
ReferenceDataCache referenceDataCache,
IClock clock,
ILoggerFactory loggerFactory)
{
_trsDbDataSource = trsDbDataSource;
_organizationService = organizationService;
_referenceDataCache = referenceDataCache;
_clock = clock;
_logger = loggerFactory.CreateLogger<TrsDataSyncHelper>();
}

public IObservable<object[]> GetSyncedEntitiesObservable() => _syncedEntitiesSubject;

private bool IsFakeXrm { get; } = organizationService.GetType().FullName == "Castle.Proxies.ObjectProxy_2";
private bool IsFakeXrm
{
get
{
if (isFakeXrm is null)
{
isFakeXrm = _organizationService.GetType().FullName == "Castle.Proxies.ObjectProxy_2";
}

return isFakeXrm!.Value;
}
}

public static (string EntityLogicalName, string[] AttributeNames) GetEntityInfoForModelType(string modelType)
{
Expand Down Expand Up @@ -142,7 +170,7 @@ public async Task DeleteRecordsAsync(string modelType, IReadOnlyCollection<Guid>
throw new NotSupportedException($"Cannot delete a {modelType}.");
}

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

using (var cmd = connection.CreateCommand())
{
Expand All @@ -161,7 +189,7 @@ public async Task DeleteRecordsAsync(string modelType, IReadOnlyCollection<Guid>
return null;
}

await using var connection = await trsDbDataSource.OpenConnectionAsync();
await using var connection = await _trsDbDataSource.OpenConnectionAsync();

using (var cmd = connection.CreateCommand())
{
Expand Down Expand Up @@ -221,7 +249,7 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(IReadOnlyCollectio

var modelTypeSyncInfo = GetModelTypeSyncInfo<Person>(ModelTypes.Person);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);
using var txn = await connection.BeginTransactionAsync(cancellationToken);

using (var createTempTableCommand = connection.CreateCommand())
Expand All @@ -247,7 +275,7 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(IReadOnlyCollectio
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
await mergeCommand.ExecuteNonQueryAsync();
}
Expand Down Expand Up @@ -385,7 +413,7 @@ private async Task<int> SyncInductionsAsync(
{
var modelTypeSyncInfo = GetModelTypeSyncInfo<InductionInfo>(ModelTypes.Induction);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

var toSync = inductions.ToList();

Expand Down Expand Up @@ -416,7 +444,7 @@ private async Task<int> SyncInductionsAsync(
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
using var reader = await mergeCommand.ExecuteReaderAsync();
while (await reader.ReadAsync(cancellationToken))
Expand Down Expand Up @@ -458,7 +486,7 @@ private async Task<int> SyncInductionsAsync(
.Where(e => e is IEventWithPersonId && !unsyncedContactIds.Any(c => c == ((IEventWithPersonId)e).PersonId))
.ToArray();

await txn.SaveEventsAsync(eventsForSyncedContacts, "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(eventsForSyncedContacts, "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand Down Expand Up @@ -529,7 +557,7 @@ private async Task<int> SyncAlertsAsync(

var modelTypeSyncInfo = GetModelTypeSyncInfo<Alert>(ModelTypes.Alert);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

var toSync = alerts.ToList();

Expand Down Expand Up @@ -560,12 +588,12 @@ private async Task<int> SyncAlertsAsync(
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
await mergeCommand.ExecuteNonQueryAsync();
}

await txn.SaveEventsAsync(events, "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(events, "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand Down Expand Up @@ -621,10 +649,10 @@ public async Task<int> SyncEventsAsync(IReadOnlyCollection<dfeta_TRSEvent> event

var mapped = events.Select(e => EventInfo.Deserialize(e.dfeta_Payload).Event).ToArray();

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

using var txn = await connection.BeginTransactionAsync(cancellationToken);
await txn.SaveEventsAsync(mapped, tempTableSuffix: "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(mapped, tempTableSuffix: "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand Down Expand Up @@ -800,7 +828,7 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
{
try
{
response = (ExecuteMultipleResponse)await organizationService.ExecuteAsync(request, cancellationToken);
response = (ExecuteMultipleResponse)await _organizationService.ExecuteAsync(request, cancellationToken);
}
catch (FaultException fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
Expand Down Expand Up @@ -854,7 +882,25 @@ private async Task<TEntity[]> GetEntitiesAsync<TEntity>(
query.Criteria.AddCondition("statecode", ConditionOperator.Equal, 0);
}

var response = await organizationService.RetrieveMultipleAsync(query, cancellationToken);
EntityCollection response;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

try
{
response = await _organizationService.RetrieveMultipleAsync(query, cancellationToken);
}
catch (FaultException<OrganizationServiceFault> fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; error code: {ErrorCode}", fex.Detail.ErrorCode);
await Task.Delay(retryAfter, cancellationToken);
continue;
}

break;
}

return response.Entities.Select(e => e.ToEntity<TEntity>()).ToArray();
}

Expand Down Expand Up @@ -1421,7 +1467,7 @@ EventBase MapMigratedEvent(EntityVersionInfo<dfeta_induction> snapshot, Inductio
{
EventId = Guid.NewGuid(),
Key = $"{snapshot.Entity.Id}-Migrated",
CreatedUtc = clock.UtcNow,
CreatedUtc = _clock.UtcNow,
RaisedBy = EventModels.RaisedByUserInfo.FromUserId(Core.DataStore.Postgres.Models.SystemUser.SystemUserId),
PersonId = snapshot.Entity.dfeta_PersonId.Id,
InductionStartDate = mappedInduction.InductionStartDate,
Expand Down Expand Up @@ -1451,8 +1497,8 @@ EventModels.DqtInduction GetEventDqtInduction(dfeta_induction induction)
bool ignoreInvalid,
bool createMigratedEvent)
{
var sanctionCodes = await referenceDataCache.GetSanctionCodesAsync(activeOnly: false);
var alertTypes = await referenceDataCache.GetAlertTypesAsync();
var sanctionCodes = await _referenceDataCache.GetSanctionCodesAsync(activeOnly: false);
var alertTypes = await _referenceDataCache.GetAlertTypesAsync();

var alerts = new List<Alert>();
var events = new List<EventBase>();
Expand Down Expand Up @@ -1486,7 +1532,7 @@ EventModels.DqtInduction GetEventDqtInduction(dfeta_induction induction)
// If the record is deactivated then it's migrated as deleted
if (s.StateCode == dfeta_sanctionState.Inactive)
{
mapped.DeletedOn = clock.UtcNow;
mapped.DeletedOn = _clock.UtcNow;
}
else if (createMigratedEvent)
{
Expand Down Expand Up @@ -1603,7 +1649,7 @@ EventBase MapMigratedEvent(EntityVersionInfo<dfeta_sanction> snapshot)
{
EventId = Guid.NewGuid(),
Key = $"{snapshot.Entity.Id}-Migrated",
CreatedUtc = clock.UtcNow,
CreatedUtc = _clock.UtcNow,
RaisedBy = EventModels.RaisedByUserInfo.FromUserId(Core.DataStore.Postgres.Models.SystemUser.SystemUserId),
PersonId = snapshot.Entity.dfeta_PersonId.Id,
Alert = GetEventAlert(snapshot.Entity, applyMigrationMappings: true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using TeachingRecordSystem.Core.Services.TrnGenerationApi;
using TeachingRecordSystem.TestCommon;
Expand All @@ -18,10 +19,11 @@ public sealed class CrmClientFixture : IDisposable
private readonly CancellationTokenSource _completedCts;
private readonly EnvironmentLockManager _lockManager;
private readonly IMemoryCache _memoryCache;
private readonly ILoggerFactory _loggerFactory;
private readonly ITrnGenerationApiClient _trnGenerationApiClient;
private readonly ReferenceDataCache _referenceDataCache;

public CrmClientFixture(ServiceClient serviceClient, DbFixture dbFixture, IConfiguration configuration, IMemoryCache memoryCache)
public CrmClientFixture(ServiceClient serviceClient, DbFixture dbFixture, IConfiguration configuration, IMemoryCache memoryCache, ILoggerFactory loggerFactory)
{
Clock = new Clock();
Configuration = configuration;
Expand All @@ -31,6 +33,7 @@ public CrmClientFixture(ServiceClient serviceClient, DbFixture dbFixture, IConfi
_lockManager = new EnvironmentLockManager(Configuration);
_lockManager.AcquireLock(_completedCts.Token);
_memoryCache = memoryCache;
_loggerFactory = loggerFactory;
_trnGenerationApiClient = GetTrnGenerationApiClient();
_referenceDataCache = new ReferenceDataCache(
new CrmQueryDispatcher(CreateQueryServiceProvider(_baseServiceClient, referenceDataCache: null), serviceClientName: null),
Expand Down Expand Up @@ -65,7 +68,7 @@ public TestDataScope CreateTestDataScope(bool withSync = false)
_referenceDataCache,
Clock,
() => _trnGenerationApiClient.GenerateTrnAsync(),
withSync ? TestDataSyncConfiguration.Sync(new(DbFixture.GetDataSource(), orgService, _referenceDataCache, Clock)) : TestDataSyncConfiguration.NoSync()),
withSync ? TestDataSyncConfiguration.Sync(new(DbFixture.GetDataSource(), orgService, _referenceDataCache, Clock, _loggerFactory)) : TestDataSyncConfiguration.NoSync()),
_memoryCache,
onAsyncDispose);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using TeachingRecordSystem.Core.Dqt;
using TeachingRecordSystem.Core.Services.TrsDataSync;
Expand Down Expand Up @@ -30,7 +31,8 @@ public EventMapperFixture(
IOrganizationServiceAsync2 organizationService,
ICrmQueryDispatcher crmQueryDispatcher,
FakeTrnGenerator trnGenerator,
IServiceProvider serviceProvider)
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory)
{
Clock = new TestableClock();
DbFixture = dbFixture;
Expand All @@ -40,7 +42,8 @@ public EventMapperFixture(
dbFixture.GetDataSource(),
organizationService,
ReferenceDataCache,
Clock);
Clock,
loggerFactory);

TestData = new TestData(
dbFixture.GetDbContextFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public async Task SyncInductionsAsync_WithExistingDqtInduction_UpdatesPersonReco
var job = new SyncAllInductionsFromCrmJob(
CrmServiceClientProvider,
Helper,
options);
options,
LoggerFactory);

await job.ExecuteAsync(createMigratedEvent: false, dryRun: false, CancellationToken.None);

Expand Down
Loading

0 comments on commit f09c226

Please sign in to comment.