Skip to content

Commit

Permalink
Sync events from DQT to TRS
Browse files Browse the repository at this point in the history
  • Loading branch information
gunndabad committed Aug 21, 2024
1 parent a2317a4 commit 4266102
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;

Expand Down Expand Up @@ -32,7 +31,13 @@ public static void WriteValueOrNull<T>(this NpgsqlBinaryImporter writer, T? valu
}
}

public static async Task<int> SaveEvents(this NpgsqlTransaction transaction, IReadOnlyCollection<EventBase> events, string tempTableSuffix, IClock clock, CancellationToken cancellationToken, int? timeoutSeconds = null)
public static async Task<int> SaveEvents(
this NpgsqlTransaction transaction,
IReadOnlyCollection<EventBase> events,
string tempTableSuffix,
IClock clock,
CancellationToken cancellationToken,
int? timeoutSeconds = null)
{
if (events.Count == 0)
{
Expand Down Expand Up @@ -88,16 +93,13 @@ ON CONFLICT DO NOTHING

foreach (var e in events)
{
var payload = JsonSerializer.Serialize(e, e.GetType(), EventBase.JsonSerializerOptions);
var key = e is IEventWithKey eventWithKey ? eventWithKey.Key : null;

writer.StartRow();
writer.WriteValueOrNull(e.EventId, NpgsqlDbType.Uuid);
writer.WriteValueOrNull(e.GetEventName(), NpgsqlDbType.Varchar);
writer.WriteValueOrNull(e.CreatedUtc, NpgsqlDbType.TimestampTz);
writer.WriteValueOrNull(clock.UtcNow, NpgsqlDbType.TimestampTz);
writer.WriteValueOrNull(payload, NpgsqlDbType.Jsonb);
writer.WriteValueOrNull(key, NpgsqlDbType.Varchar);
writer.WriteValueOrNull(e.Serialize(), NpgsqlDbType.Jsonb); // payload
writer.WriteValueOrNull((e as IEventWithKey)?.Key, NpgsqlDbType.Varchar);
writer.WriteValueOrNull((e as IEventWithPersonId)?.PersonId, NpgsqlDbType.Uuid);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ public abstract class EventInfo
private protected EventInfo(EventBase @event)
{
Event = @event;
EventName = @event.GetEventName();
}

public EventBase Event { get; }

public string EventName { get; }

public static EventInfo<TEvent> Create<TEvent>(TEvent @event) where TEvent : EventBase => new EventInfo<TEvent>(@event);

public static EventInfo Deserialize(string payload) =>
Expand Down Expand Up @@ -110,7 +113,7 @@ public static EventInfo<TEvent> Deserialize<TEvent>(string payload) where TEvent
public override void Write(Utf8JsonWriter writer, EventInfo value, JsonSerializerOptions options)
{
var eventType = value.Event.GetType();
var eventName = value.Event.GetEventName();
var eventName = value.EventName;

writer.WriteStartObject();
writer.WritePropertyName("EventName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TrsDataSyncHelper(
{
{ ModelTypes.Person, GetModelTypeSyncInfoForPerson() },
{ ModelTypes.MandatoryQualification, GetModelTypeSyncInfoForMandatoryQualification() },
{ ModelTypes.Event, GetModelTypeSyncInfoForEvent() },
};

private readonly ISubject<object[]> _syncedEntitiesSubject = new Subject<object[]>();
Expand Down Expand Up @@ -114,6 +115,11 @@ public async Task DeleteRecords(string modelType, IReadOnlyCollection<Guid> ids,

var modelTypeSyncInfo = GetModelTypeSyncInfo(modelType);

if (modelTypeSyncInfo.DeleteStatement is null)
{
throw new NotSupportedException($"Cannot delete a {modelType}.");
}

await using var connection = await trsDbDataSource.OpenConnectionAsync(cancellationToken);

using (var cmd = connection.CreateCommand())
Expand All @@ -128,6 +134,11 @@ public async Task DeleteRecords(string modelType, IReadOnlyCollection<Guid> ids,
{
var modelTypeSyncInfo = GetModelTypeSyncInfo(modelType);

if (modelTypeSyncInfo.GetLastModifiedOnStatement is null)
{
return null;
}

await using var connection = await trsDbDataSource.OpenConnectionAsync();

using (var cmd = connection.CreateCommand())
Expand Down Expand Up @@ -194,12 +205,12 @@ public async Task<int> SyncPersons(IReadOnlyCollection<Contact> entities, bool i
await createTempTableCommand.ExecuteNonQueryAsync(cancellationToken);
}

using var writer = await connection.BeginBinaryImportAsync(modelTypeSyncInfo.CopyStatement, cancellationToken);
using var writer = await connection.BeginBinaryImportAsync(modelTypeSyncInfo.CopyStatement!, cancellationToken);

foreach (var person in people)
{
writer.StartRow();
modelTypeSyncInfo.WriteRecord(writer, person);
modelTypeSyncInfo.WriteRecord!(writer, person);
}

await writer.CompleteAsync(cancellationToken);
Expand Down Expand Up @@ -348,12 +359,12 @@ private async Task<int> SyncMandatoryQualifications(
await createTempTableCommand.ExecuteNonQueryAsync(cancellationToken);
}

using var writer = await connection.BeginBinaryImportAsync(modelTypeSyncInfo.CopyStatement, cancellationToken);
using var writer = await connection.BeginBinaryImportAsync(modelTypeSyncInfo.CopyStatement!, cancellationToken);

foreach (var mq in toSync)
{
writer.StartRow();
modelTypeSyncInfo.WriteRecord(writer, mq);
modelTypeSyncInfo.WriteRecord!(writer, mq);
}

await writer.CompleteAsync(cancellationToken);
Expand Down Expand Up @@ -413,6 +424,24 @@ private async Task<int> SyncMandatoryQualifications(
return toSync.Count;
}

public async Task<int> SyncEvents(IReadOnlyCollection<dfeta_TRSEvent> events, CancellationToken cancellationToken = default)
{
var modelTypeSyncInfo = GetModelTypeSyncInfo<EventInfo>(ModelTypes.Event);

var mapped = events.Select(e => EventInfo.Deserialize(e.dfeta_Payload).Event).ToArray();

await using var dbContext = await dbContextFactory.CreateDbContextAsync();

var connection = (NpgsqlConnection)dbContext.Database.GetDbConnection();
await connection.OpenAsync(cancellationToken);
using var txn = await connection.BeginTransactionAsync(cancellationToken);
await txn.SaveEvents(mapped, tempTableSuffix: "events_import", clock, cancellationToken);
await txn.CommitAsync();

_syncedEntitiesSubject.OnNext(events.ToArray());
return events.Count;
}

private EntityVersionInfo<TEntity>[] GetEntityVersions<TEntity>(TEntity latest, IEnumerable<AuditDetail> auditDetails, string[] attributeNames)
where TEntity : Entity
{
Expand Down Expand Up @@ -830,6 +859,30 @@ WHERE t.dqt_modified_on < EXCLUDED.dqt_modified_on
};
}

private static ModelTypeSyncInfo GetModelTypeSyncInfoForEvent()
{
var attributeNames = new[]
{
dfeta_TRSEvent.Fields.dfeta_TRSEventId,
dfeta_TRSEvent.Fields.dfeta_EventName,
dfeta_TRSEvent.Fields.dfeta_Payload,
};

return new ModelTypeSyncInfo<EventInfo>()
{
CreateTempTableStatement = null,
CopyStatement = null,
InsertStatement = null,
DeleteStatement = null,
GetLastModifiedOnStatement = null,
EntityLogicalName = dfeta_TRSEvent.EntityLogicalName,
AttributeNames = attributeNames,
GetSyncHandler = helper => (entities, ignoreInvalid, ct) =>
helper.SyncEvents(entities.Select(e => e.ToEntity<dfeta_TRSEvent>()).ToArray(), ct),
WriteRecord = null
};
}

private static List<Person> MapPersons(IEnumerable<Contact> contacts) => contacts
.Select(c => new Person()
{
Expand Down Expand Up @@ -1106,19 +1159,19 @@ bool TryDeserializeEventAttribute(AttributeCollection attributes, string key, [N

private record ModelTypeSyncInfo
{
public required string CreateTempTableStatement { get; init; }
public required string CopyStatement { get; init; }
public required string InsertStatement { get; init; }
public required string DeleteStatement { get; init; }
public required string GetLastModifiedOnStatement { get; init; }
public required string? CreateTempTableStatement { get; init; }
public required string? CopyStatement { get; init; }
public required string? InsertStatement { get; init; }
public required string? DeleteStatement { get; init; }
public required string? GetLastModifiedOnStatement { get; init; }
public required string EntityLogicalName { get; init; }
public required string[] AttributeNames { get; init; }
public required Func<TrsDataSyncHelper, SyncEntitiesHandler> GetSyncHandler { get; init; }
}

private record ModelTypeSyncInfo<TModel> : ModelTypeSyncInfo
{
public required Action<NpgsqlBinaryImporter, TModel> WriteRecord { get; init; }
public required Action<NpgsqlBinaryImporter, TModel>? WriteRecord { get; init; }
}

private record EntityVersionInfo<TEntity>(
Expand All @@ -1134,6 +1187,7 @@ public static class ModelTypes
{
public const string Person = "Person";
public const string MandatoryQualification = "MandatoryQualification";
public const string Event = "Event";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ internal async Task ProcessChanges(CancellationToken cancellationToken)
// Order is important here; the dependees should come before dependents
await SyncIfEnabled(TrsDataSyncHelper.ModelTypes.Person);
await SyncIfEnabled(TrsDataSyncHelper.ModelTypes.MandatoryQualification);
await SyncIfEnabled(TrsDataSyncHelper.ModelTypes.Event);

async Task SyncIfEnabled(string modelType)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
"TrsSyncService": {
"PollIntervalSeconds": 60,
"ModelTypes": [
"Person"
"Person",
"Event"
],
"IgnoreInvalidData": false,
"RunService": false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using TeachingRecordSystem.Core.Dqt.Models;
using SystemUser = TeachingRecordSystem.Core.DataStore.Postgres.Models.SystemUser;

namespace TeachingRecordSystem.Core.Tests.Services.TrsDataSync;

public partial class TrsDataSyncHelperTests
{
[Fact]
public async Task SyncEvent_NewRecord_WritesNewRowToDb()
{
// Arrange
var @event = new DqtAnnotationDeletedEvent()
{
AnnotationId = Guid.NewGuid(),
CreatedUtc = Clock.UtcNow,
EventId = Guid.NewGuid(),
RaisedBy = SystemUser.SystemUserId
};

var eventInfo = EventInfo.Create(@event);

var trsEventEntity = new dfeta_TRSEvent()
{
dfeta_TRSEventId = @event.EventId,
dfeta_Payload = eventInfo.Serialize(),
dfeta_EventName = eventInfo.EventName
};

// Act
await Helper.SyncEvents([trsEventEntity]);

// Assert
await DbFixture.WithDbContext(async dbContext =>
{
var dbEvent = await dbContext.Events.SingleOrDefaultAsync(p => p.EventId == @event.EventId);
Assert.NotNull(dbEvent);
Assert.Equal(@event.GetEventName(), dbEvent.EventName);
Assert.Equal(@event.CreatedUtc, dbEvent.Created);
Assert.Equal(Clock.UtcNow, dbEvent.Inserted);
AssertEx.JsonObjectEquals(@event, EventBase.Deserialize(dbEvent.Payload, dbEvent.EventName));
Assert.Null(dbEvent.Key);
Assert.False(dbEvent.Published);
Assert.Null(dbEvent.PersonId);
});
}
}

0 comments on commit 4266102

Please sign in to comment.