From 59918ee8f62b62e30e048d5d8bab0fd22b86352c Mon Sep 17 00:00:00 2001 From: Jono Prest <65739024+JonoPrest@users.noreply.github.com> Date: Mon, 18 Nov 2024 18:26:55 +0200 Subject: [PATCH] [4] Rollback diff implementation (#333) * Implement entity history tables for each entity * Wip implement per table insert history item function with tests * Add entity history schema creator * Add eval function for inserting entity history * Fix spelling of variable * Move EntityHistory to its own file * Add serial to entity history * Make InMemomoryStore static * Move InMemomoryStore to static templates dir * Add new rollback diffing queries * Implement delete rolled back entity history * Implement multichain rollback filters * Implement unordered and ordered multichain versions of rollbacks * Make tests compile * Fix broken magic in memory table function * Fix double quoted dynamic table name in query * Add new test expectations * Refactor simpler in memory table entity history state * Fix diff and delete entity history functions * Fix tests * Fix inverted rollback check * Include rollback multichain test in suite --- .../templates/dynamic/codegen/src/IO.res.hbs | 128 ++++--- .../dynamic/codegen/src/InMemoryStore.res.hbs | 64 ---- .../codegen/src/TestHelpers_MockDb.res.hbs | 12 +- .../dynamic/codegen/src/Types.res.hbs | 12 +- .../dynamic/codegen/src/db/Entities.res.hbs | 24 +- .../static/codegen/src/EventProcessing.res | 9 +- .../static/codegen/src/InMemoryStore.res | 94 +++++ .../static/codegen/src/InMemoryTable.res | 42 +-- .../static/codegen/src/ReorgDetection.res | 35 +- .../static/codegen/src/db/DbFunctions.res | 326 ++++++++++++------ .../src/db/DbFunctionsImplementation.js | 189 +++++++--- .../static/codegen/src/db/EntityHistory.res | 72 ++-- .../src/eventFetching/ChainFetcher.res | 29 +- .../codegen/src/eventFetching/FetchState.res | 94 ++--- .../eventFetching/PartitionedFetchState.res | 9 +- .../codegen/src/globalState/GlobalState.res | 135 +++++--- .../test/RollbackMultichain_test.res | 53 +-- scenarios/test_codegen/test/Mock_test.res | 5 +- .../test/lib_tests/EntityHistory_test.res | 68 +++- .../test/lib_tests/FetchState_test.res | 10 +- 20 files changed, 908 insertions(+), 502 deletions(-) delete mode 100644 codegenerator/cli/templates/dynamic/codegen/src/InMemoryStore.res.hbs create mode 100644 codegenerator/cli/templates/static/codegen/src/InMemoryStore.res diff --git a/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs index b389473d5..c55fd1105 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs @@ -17,8 +17,8 @@ let getEntityHistoryItems = entityUpdates => { ) => { let (optPreviousEventIdentifier, entityHistoryItems) = prev - let {eventIdentifier, shouldSaveHistory, entityUpdateAction, entityId} = entityUpdate - let entityHistoryItems = if shouldSaveHistory { + let {eventIdentifier, entityUpdateAction, entityId} = entityUpdate + let entityHistoryItems = { let historyItem: EntityHistory.historyRow<_> = { current: { chain_id: eventIdentifier.chainId, @@ -38,10 +38,7 @@ let getEntityHistoryItems = entityUpdates => { }, } entityHistoryItems->Belt.Array.concat([historyItem]) - } else { - entityHistoryItems } - (Some(eventIdentifier), entityHistoryItems) }) @@ -51,19 +48,20 @@ let getEntityHistoryItems = entityUpdates => { let executeSetEntityWithHistory = ( type entity, sql: Postgres.sql, - ~rows: array>, + ~inMemoryStore: InMemoryStore.t, ~entityMod: module(Entities.Entity with type t = entity), ): promise => { + let rows = + inMemoryStore.entities + ->InMemoryStore.EntityTables.get(entityMod) + ->InMemoryTable.Entity.rows let module(EntityMod) = entityMod let (entitiesToSet, idsToDelete, entityHistoryItemsToSet) = rows->Belt.Array.reduce( ([], [], []), ((entitiesToSet, idsToDelete, entityHistoryItemsToSet), row) => { switch row { | Updated({latest, history}) => - let entityHistoryItems = - history - ->Belt.Array.concat([latest]) - ->getEntityHistoryItems + let entityHistoryItems = history->getEntityHistoryItems switch latest.entityUpdateAction { | Set(entity) => ( @@ -86,6 +84,7 @@ let executeSetEntityWithHistory = ( EntityMod.entityHistory->EntityHistory.batchInsertRows( ~sql, ~rows=Belt.Array.concatMany(entityHistoryItemsToSet), + ~shouldCopyCurrentEntity=!(inMemoryStore->InMemoryStore.isRollingBack), ), if entitiesToSet->Array.length > 0 { sql->DbFunctionsEntities.batchSet(~entityMod)(entitiesToSet) @@ -105,9 +104,14 @@ let executeSetEntityWithHistory = ( let executeDbFunctionsEntity = ( type entity, sql: Postgres.sql, - ~rows: array>, + ~inMemoryStore: InMemoryStore.t, ~entityMod: module(Entities.Entity with type t = entity), ): promise => { + let rows = + inMemoryStore.entities + ->InMemoryStore.EntityTables.get(entityMod) + ->InMemoryTable.Entity.rows + let (entitiesToSet, idsToDelete) = rows->Belt.Array.reduce(([], []), ( (accumulatedSets, accumulatedDeletes), row, @@ -139,9 +143,9 @@ let executeDbFunctionsEntity = ( promises->Promise.all->Promise.thenResolve(_ => ()) } -let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold) => { +let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold, ~config) => { let entityDbExecutionComposer = - RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold) + config->Config.shouldSaveHistory(~isInReorgThreshold) ? executeSetEntityWithHistory : executeDbFunctionsEntity @@ -167,7 +171,7 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh let set{{entity.name.capitalized}}s = entityDbExecutionComposer( _, ~entityMod=module(Entities.{{entity.name.capitalized}}), - ~rows=inMemoryStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.rows, + ~inMemoryStore, ) {{/each}} @@ -177,7 +181,9 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh let rollbackTables = switch inMemoryStore.rollBackEventIdentifier { | Some(eventIdentifier) => [ - DbFunctions.EntityHistory.deleteAllEntityHistoryAfterEventIdentifier, + DbFunctions.EntityHistory.deleteAllEntityHistoryAfterEventIdentifier( + ~isUnorderedMultichainMode=config.isUnorderedMultichainMode, + ), DbFunctions.RawEvents.deleteAllRawEventsAfterEventIdentifier, DbFunctions.DynamicContractRegistry.deleteAllDynamicContractRegistrationsAfterEventIdentifier, ]->Belt.Array.map(fn => fn(_, ~eventIdentifier)) @@ -204,19 +210,13 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh module RollBack = { exception DecodeError(S.error) - let rollBack = async (~chainId, ~blockTimestamp, ~blockNumber, ~logIndex) => { - let reorgData = switch await DbFunctions.sql->DbFunctions.EntityHistory.getRollbackDiff( - ~chainId, - ~blockTimestamp, - ~blockNumber, - ) { - | Ok(v) => v - | Error(exn) => - exn - ->DecodeError - ->ErrorHandling.mkLogAndRaise(~msg="Failed to get rollback diff from entity history") - } - + let rollBack = async ( + ~chainId, + ~blockTimestamp, + ~blockNumber, + ~logIndex, + ~isUnorderedMultichainMode, + ) => { let rollBackEventIdentifier: Types.eventIdentifier = { chainId, blockTimestamp, @@ -224,32 +224,56 @@ module RollBack = { logIndex, } - let inMemStore = InMemoryStore.makeWithRollBackEventIdentifier(Some(rollBackEventIdentifier)) - - //Don't save the rollback diffs to history table - let shouldSaveHistory = false + let inMemStore = InMemoryStore.make(~rollBackEventIdentifier) - reorgData->Belt.Array.forEach(e => { - switch e { - //Where previousEntity is Some, - //set the value with the eventIdentifier that set that value initially - {{#each entities as | entity |}} - | {previousEntity: Some({entity: {{entity.name.capitalized}}(entity), eventIdentifier}), entityId} => - inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set( - Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory), - ~shouldSaveHistory, - ) - {{/each}} - //Where previousEntity is None, - //delete it with the eventIdentifier of the rollback event - {{#each entities as | entity |}} - | {previousEntity: None, entityType: {{entity.name.capitalized}}, entityId} => - inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set( - Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory), - ~shouldSaveHistory, + await Utils.Array.awaitEach(Entities.allEntities, async entityMod => { + let module(Entity) = entityMod + let entityMod = + entityMod->( + Utils.magic: module(Entities.InternalEntity) => module(Entities.Entity with + type t = 'entity + ) ) - {{/each}} - } + + let diff = await DbFunctions.sql->DbFunctions.EntityHistory.getRollbackDiff( + isUnorderedMultichainMode + ? UnorderedMultichain({ + reorgChainId: chainId, + safeBlockNumber: blockNumber, + }) + : OrderedMultichain({ + safeBlockTimestamp: blockTimestamp, + reorgChainId: chainId, + safeBlockNumber: blockNumber, + }), + ~entityMod, + ) + + let entityTable = inMemStore.entities->InMemoryStore.EntityTables.get(entityMod) + + diff->Belt.Array.forEach(historyRow => { + let eventIdentifier: Types.eventIdentifier = { + chainId: historyRow.current.chain_id, + blockNumber: historyRow.current.block_number, + logIndex: historyRow.current.log_index, + blockTimestamp: historyRow.current.block_timestamp, + } + switch historyRow.entityData { + | Set(entity) => + entityTable->InMemoryTable.Entity.set( + Set(entity)->Types.mkEntityUpdate( + ~eventIdentifier, + ~entityId=entity->Entities.getEntityId, + ), + ~shouldSaveHistory=false, + ) + | Delete({id}) => + entityTable->InMemoryTable.Entity.set( + Delete->Types.mkEntityUpdate(~eventIdentifier, ~entityId=id), + ~shouldSaveHistory=false, + ) + } + }) }) inMemStore diff --git a/codegenerator/cli/templates/dynamic/codegen/src/InMemoryStore.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/InMemoryStore.res.hbs deleted file mode 100644 index 83eb04050..000000000 --- a/codegenerator/cli/templates/dynamic/codegen/src/InMemoryStore.res.hbs +++ /dev/null @@ -1,64 +0,0 @@ - -@genType -type rawEventsKey = { - chainId: int, - eventId: string, -} - -let hashRawEventsKey = (key: rawEventsKey) => - EventUtils.getEventIdKeyString(~chainId=key.chainId, ~eventId=key.eventId) - -@genType -type dynamicContractRegistryKey = { - chainId: int, - contractAddress: Address.t, -} - -let hashDynamicContractRegistryKey = ({chainId, contractAddress}) => - EventUtils.getContractAddressKeyString(~chainId, ~contractAddress) - -type t = { - eventSyncState: InMemoryTable.t, - rawEvents: InMemoryTable.t, - dynamicContractRegistry: InMemoryTable.t< - dynamicContractRegistryKey, - TablesStatic.DynamicContractRegistry.t, - >, - {{#each entities as |entity|}} - @as("{{entity.name.original}}") {{!-- The @as is Needed for the hacky getInMemTable implementation. TODO: Remove after migrating to static codegen --}} - {{entity.name.uncapitalized}}: InMemoryTable.Entity.t, - {{/each}} - rollBackEventIdentifier: option, -} - -let makeWithRollBackEventIdentifier = (rollBackEventIdentifier): t => { - eventSyncState: InMemoryTable.make(~hash=v => v->Belt.Int.toString), - rawEvents: InMemoryTable.make(~hash=hashRawEventsKey), - dynamicContractRegistry: InMemoryTable.make(~hash=hashDynamicContractRegistryKey), - {{#each entities as |entity|}} - {{entity.name.uncapitalized}}: InMemoryTable.Entity.make(), - {{/each}} - rollBackEventIdentifier, -} - -let make = () => makeWithRollBackEventIdentifier(None) - -let clone = (self: t) => { - eventSyncState: self.eventSyncState->InMemoryTable.clone, - rawEvents: self.rawEvents->InMemoryTable.clone, - dynamicContractRegistry: self.dynamicContractRegistry->InMemoryTable.clone, - {{#each entities as |entity|}} - {{entity.name.uncapitalized}}: self.{{entity.name.uncapitalized}}->InMemoryTable.Entity.clone, - {{/each}} - rollBackEventIdentifier: self.rollBackEventIdentifier->InMemoryTable.structuredClone, -} - - -let getInMemTable = ( - type entity, - inMemoryStore: t, - ~entityMod: module(Entities.Entity with type t = entity), -): InMemoryTable.Entity.t => { - let module(Entity) = entityMod->Entities.entityModToInternal - inMemoryStore->Utils.magic->Js.Dict.unsafeGet(Entity.key) -} diff --git a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs index 2094af1f2..b0e9ab2a9 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs @@ -237,7 +237,7 @@ let rec makeWithInMemoryStore: InMemoryStore.t => t = (inMemoryStore: InMemorySt makeStoreOperatorEntity( ~inMemoryStore, ~makeMockDb=makeWithInMemoryStore, - ~getStore=db => db.{{entity.name.uncapitalized}}, + ~getStore=db => db.entities->InMemoryStore.EntityTables.get(module(Entities.{{entity.name.capitalized}})), ~getKey=({id}) => id, ) }, @@ -297,11 +297,15 @@ A function composer for simulating the writing of an inMemoryStore to the extern Runs all set and delete operations currently cached in an inMemory store against the mockDb */ let executeRowsEntity = ( + type entity, mockDb: t, ~inMemoryStore: InMemoryStore.t, - ~getInMemTable: InMemoryStore.t => InMemoryTable.Entity.t<'entity>, - ~getKey: 'entity => 'key, + ~entityMod: module(Entities.Entity with type t = entity), + ~getKey: entity => 'key, ) => { + let getInMemTable = (inMemoryStore: InMemoryStore.t) => + inMemoryStore.entities->InMemoryStore.EntityTables.get(entityMod) + let inMemTable = getInMemTable(inMemoryStore) inMemTable.table @@ -377,7 +381,7 @@ let writeFromMemoryStore = (mockDb: t, ~inMemoryStore: InMemoryStore.t) => { {{#each entities as | entity |}} mockDb->executeRowsEntity( ~inMemoryStore, - ~getInMemTable=self => {self.{{entity.name.uncapitalized}}}, + ~entityMod=module(Entities.{{entity.name.capitalized}}), ~getKey=entity => entity.id, ) {{/each}} diff --git a/codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs index eb51a40de..69b3fb5a0 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs @@ -61,14 +61,12 @@ type entityUpdateAction<'entityType> = type entityUpdate<'entityType> = { eventIdentifier: eventIdentifier, - shouldSaveHistory: bool, entityId: id, entityUpdateAction: entityUpdateAction<'entityType>, } -let mkEntityUpdate = (~shouldSaveHistory=true, ~eventIdentifier, ~entityId, entityUpdateAction) => { +let mkEntityUpdate = (~eventIdentifier, ~entityId, entityUpdateAction) => { entityId, - shouldSaveHistory, eventIdentifier, entityUpdateAction, } @@ -77,17 +75,11 @@ type entityValueAtStartOfBatch<'entityType> = | NotSet // The entity isn't in the DB yet | AlreadySet('entityType) -type existingValueInDb<'entityType> = - | Retrieved(entityValueAtStartOfBatch<'entityType>) - // NOTE: We use an postgres function solve the issue of this entities previous value not being known. - | Unknown - type updatedValue<'entityType> = { - // Initial value within a batch - initial: existingValueInDb<'entityType>, latest: entityUpdate<'entityType>, history: array>, } + @genType type inMemoryStoreRowEntity<'entityType> = | Updated(updatedValue<'entityType>) diff --git a/codegenerator/cli/templates/dynamic/codegen/src/db/Entities.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/db/Entities.res.hbs index ddb88d79e..5b159b3f0 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/db/Entities.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/db/Entities.res.hbs @@ -107,17 +107,25 @@ let getEntityParamsDecoder = (entityName: Enums.EntityType.t) => {{/each}} } -let allTables: array = [ -{{#each entities as |entity|}} - {{entity.name.capitalized}}.table, -{{/each}} -] +let allEntities: array = + [ -let allEntityHistory: array> = [ {{#each entities as |entity|}} - {{entity.name.capitalized}}.entityHistory->EntityHistory.castInternal, + module({{entity.name.capitalized}}), {{/each}} -] + ]->(Utils.magic: array => array) + +let allTables: array
= allEntities->Belt.Array.map(entityMod => { + let module(Entity) = entityMod + Entity.table +}) + +let allEntityHistory: array< + EntityHistory.t, +> = allEntities->Belt.Array.map(entityMod => { + let module(Entity) = entityMod + Entity.entityHistory->EntityHistory.castInternal +}) let schema = Schema.make(allTables) diff --git a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res index c8b7574b5..4a08b36ac 100644 --- a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res +++ b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res @@ -505,6 +505,7 @@ let getDynamicContractRegistrations = ( ~eventBatch: array, ~latestProcessedBlocks: EventsProcessed.t, ~checkContractIsRegistered, + ~config, ) => { let logger = Logging.createChild( ~params={ @@ -530,7 +531,11 @@ let getDynamicContractRegistrations = ( ->propogate //We only preregister below the reorg threshold so it can be hardcoded as false - switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold=false) { + switch await DbFunctions.sql->IO.executeBatch( + ~inMemoryStore, + ~isInReorgThreshold=false, + ~config, + ) { | exception exn => exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate | () => () @@ -597,7 +602,7 @@ let processEventBatch = ( let elapsedTimeAfterProcess = timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis - switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold) { + switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold, ~config) { | exception exn => exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate | () => () diff --git a/codegenerator/cli/templates/static/codegen/src/InMemoryStore.res b/codegenerator/cli/templates/static/codegen/src/InMemoryStore.res new file mode 100644 index 000000000..77fb3464d --- /dev/null +++ b/codegenerator/cli/templates/static/codegen/src/InMemoryStore.res @@ -0,0 +1,94 @@ +@genType +type rawEventsKey = { + chainId: int, + eventId: string, +} + +let hashRawEventsKey = (key: rawEventsKey) => + EventUtils.getEventIdKeyString(~chainId=key.chainId, ~eventId=key.eventId) + +@genType +type dynamicContractRegistryKey = { + chainId: int, + contractAddress: Address.t, +} + +let hashDynamicContractRegistryKey = ({chainId, contractAddress}) => + EventUtils.getContractAddressKeyString(~chainId, ~contractAddress) + +module EntityTables = { + type t = dict> + exception UndefinedEntity(string) + let make = (entities: array): t => { + let init = Js.Dict.empty() + entities->Belt.Array.forEach(entity => { + let module(Entity) = entity + init->Js.Dict.set(Entity.key, InMemoryTable.Entity.make()) + }) + init + } + + let get = (type entity, self: t, entityMod: module(Entities.Entity with type t = entity)) => { + let module(Entity) = entityMod + switch self->Utils.Dict.dangerouslyGetNonOption(Entity.key) { + | Some(table) => + table->( + Utils.magic: InMemoryTable.Entity.t => InMemoryTable.Entity.t< + entity, + > + ) + + | None => + UndefinedEntity(Entity.key)->ErrorHandling.mkLogAndRaise( + ~msg="Unexpected, entity InMemoryTable is undefined", + ) + } + } + + let clone = (self: t) => { + self + ->Js.Dict.entries + ->Belt.Array.map(((k, v)) => (k, v->InMemoryTable.Entity.clone)) + ->Js.Dict.fromArray + } +} + +type t = { + eventSyncState: InMemoryTable.t, + rawEvents: InMemoryTable.t, + dynamicContractRegistry: InMemoryTable.t< + dynamicContractRegistryKey, + TablesStatic.DynamicContractRegistry.t, + >, + entities: Js.Dict.t>, + rollBackEventIdentifier: option, +} + +let make = ( + ~entities: array=Entities.allEntities, + ~rollBackEventIdentifier=?, +): t => { + eventSyncState: InMemoryTable.make(~hash=v => v->Belt.Int.toString), + rawEvents: InMemoryTable.make(~hash=hashRawEventsKey), + dynamicContractRegistry: InMemoryTable.make(~hash=hashDynamicContractRegistryKey), + entities: EntityTables.make(entities), + rollBackEventIdentifier, +} + +let clone = (self: t) => { + eventSyncState: self.eventSyncState->InMemoryTable.clone, + rawEvents: self.rawEvents->InMemoryTable.clone, + dynamicContractRegistry: self.dynamicContractRegistry->InMemoryTable.clone, + entities: self.entities->EntityTables.clone, + rollBackEventIdentifier: self.rollBackEventIdentifier->InMemoryTable.structuredClone, +} + +let getInMemTable = ( + type entity, + inMemoryStore: t, + ~entityMod: module(Entities.Entity with type t = entity), +): InMemoryTable.Entity.t => { + inMemoryStore.entities->EntityTables.get(entityMod) +} + +let isRollingBack = (inMemoryStore: t) => inMemoryStore.rollBackEventIdentifier->Belt.Option.isSome diff --git a/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res b/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res index 54aa1f22a..b3a9488f7 100644 --- a/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res +++ b/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res @@ -143,40 +143,40 @@ module Entity = { entityUpdate: Types.entityUpdate<'entity>, ~shouldSaveHistory, ) => { + //New entity row with only the latest update + let newEntityRow = Types.Updated({ + latest: entityUpdate, + history: shouldSaveHistory ? [entityUpdate] : [], + }) + let {entityRow, entityIndices} = switch inMemTable.table->get(entityUpdate.entityId) { - | Some({entityRow: InitialReadFromDb(entity_read), entityIndices}) => - let entityRow = Types.Updated({ - initial: Retrieved(entity_read), - latest: entityUpdate, - history: [], - }) - {entityRow, entityIndices} + | None => {entityRow: newEntityRow, entityIndices: Utils.Set.make()} + | Some({entityRow: InitialReadFromDb(_), entityIndices}) => { + entityRow: newEntityRow, + entityIndices, + } | Some({entityRow: Updated(previous_values), entityIndices}) - if !shouldSaveHistory || - //Rollback initial state cases should not save history - !previous_values.latest.shouldSaveHistory || // This prevents two db actions in the same event on the same entity from being recorded to the history table. - previous_values.latest.eventIdentifier == entityUpdate.eventIdentifier => + if previous_values.latest.eventIdentifier == entityUpdate.eventIdentifier && + shouldSaveHistory => let entityRow = Types.Updated({ - ...previous_values, latest: entityUpdate, + history: previous_values.history->Utils.Array.setIndexImmutable( + previous_values.history->Array.length - 1, + entityUpdate, + ), }) {entityRow, entityIndices} | Some({entityRow: Updated(previous_values), entityIndices}) => let entityRow = Types.Updated({ - initial: previous_values.initial, latest: entityUpdate, - history: previous_values.history->Array.concat([previous_values.latest]), + history: shouldSaveHistory + ? [...previous_values.history, entityUpdate] + : previous_values.history, }) {entityRow, entityIndices} - | None => - let entityRow = Types.Updated({ - initial: Unknown, - latest: entityUpdate, - history: [], - }) - {entityRow, entityIndices: Utils.Set.make()} } + switch entityUpdate.entityUpdateAction { | Set(entity) => inMemTable->updateIndices(~entity, ~entityIndices) | Delete => inMemTable->deleteEntityFromIndices(~entityId=entityUpdate.entityId, ~entityIndices) diff --git a/codegenerator/cli/templates/static/codegen/src/ReorgDetection.res b/codegenerator/cli/templates/static/codegen/src/ReorgDetection.res index b2dfd4bcd..30a067f08 100644 --- a/codegenerator/cli/templates/static/codegen/src/ReorgDetection.res +++ b/codegenerator/cli/templates/static/codegen/src/ReorgDetection.res @@ -73,9 +73,9 @@ module LastBlockScannedHashes: { /** Return a BlockNumbersAndHashes.t rolled back to where blockData is less - than or equal to the provided blockTimestamp + than the provided blockNumber */ - let rollBackToBlockTimestampLte: (~blockTimestamp: int, t) => t + let rollBackToBlockNumberLt: (~blockNumber: int, t) => t } = { type t = { // Number of blocks behind head, we want to keep track @@ -330,39 +330,40 @@ module LastBlockScannedHashes: { ->Belt.Result.map(list => list->makeWithDataInternal(~confirmedBlockThreshold)) } - let rec rollBackToBlockTimestampLteInternal = ( - ~blockTimestamp: int, + let min = (arrInt: array) => { + arrInt->Belt.Array.reduce(None, (current, val) => { + switch current { + | None => Some(val) + | Some(current) => Js.Math.min_int(current, val)->Some + } + }) + } + + let rec rollBackToBlockNumberLtInternal = ( + ~blockNumber: int, latestBlockScannedData: list, ) => { switch latestBlockScannedData { | list{} => list{} | list{head, ...tail} => - if head.blockTimestamp <= blockTimestamp { + if head.blockNumber < blockNumber { latestBlockScannedData } else { - tail->rollBackToBlockTimestampLteInternal(~blockTimestamp) + tail->rollBackToBlockNumberLtInternal(~blockNumber) } } } /** Return a BlockNumbersAndHashes.t rolled back to where blockData is less - than or equal to the provided blockTimestamp + than the provided blockNumber */ - let rollBackToBlockTimestampLte = (~blockTimestamp: int, self: t) => { + let rollBackToBlockNumberLt = (~blockNumber: int, self: t) => { let {confirmedBlockThreshold, lastBlockScannedDataList} = self lastBlockScannedDataList - ->rollBackToBlockTimestampLteInternal(~blockTimestamp) + ->rollBackToBlockNumberLtInternal(~blockNumber) ->makeWithDataInternal(~confirmedBlockThreshold) } - let min = (arrInt: array) => { - arrInt->Belt.Array.reduce(None, (current, val) => { - switch current { - | None => Some(val) - | Some(current) => Js.Math.min_int(current, val)->Some - } - }) - } type currentHeightAndLastBlockHashes = { currentHeight: int, diff --git a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res index b04b20cf6..4afcb405f 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res +++ b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res @@ -241,34 +241,6 @@ module DynamicContractRegistry = { } } -type entityHistoryItem = { - block_timestamp: int, - chain_id: int, - block_number: int, - log_index: int, - previous_block_timestamp: option, - previous_chain_id: option, - previous_block_number: option, - previous_log_index: option, - params: option, - entity_type: string, - entity_id: string, -} - -let entityHistoryItemSchema = S.object(s => { - block_timestamp: s.field("block_timestamp", S.int), - chain_id: s.field("chain_id", S.int), - block_number: s.field("block_number", S.int), - log_index: s.field("log_index", S.int), - previous_block_timestamp: s.field("previous_block_timestamp", S.null(S.int)), - previous_chain_id: s.field("previous_chain_id", S.null(S.int)), - previous_block_number: s.field("previous_block_number", S.null(S.int)), - previous_log_index: s.field("previous_log_index", S.null(S.int)), - params: s.field("params", S.null(S.json(~validate=false))), - entity_type: s.field("entity_type", S.string), - entity_id: s.field("entity_id", S.string), -}) - module EntityHistory = { //Given chainId, blockTimestamp, blockNumber //Delete all rows where chain_id = chainId and block_timestamp < blockTimestamp and block_number < blockNumber @@ -287,101 +259,239 @@ module EntityHistory = { ~entityHistoriesToSet: array, ) => promise = "batchInsertEntityHistory" - let batchSet = (sql, ~entityHistoriesToSet) => { - //Encode null for for the with prev types so that it's not undefined - batchSetInternal( - sql, - ~entityHistoriesToSet=entityHistoriesToSet->Belt.Array.map(v => - v->S.serializeOrRaiseWith(entityHistoryItemSchema) - ), - ) + type dynamicSqlQuery + module UnorderedMultichain = { + @module("./DbFunctionsImplementation.js") + external getFirstChangeSerial: ( + Postgres.sql, + ~reorgChainId: int, + ~safeBlockNumber: int, + ~entityName: Enums.EntityType.t, + ) => dynamicSqlQuery = "getFirstChangeSerial_UnorderedMultichain" + } + + module OrderedMultichain = { + @module("./DbFunctionsImplementation.js") + external getFirstChangeSerial: ( + Postgres.sql, + ~safeBlockTimestamp: int, + ~reorgChainId: int, + ~safeBlockNumber: int, + ~entityName: Enums.EntityType.t, + ) => dynamicSqlQuery = "getFirstChangeSerial_OrderedMultichain" } @module("./DbFunctionsImplementation.js") - external deleteAllEntityHistoryAfterEventIdentifier: ( + external getFirstChangeEntityHistoryPerChain: ( Postgres.sql, - ~eventIdentifier: Types.eventIdentifier, - ) => promise = "deleteAllEntityHistoryAfterEventIdentifier" - - type rollbackDiffResponseRaw = { - entity_type: Enums.EntityType.t, - entity_id: string, - chain_id: option, - block_timestamp: option, - block_number: option, - log_index: option, - val: option, - } + ~entityName: Enums.EntityType.t, + ~getFirstChangeSerial: Postgres.sql => dynamicSqlQuery, + ) => promise = "getFirstChangeEntityHistoryPerChain" - let rollbackDiffResponseRawSchema = S.object(s => { - entity_type: s.field("entity_type", Enums.EntityType.enum.schema), - entity_id: s.field("entity_id", S.string), - chain_id: s.field("chain_id", S.null(S.int)), - block_timestamp: s.field("block_timestamp", S.null(S.int)), - block_number: s.field("block_number", S.null(S.int)), - log_index: s.field("log_index", S.null(S.int)), - val: s.field("val", S.null(S.json(~validate=false))), - }) - - type previousEntity = { - eventIdentifier: Types.eventIdentifier, - entity: Entities.entity, - } + @module("./DbFunctionsImplementation.js") + external getRollbackDiffInternal: ( + Postgres.sql, + ~entityName: Enums.EntityType.t, + ~getFirstChangeSerial: Postgres.sql => dynamicSqlQuery, + ) => //Returns an array of entity history rows + promise = "getRollbackDiff" - type rollbackDiffResponse = { - entityType: Enums.EntityType.t, - entityId: string, - previousEntity: option, + @module("./DbFunctionsImplementation.js") + external deleteRolledBackEntityHistory: ( + Postgres.sql, + ~entityName: Enums.EntityType.t, + ~getFirstChangeSerial: Postgres.sql => dynamicSqlQuery, + ) => promise = "deleteRolledBackEntityHistory" + + module Args = { + type t = + | OrderedMultichain({safeBlockTimestamp: int, reorgChainId: int, safeBlockNumber: int}) + | UnorderedMultichain({reorgChainId: int, safeBlockNumber: int}) + + /** + Uses two different methods for determining the first change event after rollback block + + This is needed since unordered multichain mode only cares about any changes that + occurred after the first change on the reorg chain. To prevent skipping or double processing events + on the other chains. If for instance there are no entity changes based on the reorg chain, the other + chains do not need to be rolled back, and if the reorg chain has new included events, it does not matter + that if those events are processed out of order from other chains since this is "unordered_multichain_mode" + + Ordered multichain mode needs to ensure that all chains rollback to any event that occurred after the reorg chain + block number. Regardless of whether the reorg chain incurred any changes or not to entities. + */ + let makeGetFirstChangeSerial = (self: t, ~entityName) => + switch self { + | OrderedMultichain({safeBlockTimestamp, reorgChainId, safeBlockNumber}) => + sql => + OrderedMultichain.getFirstChangeSerial( + sql, + ~safeBlockTimestamp, + ~reorgChainId, + ~safeBlockNumber, + ~entityName, + ) + | UnorderedMultichain({reorgChainId, safeBlockNumber}) => + sql => + UnorderedMultichain.getFirstChangeSerial( + sql, + ~reorgChainId, + ~safeBlockNumber, + ~entityName, + ) + } + + let getLogger = (self: t, ~entityName) => { + switch self { + | OrderedMultichain({safeBlockTimestamp, reorgChainId, safeBlockNumber}) => + Logging.createChild( + ~params={ + "type": "OrderedMultichain", + "safeBlockTimestamp": safeBlockTimestamp, + "reorgChainId": reorgChainId, + "safeBlockNumber": safeBlockNumber, + "entityName": entityName, + }, + ) + | UnorderedMultichain({reorgChainId, safeBlockNumber}) => + Logging.createChild( + ~params={ + "type": "UnorderedMultichain", + "reorgChainId": reorgChainId, + "safeBlockNumber": safeBlockNumber, + "entityName": entityName, + }, + ) + } + } } - let rollbackDiffResponse_decode = (json: Js.Json.t) => { - json - ->S.parseWith(rollbackDiffResponseRawSchema) - ->Belt.Result.flatMap(raw => { - switch raw { - | { - val: Some(val), - chain_id: Some(chainId), - block_number: Some(blockNumber), - block_timestamp: Some(blockTimestamp), - log_index: Some(logIndex), - entity_type, - } => - Entities.getEntityParamsDecoder(entity_type)(val)->Belt.Result.map(entity => { - let eventIdentifier: Types.eventIdentifier = { - chainId, - blockTimestamp, - blockNumber, - logIndex, - } - - Some({entity, eventIdentifier}) + let deleteAllEntityHistoryAfterEventIdentifier = (~isUnorderedMultichainMode) => ( + sql, + ~eventIdentifier: Types.eventIdentifier, + ): promise => { + let {chainId, blockNumber, blockTimestamp} = eventIdentifier + let args: Args.t = isUnorderedMultichainMode + ? UnorderedMultichain({reorgChainId: chainId, safeBlockNumber: blockNumber}) + : OrderedMultichain({ + reorgChainId: chainId, + safeBlockNumber: blockNumber, + safeBlockTimestamp: blockTimestamp, }) - | _ => Ok(None) - }->Belt.Result.map(previousEntity => { - entityType: raw.entity_type, - entityId: raw.entity_id, - previousEntity, - }) + + Entities.allEntities + ->Belt.Array.map(async entityMod => { + let module(Entity) = entityMod + try await deleteRolledBackEntityHistory( + sql, + ~entityName=Entity.name, + ~getFirstChangeSerial=args->Args.makeGetFirstChangeSerial(~entityName=Entity.name), + ) catch { + | exn => + exn->ErrorHandling.mkLogAndRaise( + ~msg=`Failed to delete rolled back entity history`, + ~logger=args->Args.getLogger(~entityName=Entity.name), + ) + } }) + ->Promise.all + ->Promise.thenResolve(_ => ()) + } + + let getRollbackDiff = async ( + type entity, + sql, + args: Args.t, + ~entityMod: module(Entities.Entity with type t = entity), + ) => { + let module(Entity) = entityMod + + let diffRes = switch await getRollbackDiffInternal( + sql, + ~getFirstChangeSerial=args->Args.makeGetFirstChangeSerial(~entityName=Entity.name), + ~entityName=Entity.name, + ) { + | exception exn => + exn->ErrorHandling.mkLogAndRaise( + ~msg="Failed to get rollback diff from entity history", + ~logger=args->Args.getLogger(~entityName=Entity.name), + ) + | res => res + } + + switch diffRes->S.parseAnyOrRaiseWith(Entity.entityHistory.schemaRows) { + | exception exn => + exn->ErrorHandling.mkLogAndRaise( + ~msg="Failed to parse rollback diff from entity history", + ~logger=args->Args.getLogger(~entityName=Entity.name), + ) + | diffRows => diffRows + } } - let rollbackDiffResponseArr_decode = (jsonArr: array) => { - jsonArr->Belt.Array.map(rollbackDiffResponse_decode)->Utils.Array.transposeResults + module FirstChangeEventPerChain = { + type t = Js.Dict.t + let getKey = chainId => chainId->Belt.Int.toString + let make = () => Js.Dict.empty() + let get = (self: t, ~chainId) => self->Utils.Dict.dangerouslyGetNonOption(getKey(chainId)) + + let setIfEarlier = (self: t, ~chainId, ~event: FetchState.blockNumberAndLogIndex) => { + let chainKey = chainId->Belt.Int.toString + switch self->Utils.Dict.dangerouslyGetNonOption(chainKey) { + | Some(existingEvent) => + if ( + (event.blockNumber, event.logIndex) < (existingEvent.blockNumber, existingEvent.logIndex) + ) { + self->Js.Dict.set(chainKey, event) + } + | None => self->Js.Dict.set(chainKey, event) + } + } } - @module("./DbFunctionsImplementation.js") - external getRollbackDiffInternal: ( - Postgres.sql, - ~blockTimestamp: int, - ~chainId: int, - ~blockNumber: int, - ) => promise> = "getRollbackDiff" + let getFirstChangeEventPerChain = async (sql, args: Args.t) => { + let firstChangeEventPerChain = FirstChangeEventPerChain.make() + + let _ = + await Entities.allEntities + ->Belt.Array.map(async entityMod => { + let module(Entity) = entityMod + let res = try await getFirstChangeEntityHistoryPerChain( + sql, + ~entityName=Entity.name, + ~getFirstChangeSerial=args->Args.makeGetFirstChangeSerial(~entityName=Entity.name), + ) catch { + | exn => + exn->ErrorHandling.mkLogAndRaise( + ~msg=`Failed to get first change entity history per chain for entity`, + ~logger=args->Args.getLogger(~entityName=Entity.name), + ) + } + + let chainHistoryRows = try res->S.parseAnyOrRaiseWith( + Entity.entityHistory.schemaRows, + ) catch { + | exn => + exn->ErrorHandling.mkLogAndRaise( + ~msg=`Failed to parse entity history rows from db on getFirstChangeEntityHistoryPerChain`, + ~logger=args->Args.getLogger(~entityName=Entity.name), + ) + } + + chainHistoryRows->Belt.Array.forEach(chainHistoryRow => { + firstChangeEventPerChain->FirstChangeEventPerChain.setIfEarlier( + ~chainId=chainHistoryRow.current.chain_id, + ~event={ + blockNumber: chainHistoryRow.current.block_number, + logIndex: chainHistoryRow.current.log_index, + }, + ) + }) + }) + ->Promise.all - let getRollbackDiff = (sql, ~blockTimestamp: int, ~chainId: int, ~blockNumber: int) => - getRollbackDiffInternal(sql, ~blockTimestamp, ~chainId, ~blockNumber)->Promise.thenResolve( - rollbackDiffResponseArr_decode, - ) + firstChangeEventPerChain + } let copyTableToEntityHistory = (sql, ~sourceTableName: Enums.EntityType.t): promise => { sql->Postgres.unsafe(`SELECT copy_table_to_entity_history('${(sourceTableName :> string)}');`) diff --git a/codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js b/codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js index 5aa3de5f7..74eeba35c 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js +++ b/codegenerator/cli/templates/static/codegen/src/db/DbFunctionsImplementation.js @@ -16,8 +16,9 @@ const chunkBatchQuery = async (sql, entityDataArray, queryToExecute) => { const commaSeparateDynamicMapQuery = (sql, dynQueryConstructors) => sql`${dynQueryConstructors.map( (constrQuery, i) => - sql`${constrQuery(sql)}${i === dynQueryConstructors.length - 1 ? sql`` : sql`, ` - }`, + sql`${constrQuery(sql)}${ + i === dynQueryConstructors.length - 1 ? sql`` : sql`, ` + }`, )}`; const batchSetItemsInTableCore = (table, sql, rowDataArray) => { @@ -145,7 +146,7 @@ module.exports.batchSetChainMetadata = (sql, entityDataArray) => { "latest_fetched_block_number" = EXCLUDED."latest_fetched_block_number", "timestamp_caught_up_to_head_or_endblock" = EXCLUDED."timestamp_caught_up_to_head_or_endblock", "block_height" = EXCLUDED."block_height";` - .then((res) => { }) + .then((res) => {}) .catch((err) => { console.log("errored", err); }); @@ -165,7 +166,7 @@ module.exports.setChainMetadataBlockHeight = (sql, entityDataArray) => { SET "chain_id" = EXCLUDED."chain_id", "block_height" = EXCLUDED."block_height";` - .then((res) => { }) + .then((res) => {}) .catch((err) => { console.log("errored", err); }); @@ -436,8 +437,8 @@ module.exports.readDynamicContractsOnChainIdMatchingEvents = ( FROM "public"."dynamic_contract_registry" WHERE chain_id = ${chainId} AND (registering_event_contract_name, registering_event_name, registering_event_src_address) IN ${sql( - preRegisterEvents.map((item) => sql(item)), - )}; + preRegisterEvents.map((item) => sql(item)), + )}; `; }; @@ -475,49 +476,145 @@ module.exports.batchSetDynamicContractRegistry = (sql, entityDataArray) => { ); }; -// end db operations for dynamic_contract_registry +/** + Find the "first change" serial originating from the reorg chain above the safe block number + (Using serial to account for unordered multi chain reorgs, where an earier event on another chain could be rolled back) +*/ +module.exports.getFirstChangeSerial_UnorderedMultichain = ( + sql, + reorgChainId, + safeBlockNumber, + entityName, +) => + sql` + SELECT + MIN(serial) AS first_change_serial + FROM + public.${sql(entityName + "_history")} + WHERE + entity_history_chain_id = ${reorgChainId} + AND entity_history_block_number > ${safeBlockNumber} + `; -module.exports.getRollbackDiff = ( +/** + Find the "first change" serial originating from any chain above the provided safe block +*/ +module.exports.getFirstChangeSerial_OrderedMultichain = ( sql, - blockTimestamp, - chainId, - blockNumber, + safeBlockTimestamp, + reorgChainId, + safeBlockNumber, + entityName, +) => + sql` + SELECT + MIN(serial) AS first_change_serial + FROM + public.${sql(entityName + "_history")} + WHERE + entity_history_block_timestamp > ${safeBlockTimestamp} + OR + (entity_history_block_timestamp = ${safeBlockTimestamp} AND entity_history_chain_id > ${reorgChainId}) + OR + (entity_history_block_timestamp = ${safeBlockTimestamp} AND entity_history_chain_id = ${reorgChainId} AND entity_history_block_number > ${safeBlockNumber}) + `; + +module.exports.getFirstChangeEntityHistoryPerChain = ( + sql, + entityName, + getFirstChangeSerial, ) => sql` -SELECT DISTINCT - ON ( - COALESCE(old.entity_id, new.entity_id) - ) COALESCE(old.entity_id, new.entity_id) AS entity_id, - COALESCE(old.params, NULL) AS val, - COALESCE(old.block_timestamp, NULL) AS block_timestamp, - COALESCE(old.chain_id, NULL) AS chain_id, - COALESCE(old.block_number, NULL) AS block_number, - COALESCE(old.log_index, NULL) AS log_index, - COALESCE(old.entity_type, new.entity_type) AS entity_type - -FROM entity_history old -INNER JOIN - entity_history next - -- next should simply be a higher block multichain - ON ( - next.block_timestamp > ${blockTimestamp} - OR (next.block_timestamp = ${blockTimestamp} AND next.chain_id > ${chainId}) - OR ( - next.block_timestamp = ${blockTimestamp} AND next.chain_id = ${chainId} AND next.block_number >= ${blockNumber} - ) + WITH + first_change AS ( + -- Step 1: Find the "first change" serial originating from the reorg chain above the safe block number + -- (Using serial to account for unordered multi chain reorgs, where an earier event on another chain could be rolled back) + ${getFirstChangeSerial(sql)} ) - -- old should be a lower block multichain - AND ( - old.block_timestamp < ${blockTimestamp} - OR (old.block_timestamp = ${blockTimestamp} AND old.chain_id < ${chainId}) - OR (old.block_timestamp = ${blockTimestamp} AND old.chain_id = ${chainId} AND old.block_number <= ${blockNumber}) + -- Step 2: Distinct on entity_history_chain_id, get the entity_history_block_number of the row with the + -- lowest serial >= the first change serial + SELECT DISTINCT + ON (entity_history_chain_id) * + FROM + public.${sql(entityName + "_history")} + WHERE + serial >= ( + SELECT + first_change_serial + FROM + first_change + ) + ORDER BY + entity_history_chain_id, + serial + ASC; -- Select the row with the lowest serial per id +`; + +module.exports.deleteRolledBackEntityHistory = ( + sql, + entityName, + getFirstChangeSerial, +) => sql` + WITH + first_change AS ( + -- Step 1: Find the "first change" serial originating from the reorg chain above the safe block number + -- (Using serial to account for unordered multi chain reorgs, where an earier event on another chain could be rolled back) + ${getFirstChangeSerial(sql)} + ) + -- Step 2: Delete all rows that have a serial >= the first change serial + DELETE FROM + public.${sql(entityName + "_history")} + WHERE + serial >= ( + SELECT + first_change_serial + FROM + first_change + ); + `; + +module.exports.getRollbackDiff = (sql, entityName, getFirstChangeSerial) => sql` + WITH + first_change AS ( + -- Step 1: Find the "first change" serial originating from the reorg chain above the safe block number + -- (Using serial to account for unordered multi chain reorgs, where an earier event on another chain could be rolled back) + ${getFirstChangeSerial(sql)} + ), + rollback_ids AS ( + -- Step 2: Get all unique entity ids of rows that require rollbacks where the row's serial is above the first change serial + SELECT DISTINCT + ON (id) after.* + FROM + public.${sql(entityName + "_history")} after + WHERE + after.serial >= ( + SELECT + first_change_serial + FROM + first_change + ) + ORDER BY + after.id, + after.serial ASC -- Select the row with the lowest serial per id ) - -- old AND new ids AND entity types should match - AND old.entity_id = next.entity_id - AND old.entity_type = next.entity_type - AND old.block_number = next.previous_block_number -FULL OUTER JOIN - entity_history new - ON old.entity_id = new.entity_id - AND new.previous_block_number >= old.block_number -WHERE COALESCE(old.block_timestamp, 0) <= ${blockTimestamp} AND COALESCE(new.block_timestamp, ${blockTimestamp} + 1) >= ${blockTimestamp}; + -- Step 3: For each relevant id, join to the row on the "previous_entity_history" fields + SELECT + -- Select all before fields, overriding the needed values with defaults + before.*, + -- In the case where no previous row exists, coalesce the needed values since this new entity + -- will need to be deleted + COALESCE(before.id, after.id) AS id, + COALESCE(before.action, 'DELETE') AS action, + -- Deleting at 0 values will work fine for future rollbacks + COALESCE(before.entity_history_block_number, 0) AS entity_history_block_number, + COALESCE(before.entity_history_block_timestamp, 0) AS entity_history_block_timestamp, + COALESCE(before.entity_history_chain_id, 0) AS entity_history_chain_id, + COALESCE(before.entity_history_log_index, 0) AS entity_history_log_index + FROM + -- Use a RIGHT JOIN, to ensure that nulls get returned if there is no "before" row + public.${sql(entityName + "_history")} before + RIGHT JOIN rollback_ids after ON before.id = after.id + AND before.entity_history_block_timestamp = after.previous_entity_history_block_timestamp + AND before.entity_history_chain_id = after.previous_entity_history_chain_id + AND before.entity_history_block_number = after.previous_entity_history_block_number + AND before.entity_history_log_index = after.previous_entity_history_log_index; `; diff --git a/codegenerator/cli/templates/static/codegen/src/db/EntityHistory.res b/codegenerator/cli/templates/static/codegen/src/db/EntityHistory.res index 9d36e7277..b3f13e64e 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/EntityHistory.res +++ b/codegenerator/cli/templates/static/codegen/src/db/EntityHistory.res @@ -37,23 +37,6 @@ let currentHistoryFieldsSchema = S.object(s => { }) let makeHistoryRowSchema: S.t<'entity> => S.t> = entitySchema => { - //Instantiates an entity object with all fields set to null - let entityWithNullFields: Js.Dict.t = switch entitySchema->S.classify { - | Object({items}) => - let nulldict = Js.Dict.empty() - items->Belt.Array.forEach(({location}) => { - nulldict->Js.Dict.set(location, %raw(`null`)) - }) - nulldict - | _ => - Js.Exn.raiseError("Failed creating entityWithNullFields. Expected an object schema for entity") - } - - //Gets an entity object with all fields set to null except for the id field - let getEntityWithNullFields = (entityId: string) => { - entityWithNullFields->Utils.Dict.updateImmutable("id", entityId->Utils.magic) - } - //Maps a schema object for the given entity with all fields nullable except for the id field //Keeps any original nullable fields let nullableEntitySchema: S.t> = S.schema(s => @@ -111,8 +94,8 @@ let makeHistoryRowSchema: S.t<'entity> => S.t> = entitySchem | _ => s.fail("Unexpected mix of null and non-null values in previous history fields") }, entityData: switch v["action"] { - | SET => v["entityData"]->S.parseAnyOrRaiseWith(entitySchema)->Set - | DELETE => v["entityData"]->S.parseAnyOrRaiseWith(entityIdOnlySchema)->Delete + | SET => v["entityData"]->(Utils.magic: Js.Dict.t => 'entity)->Set + | DELETE => v["entityData"]->(Utils.magic: Js.Dict.t => entityIdOnly)->Delete }, }, serializer: v => { @@ -121,7 +104,10 @@ let makeHistoryRowSchema: S.t<'entity> => S.t> = entitySchem entityData->(Utils.magic: 'entity => Js.Dict.t), Enums.EntityHistoryRowAction.SET, ) - | Delete({id}) => (getEntityWithNullFields(id), DELETE) + | Delete(entityIdOnly) => ( + entityIdOnly->(Utils.magic: entityIdOnly => Js.Dict.t), + DELETE, + ) } { @@ -142,19 +128,34 @@ type t<'entity> = { table: table, createInsertFnQuery: string, schema: S.t>, - insertFn: (Postgres.sql, Js.Json.t) => promise, + schemaRows: S.t>>, + insertFn: (Postgres.sql, Js.Json.t, ~shouldCopyCurrentEntity: bool) => promise, } -let insertRow = (self: t<'entity>, ~sql, ~historyRow: historyRow<'entity>) => { +let insertRow = ( + self: t<'entity>, + ~sql, + ~historyRow: historyRow<'entity>, + ~shouldCopyCurrentEntity, +) => { let row = historyRow->S.serializeOrRaiseWith(self.schema) - self.insertFn(sql, row) + self.insertFn(sql, row, ~shouldCopyCurrentEntity) } -let batchInsertRows = (self: t<'entity>, ~sql, ~rows: array>) => { - Utils.Array.awaitEach(rows, async row => { - let row = row->S.serializeOrRaiseWith(self.schema) - await self.insertFn(sql, row) +let batchInsertRows = ( + self: t<'entity>, + ~sql, + ~rows: array>, + ~shouldCopyCurrentEntity, +) => { + let rows = + rows->S.serializeOrRaiseWith(self.schemaRows)->(Utils.magic: Js.Json.t => array) + rows + ->Belt.Array.map(row => { + self.insertFn(sql, row, ~shouldCopyCurrentEntity) }) + ->Promise.all + ->Promise.thenResolve(_ => ()) } type entityInternal @@ -217,6 +218,8 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => { let actionField = mkField(actionFieldName, Custom(Enums.EntityHistoryRowAction.enum.name)) + let serialField = mkField("serial", Serial, ~isNullable=true) + let dataFieldNames = dataFields->Belt.Array.map(field => field->getFieldName) let originTableName = table.tableName @@ -228,7 +231,7 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => { currentHistoryFields, previousHistoryFields, dataFields, - [actionField], + [actionField, serialField], ]), ) @@ -256,7 +259,7 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => { ])->Belt.Array.map(fieldName => `"${fieldName}"`) let createInsertFnQuery = { - `CREATE OR REPLACE FUNCTION ${insertFnName}(${historyRowArg} ${historyTablePath}) + `CREATE OR REPLACE FUNCTION ${insertFnName}(${historyRowArg} ${historyTablePath}, should_copy_current_entity BOOLEAN) RETURNS void AS $$ DECLARE v_previous_record RECORD; @@ -280,7 +283,7 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => { `${historyRowArg}.${previousFieldName} := v_previous_record.${currentFieldName};` }) ->Js.Array2.joinWith(" ")} - ElSE + ElSIF should_copy_current_entity THEN -- Check if a value for the id exists in the origin table and if so, insert a history row for it. SELECT ${dataFieldNamesCommaSeparated} FROM ${originTablePath} WHERE id = ${historyRowArg}.${id} INTO v_origin_record; IF FOUND THEN @@ -311,15 +314,16 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => { ` } - let insertFnString = `(sql, rowArgs) => + let insertFnString = `(sql, rowArgs, shouldCopyCurrentEntity) => sql\`select ${insertFnName}(ROW(${allFieldNamesDoubleQuoted ->Belt.Array.map(fieldNameDoubleQuoted => `\${rowArgs[${fieldNameDoubleQuoted}]\}`) - ->Js.Array2.joinWith(", ")}));\`` + ->Js.Array2.joinWith(", ")}, NULL), --NULL argument for SERIAL field + \${shouldCopyCurrentEntity});\`` - let insertFn: (Postgres.sql, Js.Json.t) => promise = + let insertFn: (Postgres.sql, Js.Json.t, ~shouldCopyCurrentEntity: bool) => promise = insertFnString->Table.PostgresInterop.eval let schema = makeHistoryRowSchema(schema) - {table, createInsertFnQuery, schema, insertFn} + {table, createInsertFnQuery, schema, schemaRows: S.array(schema), insertFn} } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index cb57ba0fb..c13efc06c 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -220,18 +220,20 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio latestProcessedBlock, numEventsProcessed, timestampCaughtUpToHeadOrEndblock, - }) => - { - // on restart, reset the events_processed gauge to the previous state - switch numEventsProcessed { - | Some(numEventsProcessed) => Prometheus.incrementEventsProcessedCounter(~number=numEventsProcessed) - | None => () // do nothing if no events have been processed yet for this chain - } - ( + }) => { + // on restart, reset the events_processed gauge to the previous state + switch numEventsProcessed { + | Some(numEventsProcessed) => + Prometheus.incrementEventsProcessedCounter(~number=numEventsProcessed) + | None => () // do nothing if no events have been processed yet for this chain + } + ( firstEventBlockNumber, latestProcessedBlock, numEventsProcessed, - Env.updateSyncTimeOnRestart ? None : timestampCaughtUpToHeadOrEndblock->Js.Nullable.toOption, + Env.updateSyncTimeOnRestart + ? None + : timestampCaughtUpToHeadOrEndblock->Js.Nullable.toOption, ) } | None => (None, None, None, None) @@ -436,15 +438,6 @@ let getLastScannedBlockData = lastBlockData => { }) } -let rollbackToLastBlockHashes = (chainFetcher: t, ~rolledBackLastBlockData) => { - let lastKnownValidBlock = rolledBackLastBlockData->getLastScannedBlockData - { - ...chainFetcher, - lastBlockScannedHashes: rolledBackLastBlockData, - fetchState: chainFetcher.fetchState->PartitionedFetchState.rollback(~lastKnownValidBlock), - } -} - let isFetchingAtHead = (chainFetcher: t) => chainFetcher.fetchState->PartitionedFetchState.isFetchingAtHead diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 7ff5886a0..1b9c26cd3 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -11,6 +11,8 @@ type blockNumberAndTimestamp = { blockTimestamp: int, } +type blockNumberAndLogIndex = {blockNumber: int, logIndex: int} + module DynamicContractsMap = { //mapping of address to dynamicContractId module IdCmp = Belt.Id.MakeComparableU({ @@ -53,12 +55,18 @@ module DynamicContractsMap = { accum->Map.set(nextKey, nextValMerged) }) - let removeContractAddressesPastValidBlock = (self: t, ~lastKnownValidBlock) => { + let removeContractAddressesFromFirstChangeEvent = ( + self: t, + ~firstChangeEvent: blockNumberAndLogIndex, + ) => { self ->Map.toArray ->Array.reduce((empty, []), ((currentMap, currentRemovedAddresses), (nextKey, nextVal)) => { - if nextKey.blockNumber > lastKnownValidBlock.blockNumber { - //If the registration block is greater than the last valid block, + if ( + (nextKey.blockNumber, nextKey.logIndex) >= + (firstChangeEvent.blockNumber, firstChangeEvent.logIndex) + ) { + //If the registration block is later than the first change event, //Do not add it to the currentMap, but add the removed addresses let updatedRemovedAddresses = currentRemovedAddresses->Array.concat( @@ -66,7 +74,7 @@ module DynamicContractsMap = { ) (currentMap, updatedRemovedAddresses) } else { - //If it is not passed the lastKnownValidBlock, updated the + //If it is earlier than the first change event, updated the //current map and keep the currentRemovedAddresses let updatedMap = currentMap->Map.set(nextKey, nextVal) (updatedMap, currentRemovedAddresses) @@ -482,7 +490,7 @@ newItems are ordered earliest to latest (as they are returned from the worker) let update = ( {baseRegister, pendingDynamicContracts, isFetchingAtHead}: t, ~id, - ~latestFetchedBlock, + ~latestFetchedBlock: blockNumberAndTimestamp, ~newItems, ~currentBlockHeight, ): result => { @@ -929,27 +937,24 @@ let getLatestFullyFetchedBlock = (self: t) => { } } -let pruneQueuePastValidBlock = (queue: array, ~lastKnownValidBlock) => { - let prunedQueue = [] - let rec loop = index => { - switch queue[index] { - | Some(head) => - if head.blockNumber <= lastKnownValidBlock.blockNumber { - prunedQueue->Js.Array2.push(head)->ignore - } - loop(index + 1) - | None => prunedQueue - } - } - loop(0) +let pruneQueueFromFirstChangeEvent = ( + queue: array, + ~firstChangeEvent: blockNumberAndLogIndex, +) => { + queue->Array.keep(item => + (item.blockNumber, item.logIndex) < (firstChangeEvent.blockNumber, firstChangeEvent.logIndex) + ) } -let pruneDynamicContractAddressesPastValidBlock = (~lastKnownValidBlock, register: register) => { +let pruneDynamicContractAddressesFromFirstChangeEvent = ( + register: register, + ~firstChangeEvent: blockNumberAndLogIndex, +) => { //get all dynamic contract addresses past valid blockNumber to remove along with //updated dynamicContracts map let (dynamicContracts, addressesToRemove) = - register.dynamicContracts->DynamicContractsMap.removeContractAddressesPastValidBlock( - ~lastKnownValidBlock, + register.dynamicContracts->DynamicContractsMap.removeContractAddressesFromFirstChangeEvent( + ~firstChangeEvent, ) //remove them from the contract address mapping and dynamic contract addresses mapping @@ -962,9 +967,10 @@ let pruneDynamicContractAddressesPastValidBlock = (~lastKnownValidBlock, registe /** Rolls back registers to the given valid block */ -let rec rollbackRegisterList = ( +let rec rollbackRegister = ( self: register, - ~lastKnownValidBlock: blockNumberAndTimestamp, + ~lastScannedBlock, + ~firstChangeEvent: blockNumberAndLogIndex, ~parent: option=?, ) => { let handleParent = updated => @@ -976,14 +982,15 @@ let rec rollbackRegisterList = ( switch self.registerType { //Case 1 Root register that has only fetched up to a confirmed valid block number //Should just return itself unchanged - | RootRegister(_) if self.latestFetchedBlock.blockNumber <= lastKnownValidBlock.blockNumber => + | RootRegister(_) if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => self->handleParent //Case 2 Dynamic register that has only fetched up to a confirmed valid block number //Should just return itself, with the next register rolled back recursively | DynamicContractRegister({id, nextRegister}) - if self.latestFetchedBlock.blockNumber <= lastKnownValidBlock.blockNumber => - nextRegister->rollbackRegisterList( - ~lastKnownValidBlock, + if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => + nextRegister->rollbackRegister( + ~lastScannedBlock, + ~firstChangeEvent, ~parent=self->Parent.make(~dynamicContractId=id, ~parent), ) @@ -992,44 +999,51 @@ let rec rollbackRegisterList = ( | RootRegister(_) => { ...self, - fetchedEventQueue: self.fetchedEventQueue->pruneQueuePastValidBlock(~lastKnownValidBlock), - latestFetchedBlock: lastKnownValidBlock, + fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent(~firstChangeEvent), + latestFetchedBlock: lastScannedBlock, } - ->pruneDynamicContractAddressesPastValidBlock(~lastKnownValidBlock) + ->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) ->handleParent //Case 4 DynamicContract register that has fetched further than the confirmed valid block number //Should prune its queue, set its latest fetched blockdata + pruned queue //And recursivle prune the nextRegister | DynamicContractRegister({id, nextRegister}) => let updatedWithRemovedDynamicContracts = - self->pruneDynamicContractAddressesPastValidBlock(~lastKnownValidBlock) + self->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) if updatedWithRemovedDynamicContracts.contractAddressMapping->ContractAddressingMap.isEmpty { //If the contractAddressMapping is empty after pruning dynamic contracts, then this //is a dead register. Simly return its next register rolled back - nextRegister->rollbackRegisterList(~lastKnownValidBlock, ~parent?) + nextRegister->rollbackRegister(~lastScannedBlock, ~firstChangeEvent, ~parent?) } else { //If there are still values in the contractAddressMapping, we should keep the register but //prune queues and next register let updated = { ...updatedWithRemovedDynamicContracts, - fetchedEventQueue: self.fetchedEventQueue->pruneQueuePastValidBlock(~lastKnownValidBlock), - latestFetchedBlock: lastKnownValidBlock, + fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent( + ~firstChangeEvent, + ), + latestFetchedBlock: lastScannedBlock, } - nextRegister->rollbackRegisterList( - ~lastKnownValidBlock, + nextRegister->rollbackRegister( + ~lastScannedBlock, + ~firstChangeEvent, ~parent=updated->Parent.make(~dynamicContractId=id, ~parent), ) } } } -let rollback = (self: t, ~lastKnownValidBlock) => { - let baseRegister = rollbackRegisterList(self.baseRegister, ~lastKnownValidBlock) +let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { + let baseRegister = self.baseRegister->rollbackRegister(~lastScannedBlock, ~firstChangeEvent) let pendingDynamicContracts = - self.pendingDynamicContracts->Array.keep(({registeringEventBlockNumber}) => - registeringEventBlockNumber <= lastKnownValidBlock.blockNumber + self.pendingDynamicContracts->Array.keep(({ + registeringEventBlockNumber, + registeringEventLogIndex, + }) => + (registeringEventBlockNumber, registeringEventLogIndex) < + (firstChangeEvent.blockNumber, firstChangeEvent.logIndex) ) { ...self, diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index 49aafba82..f173faa3b 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -270,9 +270,9 @@ let getNextQueriesOrThrow = ( /** Rolls back all partitions to the given valid block */ -let rollback = (self: t, ~lastKnownValidBlock) => { +let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { let partitions = self.partitions->Array.map(partition => { - partition->FetchState.rollback(~lastKnownValidBlock) + partition->FetchState.rollback(~lastScannedBlock, ~firstChangeEvent) }) {...self, partitions} @@ -297,11 +297,10 @@ let queueSize = ({partitions}: t) => let getLatestFullyFetchedBlock = ({partitions}: t) => partitions - ->Array.reduce(None, (accum, partition) => { + ->Array.reduce((None: option), (accum, partition) => { let partitionBlock = partition->FetchState.getLatestFullyFetchedBlock switch accum { - | Some({FetchState.blockNumber: blockNumber}) - if partitionBlock.blockNumber >= blockNumber => accum + | Some({blockNumber}) if partitionBlock.blockNumber >= blockNumber => accum | _ => Some(partitionBlock) } }) diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 34714b3d2..56b438a92 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -748,11 +748,12 @@ let actionReducer = (state: t, action: action) => { let invalidatedActionReducer = (state: t, action: action) => switch (state, action) { - | ({rollbackState: RollingBack(_)}, EventBatchProcessed(_)) => ( - {...state, currentlyProcessingBatch: false}, - [Rollback], - ) - | _ => (state, []) + | ({rollbackState: RollingBack(_)}, EventBatchProcessed(_)) => + Logging.warn("Finished processing batch before rollback, actioning rollback") + ({...state, currentlyProcessingBatch: false}, [Rollback]) + | _ => + Logging.warn("Invalidated action discarded") + (state, []) } let waitForNewBlock = async ( @@ -985,6 +986,7 @@ let injectedTaskReducer = ( ~latestProcessedBlocks, ~eventBatch=batch, ~checkContractIsRegistered, + ~config=state.config, ) { | Ok(batchProcessed) => dispatchAction(DynamicContractPreRegisterProcessed(batchProcessed)) | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) @@ -1098,64 +1100,109 @@ let injectedTaskReducer = ( | Rollback => //If it isn't processing a batch currently continue with rollback otherwise wait for current batch to finish processing switch state { - | {currentlyProcessingBatch: false, rollbackState: RollingBack(rollbackChain)} => + | {currentlyProcessingBatch: false, rollbackState: RollingBack(reorgChain)} => Logging.warn("Executing rollback") - let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(rollbackChain) - let rollbackChainId = rollbackChain->ChainMap.Chain.toChainId - //Get rollback block and timestamp + + let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(reorgChain) + + //Rollback the lastBlockScannedHashes to a point before blockhashes diverged let reorgChainRolledBackLastBlockData = await chainFetcher->rollbackLastBlockHashesToReorgLocation + //Get the last known valid block that was scanned on the reorg chain let {blockNumber: lastKnownValidBlockNumber, blockTimestamp: lastKnownValidBlockTimestamp} = reorgChainRolledBackLastBlockData->ChainFetcher.getLastScannedBlockData + let isUnorderedMultichainMode = state.config.isUnorderedMultichainMode + + let reorgChainId = reorgChain->ChainMap.Chain.toChainId + + //Get the first change event that occurred on each chain after the last known valid block + //Uses a different method depending on if the reorg chain is ordered or unordered + let firstChangeEventIdentifierPerChain = + await DbFunctions.sql->DbFunctions.EntityHistory.getFirstChangeEventPerChain( + isUnorderedMultichainMode + ? UnorderedMultichain({ + reorgChainId, + safeBlockNumber: lastKnownValidBlockNumber, + }) + : OrderedMultichain({ + safeBlockTimestamp: lastKnownValidBlockTimestamp, + reorgChainId, + safeBlockNumber: lastKnownValidBlockNumber, + }), + ) + + firstChangeEventIdentifierPerChain->DbFunctions.EntityHistory.FirstChangeEventPerChain.setIfEarlier( + ~chainId=reorgChainId, + ~event={ + blockNumber: lastKnownValidBlockNumber + 1, + logIndex: 0, + }, + ) + let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { - let rolledBackLastBlockData = if chain == rollbackChain { - //For the chain fetcher of the chain where a reorg occured, use the the - //rolledBackLastBlockData already computed - reorgChainRolledBackLastBlockData - } else { - //For all other chains, rollback to where a blockTimestamp is less than or equal to the block timestamp - //where the reorg chain is rolling back to - cf.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.rollBackToBlockTimestampLte( - ~blockTimestamp=lastKnownValidBlockTimestamp, + switch firstChangeEventIdentifierPerChain->DbFunctions.EntityHistory.FirstChangeEventPerChain.get( + ~chainId=chain->ChainMap.Chain.toChainId, + ) { + | Some(firstChangeEvent) => + //There was a change on the given chain after the reorged chain, + // rollback the lastBlockScannedHashes to before the first change produced by the given chain + let rolledBackLastBlockData = + cf.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.rollBackToBlockNumberLt( + ~blockNumber=firstChangeEvent.blockNumber, + ) + + let fetchState = + cf.fetchState->PartitionedFetchState.rollback( + ~lastScannedBlock=rolledBackLastBlockData->ChainFetcher.getLastScannedBlockData, + ~firstChangeEvent, + ) + + let rolledBackCf = { + ...cf, + lastBlockScannedHashes: rolledBackLastBlockData, + fetchState, + } + //On other chains, filter out evennts based on the first change present on the chain after the reorg + rolledBackCf->ChainFetcher.addProcessingFilter( + ~filter=eventBatchQueueItem => { + //Filter out events that occur passed the block where the query starts but + //are lower than the timestamp where we rolled back to + (eventBatchQueueItem.blockNumber, eventBatchQueueItem.logIndex) >= + (firstChangeEvent.blockNumber, firstChangeEvent.logIndex) + }, + ~isValid=(~fetchState) => { + //Remove the event filter once the fetchState has fetched passed the + //blockNumber of the valid first change event + let {blockNumber} = FetchState.getLatestFullyFetchedBlock(fetchState) + blockNumber <= firstChangeEvent.blockNumber + }, ) + | None => //If no change was produced on the given chain after the reorged chain, no need to rollback anything + cf } - - //Roll back chain fetcher with the given rolledBackLastBlockData - cf - ->ChainFetcher.rollbackToLastBlockHashes(~rolledBackLastBlockData) - ->ChainFetcher.addProcessingFilter( - ~filter=eventBatchQueueItem => { - let {timestamp, blockNumber} = eventBatchQueueItem - //Filter out events that occur passed the block where the query starts but - //are lower than the timestamp where we rolled back to - (timestamp, chain->ChainMap.Chain.toChainId, blockNumber) > - (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) - }, - ~isValid=(~fetchState) => { - //Remove the event filter once the fetchState has fetched passed the - //timestamp of the valid rollback block's timestamp - let {blockTimestamp, blockNumber} = FetchState.getLatestFullyFetchedBlock(fetchState) - (blockTimestamp, chain->ChainMap.Chain.toChainId, blockNumber) <= - (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) - }, - ) }) - let chainManager = { - ...state.chainManager, - chainFetchers, + let reorgChainLastBlockScannedData = { + let reorgChainFetcher = chainFetchers->ChainMap.get(reorgChain) + reorgChainFetcher.lastBlockScannedHashes->ChainFetcher.getLastScannedBlockData } //Construct a rolledback in Memory store let inMemoryStore = await IO.RollBack.rollBack( - ~chainId=rollbackChain->ChainMap.Chain.toChainId, - ~blockTimestamp=lastKnownValidBlockTimestamp, - ~blockNumber=lastKnownValidBlockNumber, + ~chainId=reorgChain->ChainMap.Chain.toChainId, + ~blockTimestamp=reorgChainLastBlockScannedData.blockTimestamp, + ~blockNumber=reorgChainLastBlockScannedData.blockNumber, ~logIndex=0, + ~isUnorderedMultichainMode, ) + let chainManager = { + ...state.chainManager, + chainFetchers, + } + dispatchAction(SetRollbackState(inMemoryStore, chainManager)) | _ => Logging.warn("Waiting for batch to finish processing before executing rollback") //wait for batch to finish processing diff --git a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res index 230804a32..6842d47af 100644 --- a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res +++ b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res @@ -1,7 +1,10 @@ open Belt open RescriptMocha -let config = RegisterHandlers.registerAllHandlers() +let config = { + ...RegisterHandlers.registerAllHandlers(), + isUnorderedMultichainMode: true, +} module Mock = { /* @@ -224,13 +227,10 @@ describe("Multichain rollback test", () => { //Provision the db DbHelpers.runUpDownMigration() }) - Async.it_skip("Multichain indexer should rollback and not reprocess any events", async () => { + Async.it("Multichain indexer should rollback and not reprocess any events", async () => { //Setup a chainManager with unordered multichain mode to make processing happen //without blocking for the purposes of this test - let chainManager = { - ...ChainManager.makeFromConfig(~config), - isUnorderedMultichainMode: true, - } + let chainManager = ChainManager.makeFromConfig(~config) let loadLayer = LoadLayer.makeWithDbConnection() @@ -554,11 +554,11 @@ describe("Multichain rollback test", () => { ~message="Rollback should have actioned and next tasks are query and process batch", ) await makeAssertions( - ~queryName="After Rollback Action", + ~queryName="After Rollback Action A", ~chain1LatestFetchBlock=3, - ~chain2LatestFetchBlock=2, + ~chain2LatestFetchBlock=5, ~totalQueueSize=0, - ~batchName="After Rollback Action", + ~batchName="After Rollback Action A", //balances have not yet been changed ~chain1User1Balance=Some(100), ~chain1User2Balance=Some(50), @@ -577,13 +577,14 @@ describe("Multichain rollback test", () => { //Make new queries (C for Chain 1, B for Chain 2) //Artificially cut the tasks to only do one round of queries and batch processing tasks := [NextQuery(CheckAllChains)] + await dispatchAllTasks() await makeAssertions( - ~queryName="After Rollback Action", + ~queryName="After Rollback Action B", ~chain1LatestFetchBlock=5, - ~chain2LatestFetchBlock=5, - ~totalQueueSize=3, - ~batchName="After Rollback Action", + ~chain2LatestFetchBlock=8, + ~totalQueueSize=5, + ~batchName="After Rollback Action B", //balances have not yet been changed ~chain1User1Balance=Some(100), ~chain1User2Balance=Some(50), @@ -595,20 +596,32 @@ describe("Multichain rollback test", () => { tasks := [ProcessEventBatch] // Process event batch with reorg in mem store and action next queries await dispatchAllTasks() + await makeAssertions( - ~queryName="After Rollback EventProcess", + ~queryName="After Rollback EventProcess A", ~chain1LatestFetchBlock=5, - ~chain2LatestFetchBlock=5, + ~chain2LatestFetchBlock=8, ~totalQueueSize=0, - ~batchName="After Rollback EventProcess", + ~batchName="After Rollback EventProcess A", //balances have not yet been changed ~chain1User1Balance=Some(89), ~chain1User2Balance=Some(61), - ~chain2User1Balance=Some(100), - ~chain2User2Balance=Some(50), + ~chain2User1Balance=Some(98), + ~chain2User2Balance=Some(52), + ) + await dispatchAllTasks() + await makeAssertions( + ~queryName="After Rollback EventProcess B", + ~chain1LatestFetchBlock=6, + ~chain2LatestFetchBlock=9, + ~totalQueueSize=0, + ~batchName="After Rollback EventProcess B", + //balances have not yet been changed + ~chain1User1Balance=Some(74), + ~chain1User2Balance=Some(76), + ~chain2User1Balance=Some(90), + ~chain2User2Balance=Some(60), ) - //Todo assertions - //Assert new balances await setupDb() }) diff --git a/scenarios/test_codegen/test/Mock_test.res b/scenarios/test_codegen/test/Mock_test.res index 648f1a422..d09dd65df 100644 --- a/scenarios/test_codegen/test/Mock_test.res +++ b/scenarios/test_codegen/test/Mock_test.res @@ -72,7 +72,10 @@ describe_skip("E2E Db check", () => { }) it("Validate inmemory store state", () => { - let gravatars = inMemoryStore.gravatar->InMemoryTable.Entity.values + let gravatars = + inMemoryStore.entities + ->InMemoryStore.EntityTables.get(module(Entities.Gravatar)) + ->InMemoryTable.Entity.values Assert.deepEqual( gravatars, diff --git a/scenarios/test_codegen/test/lib_tests/EntityHistory_test.res b/scenarios/test_codegen/test/lib_tests/EntityHistory_test.res index 0a1c7759c..4f5b738a0 100644 --- a/scenarios/test_codegen/test/lib_tests/EntityHistory_test.res +++ b/scenarios/test_codegen/test/lib_tests/EntityHistory_test.res @@ -148,7 +148,7 @@ describe("Entity history serde", () => { describe("Entity History Codegen", () => { it("Creates a postgres insert function", () => { - let expected = `CREATE OR REPLACE FUNCTION "insert_TestEntity_history"(history_row "public"."TestEntity_history") + let expected = `CREATE OR REPLACE FUNCTION "insert_TestEntity_history"(history_row "public"."TestEntity_history", should_copy_current_entity BOOLEAN) RETURNS void AS $$ DECLARE v_previous_record RECORD; @@ -166,7 +166,7 @@ describe("Entity History Codegen", () => { -- If a previous record exists, use its values IF FOUND THEN history_row.previous_entity_history_block_timestamp := v_previous_record.entity_history_block_timestamp; history_row.previous_entity_history_chain_id := v_previous_record.entity_history_chain_id; history_row.previous_entity_history_block_number := v_previous_record.entity_history_block_number; history_row.previous_entity_history_log_index := v_previous_record.entity_history_log_index; - ElSE + ElSIF should_copy_current_entity THEN -- Check if a value for the id exists in the origin table and if so, insert a history row for it. SELECT "id", "fieldA", "fieldB" FROM "public"."TestEntity" WHERE id = history_row.id INTO v_origin_record; IF FOUND THEN @@ -192,10 +192,11 @@ describe("Entity History Codegen", () => { it("Creates a js insert function", () => { let insertFnString = mockEntityHistory.insertFn->toStringUnsafe - let expected = `(sql, rowArgs) => - sql\`select "insert_TestEntity_history"(ROW(\${rowArgs["entity_history_block_timestamp"]}, \${rowArgs["entity_history_chain_id"]}, \${rowArgs["entity_history_block_number"]}, \${rowArgs["entity_history_log_index"]}, \${rowArgs["previous_entity_history_block_timestamp"]}, \${rowArgs["previous_entity_history_chain_id"]}, \${rowArgs["previous_entity_history_block_number"]}, \${rowArgs["previous_entity_history_log_index"]}, \${rowArgs["id"]}, \${rowArgs["fieldA"]}, \${rowArgs["fieldB"]}, \${rowArgs["action"]}));\`` + let expected = `(sql, rowArgs, shouldCopyCurrentEntity) => + sql\`select "insert_TestEntity_history"(ROW(\${rowArgs["entity_history_block_timestamp"]}, \${rowArgs["entity_history_chain_id"]}, \${rowArgs["entity_history_block_number"]}, \${rowArgs["entity_history_log_index"]}, \${rowArgs["previous_entity_history_block_timestamp"]}, \${rowArgs["previous_entity_history_chain_id"]}, \${rowArgs["previous_entity_history_block_number"]}, \${rowArgs["previous_entity_history_log_index"]}, \${rowArgs["id"]}, \${rowArgs["fieldA"]}, \${rowArgs["fieldB"]}, \${rowArgs["action"]}, NULL), --NULL argument for SERIAL field + \${shouldCopyCurrentEntity});\`` - Assert.equal(expected, insertFnString) + Assert.equal(insertFnString, expected) }) Async.it("Creating tables and functions works", async () => { @@ -259,6 +260,7 @@ describe("Entity History Codegen", () => { switch await mockEntityHistory->EntityHistory.insertRow( ~sql=DbFunctions.sql, ~historyRow=entityHistoryItem, + ~shouldCopyCurrentEntity=true, ) { | exception exn => Js.log2("insertRow exn", exn) @@ -280,6 +282,7 @@ describe("Entity History Codegen", () => { "fieldA": 1, "fieldB": "test", "action": "SET", + "serial": 1, }, { "entity_history_block_timestamp": blockTimestamp, @@ -294,10 +297,65 @@ describe("Entity History Codegen", () => { "fieldA": 2, "fieldB": "test2", "action": "SET", + "serial": 2, }, ] let currentHistoryItems = await DbFunctions.sql->getAllMockEntityHistory Assert.deepEqual(currentHistoryItems, expectedResult) + + switch await mockEntityHistory->EntityHistory.insertRow( + ~sql=DbFunctions.sql, + ~historyRow={ + entityData: Set({id: "2", fieldA: 1, fieldB: None}), + previous: None, + current: { + chain_id: 1, + block_timestamp: 4, + block_number: 4, + log_index: 6, + }, + }, + ~shouldCopyCurrentEntity=true, + ) { + | exception exn => + Js.log2("insertRow exn", exn) + Assert.fail("Failed to insert mock entity history") + | _ => () + } + switch await mockEntityHistory->EntityHistory.insertRow( + ~sql=DbFunctions.sql, + ~historyRow={ + entityData: Set({id: "2", fieldA: 3, fieldB: None}), + previous: None, + current: { + chain_id: 1, + block_timestamp: 4, + block_number: 10, + log_index: 6, + }, + }, + ~shouldCopyCurrentEntity=true, + ) { + | exception exn => + Js.log2("insertRow exn", exn) + Assert.fail("Failed to insert mock entity history") + | _ => () + } + + await mockEntityHistory->EntityHistory.insertRow( + ~sql=DbFunctions.sql, + ~historyRow={ + entityData: Set({id: "3", fieldA: 4, fieldB: None}), + previous: None, + current: { + chain_id: 137, + block_timestamp: 4, + block_number: 7, + log_index: 6, + }, + }, + ~shouldCopyCurrentEntity=true, + ) }) }) diff --git a/scenarios/test_codegen/test/lib_tests/FetchState_test.res b/scenarios/test_codegen/test/lib_tests/FetchState_test.res index 38ae7b4fc..eb313ed9b 100644 --- a/scenarios/test_codegen/test/lib_tests/FetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/FetchState_test.res @@ -76,8 +76,8 @@ describe("FetchState.fetchState", () => { ->addAddr(dcId4, mockAddress4) let (_updatedMap, removedAddresses) = - dcMap->DynamicContractsMap.removeContractAddressesPastValidBlock( - ~lastKnownValidBlock={blockNumber: 5, blockTimestamp: 5 * 15}, + dcMap->DynamicContractsMap.removeContractAddressesFromFirstChangeEvent( + ~firstChangeEvent={blockNumber: 6, logIndex: 0}, ) Assert.deepEqual(removedAddresses, [mockAddress3, mockAddress4]) @@ -623,7 +623,11 @@ describe("FetchState.fetchState", () => { let fetchState = register3->makeMockFetchState - let updated = fetchState->rollback(~lastKnownValidBlock=getBlockData(~blockNumber=100)) + let updated = + fetchState->rollback( + ~lastScannedBlock=getBlockData(~blockNumber=100), + ~firstChangeEvent={blockNumber: 101, logIndex: 0}, + ) let expected = { ...register3,