Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fixes while testing against CRM build environment data #1751

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions TeachingRecordSystem/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ RUN apk --no-cache add msttcorefonts-installer fontconfig && \
update-ms-fonts && \
fc-cache -f

# Install SQL Server tools needed to be able to query the reporting DB to help debugging
RUN apk add curl

RUN curl -O https://download.microsoft.com/download/b/9/f/b9f3cce4-3925-46d4-9f46-da08869c6486/msodbcsql18_18.0.1.1-1_amd64.apk && \
curl -O https://download.microsoft.com/download/b/9/f/b9f3cce4-3925-46d4-9f46-da08869c6486/mssql-tools18_18.0.1.1-1_amd64.apk

RUN apk add --allow-untrusted msodbcsql18_18.0.1.1-1_amd64.apk && \
apk add --allow-untrusted mssql-tools18_18.0.1.1-1_amd64.apk && \
rm -f msodbcsql18_18.0.1.1-1_amd64.apk mssql-tools18_18.0.1.1-1_amd64.apk

ENV PATH="${PATH}:/Apps/TrsCli"
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.ServiceModel;
using Hangfire;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Query;
Expand All @@ -14,15 +15,18 @@ public class SyncAllInductionsFromCrmJob
private readonly ICrmServiceClientProvider _crmServiceClientProvider;
private readonly TrsDataSyncHelper _trsDataSyncHelper;
private readonly IOptions<TrsDataSyncServiceOptions> _syncOptionsAccessor;
private readonly ILogger<SyncAllInductionsFromCrmJob> _logger;

public SyncAllInductionsFromCrmJob(
ICrmServiceClientProvider crmServiceClientProvider,
TrsDataSyncHelper trsDataSyncHelper,
IOptions<TrsDataSyncServiceOptions> syncOptionsAccessor)
IOptions<TrsDataSyncServiceOptions> syncOptionsAccessor,
ILoggerFactory loggerFactory)
{
_crmServiceClientProvider = crmServiceClientProvider;
_trsDataSyncHelper = trsDataSyncHelper;
_syncOptionsAccessor = syncOptionsAccessor;
_logger = loggerFactory.CreateLogger<SyncAllInductionsFromCrmJob>();
}

public async Task ExecuteAsync(bool createMigratedEvent, bool dryRun, CancellationToken cancellationToken)
Expand Down Expand Up @@ -57,6 +61,7 @@ public async Task ExecuteAsync(bool createMigratedEvent, bool dryRun, Cancellati
}
catch (FaultException<OrganizationServiceFault> fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; error code: {ErrorCode}", fex.Detail.ErrorCode);
await Task.Delay(retryAfter, cancellationToken);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.ServiceModel;
using Microsoft.Crm.Sdk.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.PowerPlatform.Dataverse.Client;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
Expand All @@ -16,11 +17,7 @@

namespace TeachingRecordSystem.Core.Services.TrsDataSync;

public class TrsDataSyncHelper(
NpgsqlDataSource trsDbDataSource,
[FromKeyedServices(TrsDataSyncService.CrmClientName)] IOrganizationServiceAsync2 organizationService,
ReferenceDataCache referenceDataCache,
IClock clock)
public class TrsDataSyncHelper
{
private delegate Task SyncEntitiesHandler(IReadOnlyCollection<Entity> entities, bool ignoreInvalid, bool dryRun, CancellationToken cancellationToken);

Expand All @@ -37,10 +34,41 @@ public class TrsDataSyncHelper(
};

private readonly ISubject<object[]> _syncedEntitiesSubject = new Subject<object[]>();
private readonly NpgsqlDataSource _trsDbDataSource;
private readonly IOrganizationServiceAsync2 _organizationService;
private readonly ReferenceDataCache _referenceDataCache;
private readonly IClock _clock;
private readonly ILogger<TrsDataSyncHelper> _logger;
private bool? isFakeXrm;

public TrsDataSyncHelper(
NpgsqlDataSource trsDbDataSource,
[FromKeyedServices(TrsDataSyncService.CrmClientName)] IOrganizationServiceAsync2 organizationService,
ReferenceDataCache referenceDataCache,
IClock clock,
ILoggerFactory loggerFactory)
{
_trsDbDataSource = trsDbDataSource;
_organizationService = organizationService;
_referenceDataCache = referenceDataCache;
_clock = clock;
_logger = loggerFactory.CreateLogger<TrsDataSyncHelper>();
}

public IObservable<object[]> GetSyncedEntitiesObservable() => _syncedEntitiesSubject;

private bool IsFakeXrm { get; } = organizationService.GetType().FullName == "Castle.Proxies.ObjectProxy_2";
private bool IsFakeXrm
{
get
{
if (isFakeXrm is null)
{
isFakeXrm = _organizationService.GetType().FullName == "Castle.Proxies.ObjectProxy_2";
}

return isFakeXrm!.Value;
}
}

public static (string EntityLogicalName, string[] AttributeNames) GetEntityInfoForModelType(string modelType)
{
Expand Down Expand Up @@ -142,7 +170,7 @@ public async Task DeleteRecordsAsync(string modelType, IReadOnlyCollection<Guid>
throw new NotSupportedException($"Cannot delete a {modelType}.");
}

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

using (var cmd = connection.CreateCommand())
{
Expand All @@ -161,7 +189,7 @@ public async Task DeleteRecordsAsync(string modelType, IReadOnlyCollection<Guid>
return null;
}

await using var connection = await trsDbDataSource.OpenConnectionAsync();
await using var connection = await _trsDbDataSource.OpenConnectionAsync();

using (var cmd = connection.CreateCommand())
{
Expand Down Expand Up @@ -221,7 +249,7 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(IReadOnlyCollectio

var modelTypeSyncInfo = GetModelTypeSyncInfo<Person>(ModelTypes.Person);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);
using var txn = await connection.BeginTransactionAsync(cancellationToken);

using (var createTempTableCommand = connection.CreateCommand())
Expand All @@ -247,7 +275,7 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(IReadOnlyCollectio
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
await mergeCommand.ExecuteNonQueryAsync();
}
Expand Down Expand Up @@ -330,7 +358,8 @@ public async Task<int> SyncInductionsAsync(
dfeta_induction.Fields.dfeta_InductionStatus,
dfeta_induction.Fields.CreatedOn,
dfeta_induction.Fields.CreatedBy,
dfeta_induction.Fields.ModifiedOn
dfeta_induction.Fields.ModifiedOn,
dfeta_induction.Fields.StateCode
};

var inductions = await GetEntitiesAsync<dfeta_induction>(
Expand Down Expand Up @@ -384,7 +413,7 @@ private async Task<int> SyncInductionsAsync(
{
var modelTypeSyncInfo = GetModelTypeSyncInfo<InductionInfo>(ModelTypes.Induction);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

var toSync = inductions.ToList();

Expand Down Expand Up @@ -415,7 +444,7 @@ private async Task<int> SyncInductionsAsync(
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
using var reader = await mergeCommand.ExecuteReaderAsync();
while (await reader.ReadAsync(cancellationToken))
Expand Down Expand Up @@ -457,7 +486,7 @@ private async Task<int> SyncInductionsAsync(
.Where(e => e is IEventWithPersonId && !unsyncedContactIds.Any(c => c == ((IEventWithPersonId)e).PersonId))
.ToArray();

await txn.SaveEventsAsync(eventsForSyncedContacts, "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(eventsForSyncedContacts, "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand Down Expand Up @@ -528,7 +557,7 @@ private async Task<int> SyncAlertsAsync(

var modelTypeSyncInfo = GetModelTypeSyncInfo<Alert>(ModelTypes.Alert);

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

var toSync = alerts.ToList();

Expand Down Expand Up @@ -559,12 +588,12 @@ private async Task<int> SyncAlertsAsync(
using (var mergeCommand = connection.CreateCommand())
{
mergeCommand.CommandText = modelTypeSyncInfo.UpsertStatement;
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, clock.UtcNow));
mergeCommand.Parameters.Add(new NpgsqlParameter(NowParameterName, _clock.UtcNow));
mergeCommand.Transaction = txn;
await mergeCommand.ExecuteNonQueryAsync();
}

await txn.SaveEventsAsync(events, "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(events, "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand Down Expand Up @@ -620,10 +649,10 @@ public async Task<int> SyncEventsAsync(IReadOnlyCollection<dfeta_TRSEvent> event

var mapped = events.Select(e => EventInfo.Deserialize(e.dfeta_Payload).Event).ToArray();

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);
await using var connection = await _trsDbDataSource.OpenConnectionAsync(cancellationToken);

using var txn = await connection.BeginTransactionAsync(cancellationToken);
await txn.SaveEventsAsync(mapped, tempTableSuffix: "events_import", clock, cancellationToken);
await txn.SaveEventsAsync(mapped, tempTableSuffix: "events_import", _clock, cancellationToken);

if (!dryRun)
{
Expand All @@ -644,6 +673,7 @@ private EntityVersionInfo<TEntity>[] GetEntityVersions<TEntity>(TEntity latest,
.OfType<AttributeAuditDetail>()
.Select(a => (AuditDetail: a, AuditRecord: a.AuditRecord.ToEntity<Audit>()))
.OrderBy(a => a.AuditRecord.CreatedOn)
.ThenBy(a => a.AuditRecord.Action == Audit_Action.Create ? 0 : 1)
.ToArray();

if (ordered.Length == 0)
Expand Down Expand Up @@ -798,10 +828,11 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec
{
try
{
response = (ExecuteMultipleResponse)await organizationService.ExecuteAsync(request, cancellationToken);
response = (ExecuteMultipleResponse)await _organizationService.ExecuteAsync(request, cancellationToken);
}
catch (FaultException fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; Fault exception");
await Task.Delay(retryAfter, cancellationToken);
continue;
}
Expand All @@ -812,11 +843,13 @@ private async Task<IReadOnlyDictionary<Guid, AuditDetailCollection>> GetAuditRec

if (firstFault.IsCrmRateLimitFault(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; CRM rate limit fault");
await Task.Delay(retryAfter, cancellationToken);
continue;
}
else if (firstFault.Message.Contains("The HTTP status code of the response was not expected (429)"))
{
_logger.LogWarning("Hit CRM service limits; 429 too many requests");
await Task.Delay(TimeSpan.FromMinutes(2), cancellationToken);
continue;
}
Expand Down Expand Up @@ -852,7 +885,25 @@ private async Task<TEntity[]> GetEntitiesAsync<TEntity>(
query.Criteria.AddCondition("statecode", ConditionOperator.Equal, 0);
}

var response = await organizationService.RetrieveMultipleAsync(query, cancellationToken);
EntityCollection response;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

try
{
response = await _organizationService.RetrieveMultipleAsync(query, cancellationToken);
}
catch (FaultException<OrganizationServiceFault> fex) when (fex.IsCrmRateLimitException(out var retryAfter))
{
_logger.LogWarning("Hit CRM service limits; error code: {ErrorCode}", fex.Detail.ErrorCode);
await Task.Delay(retryAfter, cancellationToken);
continue;
}

break;
}

return response.Entities.Select(e => e.ToEntity<TEntity>()).ToArray();
}

Expand Down Expand Up @@ -1012,7 +1063,21 @@ RETURNING t.person_id
{
Contact.PrimaryIdAttribute,
Contact.Fields.dfeta_InductionStatus,
Contact.Fields.dfeta_qtlsdate
Contact.Fields.dfeta_qtlsdate,
Contact.Fields.CreatedOn,
Contact.Fields.CreatedBy,
Contact.Fields.StateCode,
Contact.Fields.ModifiedOn,
Contact.Fields.dfeta_TRN,
Contact.Fields.FirstName,
Contact.Fields.MiddleName,
Contact.Fields.LastName,
Contact.Fields.dfeta_StatedFirstName,
Contact.Fields.dfeta_StatedMiddleName,
Contact.Fields.dfeta_StatedLastName,
Contact.Fields.BirthDate,
Contact.Fields.dfeta_NINumber,
Contact.Fields.EMailAddress1,
};

Action<NpgsqlBinaryImporter, InductionInfo> writeRecord = (writer, induction) =>
Expand Down Expand Up @@ -1225,7 +1290,8 @@ private static List<Person> MapPersons(IEnumerable<Contact> contacts) => contact
dfeta_induction.Fields.dfeta_InductionExemptionReason,
dfeta_induction.Fields.dfeta_StartDate,
dfeta_induction.Fields.dfeta_InductionStatus,
dfeta_induction.Fields.ModifiedOn
dfeta_induction.Fields.ModifiedOn,
dfeta_induction.Fields.StateCode
};

if (auditDetails.TryGetValue(induction!.Id, out var inductionAudits))
Expand Down Expand Up @@ -1404,7 +1470,7 @@ EventBase MapMigratedEvent(EntityVersionInfo<dfeta_induction> snapshot, Inductio
{
EventId = Guid.NewGuid(),
Key = $"{snapshot.Entity.Id}-Migrated",
CreatedUtc = clock.UtcNow,
CreatedUtc = _clock.UtcNow,
RaisedBy = EventModels.RaisedByUserInfo.FromUserId(Core.DataStore.Postgres.Models.SystemUser.SystemUserId),
PersonId = snapshot.Entity.dfeta_PersonId.Id,
InductionStartDate = mappedInduction.InductionStartDate,
Expand Down Expand Up @@ -1434,8 +1500,8 @@ EventModels.DqtInduction GetEventDqtInduction(dfeta_induction induction)
bool ignoreInvalid,
bool createMigratedEvent)
{
var sanctionCodes = await referenceDataCache.GetSanctionCodesAsync(activeOnly: false);
var alertTypes = await referenceDataCache.GetAlertTypesAsync();
var sanctionCodes = await _referenceDataCache.GetSanctionCodesAsync(activeOnly: false);
var alertTypes = await _referenceDataCache.GetAlertTypesAsync();

var alerts = new List<Alert>();
var events = new List<EventBase>();
Expand Down Expand Up @@ -1469,7 +1535,7 @@ EventModels.DqtInduction GetEventDqtInduction(dfeta_induction induction)
// If the record is deactivated then it's migrated as deleted
if (s.StateCode == dfeta_sanctionState.Inactive)
{
mapped.DeletedOn = clock.UtcNow;
mapped.DeletedOn = _clock.UtcNow;
}
else if (createMigratedEvent)
{
Expand Down Expand Up @@ -1586,7 +1652,7 @@ EventBase MapMigratedEvent(EntityVersionInfo<dfeta_sanction> snapshot)
{
EventId = Guid.NewGuid(),
Key = $"{snapshot.Entity.Id}-Migrated",
CreatedUtc = clock.UtcNow,
CreatedUtc = _clock.UtcNow,
RaisedBy = EventModels.RaisedByUserInfo.FromUserId(Core.DataStore.Postgres.Models.SystemUser.SystemUserId),
PersonId = snapshot.Entity.dfeta_PersonId.Id,
Alert = GetEventAlert(snapshot.Entity, applyMigrationMappings: true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
"MinimumLevel": {
"Default": "Error",
"Override": {
"TeachingRecordSystem.Worker": "Warning"
"TeachingRecordSystem.Worker": "Warning",
"TeachingRecordSystem.Core.Services.TrsDataSync": "Warning",
"TeachingRecordSystem.Core.Jobs": "Warning"
}
}
},
Expand Down
Loading
Loading