Skip to content

Commit

Permalink
[4] Rollback diff implementation (#333)
Browse files Browse the repository at this point in the history
* Implement entity history tables for each entity

* Wip implement per table insert history item function with tests

* Add entity history schema creator

* Add eval function for inserting entity history

* Fix spelling of variable

* Move EntityHistory to its own file

* Add serial to entity history

* Make InMemomoryStore static

* Move InMemomoryStore to static templates dir

* Add new rollback diffing queries

* Implement delete rolled back entity history

* Implement multichain rollback filters

* Implement unordered and ordered multichain versions of rollbacks

* Make tests compile

* Fix broken magic in memory table function

* Fix double quoted dynamic table name in query

* Add new test expectations

* Refactor simpler in memory table entity history state

* Fix diff and delete entity history functions

* Fix tests

* Fix inverted rollback check

* Include rollback multichain test in suite
  • Loading branch information
JonoPrest authored Nov 18, 2024
1 parent a705617 commit 59918ee
Show file tree
Hide file tree
Showing 20 changed files with 908 additions and 502 deletions.
128 changes: 76 additions & 52 deletions codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ let getEntityHistoryItems = entityUpdates => {
) => {
let (optPreviousEventIdentifier, entityHistoryItems) = prev

let {eventIdentifier, shouldSaveHistory, entityUpdateAction, entityId} = entityUpdate
let entityHistoryItems = if shouldSaveHistory {
let {eventIdentifier, entityUpdateAction, entityId} = entityUpdate
let entityHistoryItems = {
let historyItem: EntityHistory.historyRow<_> = {
current: {
chain_id: eventIdentifier.chainId,
Expand All @@ -38,10 +38,7 @@ let getEntityHistoryItems = entityUpdates => {
},
}
entityHistoryItems->Belt.Array.concat([historyItem])
} else {
entityHistoryItems
}

(Some(eventIdentifier), entityHistoryItems)
})

Expand All @@ -51,19 +48,20 @@ let getEntityHistoryItems = entityUpdates => {
let executeSetEntityWithHistory = (
type entity,
sql: Postgres.sql,
~rows: array<Types.inMemoryStoreRowEntity<entity>>,
~inMemoryStore: InMemoryStore.t,
~entityMod: module(Entities.Entity with type t = entity),
): promise<unit> => {
let rows =
inMemoryStore.entities
->InMemoryStore.EntityTables.get(entityMod)
->InMemoryTable.Entity.rows
let module(EntityMod) = entityMod
let (entitiesToSet, idsToDelete, entityHistoryItemsToSet) = rows->Belt.Array.reduce(
([], [], []),
((entitiesToSet, idsToDelete, entityHistoryItemsToSet), row) => {
switch row {
| Updated({latest, history}) =>
let entityHistoryItems =
history
->Belt.Array.concat([latest])
->getEntityHistoryItems
let entityHistoryItems = history->getEntityHistoryItems

switch latest.entityUpdateAction {
| Set(entity) => (
Expand All @@ -86,6 +84,7 @@ let executeSetEntityWithHistory = (
EntityMod.entityHistory->EntityHistory.batchInsertRows(
~sql,
~rows=Belt.Array.concatMany(entityHistoryItemsToSet),
~shouldCopyCurrentEntity=!(inMemoryStore->InMemoryStore.isRollingBack),
),
if entitiesToSet->Array.length > 0 {
sql->DbFunctionsEntities.batchSet(~entityMod)(entitiesToSet)
Expand All @@ -105,9 +104,14 @@ let executeSetEntityWithHistory = (
let executeDbFunctionsEntity = (
type entity,
sql: Postgres.sql,
~rows: array<Types.inMemoryStoreRowEntity<entity>>,
~inMemoryStore: InMemoryStore.t,
~entityMod: module(Entities.Entity with type t = entity),
): promise<unit> => {
let rows =
inMemoryStore.entities
->InMemoryStore.EntityTables.get(entityMod)
->InMemoryTable.Entity.rows

let (entitiesToSet, idsToDelete) = rows->Belt.Array.reduce(([], []), (
(accumulatedSets, accumulatedDeletes),
row,
Expand Down Expand Up @@ -139,9 +143,9 @@ let executeDbFunctionsEntity = (
promises->Promise.all->Promise.thenResolve(_ => ())
}

let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold) => {
let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold, ~config) => {
let entityDbExecutionComposer =
RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold)
config->Config.shouldSaveHistory(~isInReorgThreshold)
? executeSetEntityWithHistory
: executeDbFunctionsEntity

Expand All @@ -167,7 +171,7 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh
let set{{entity.name.capitalized}}s = entityDbExecutionComposer(
_,
~entityMod=module(Entities.{{entity.name.capitalized}}),
~rows=inMemoryStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.rows,
~inMemoryStore,
)

{{/each}}
Expand All @@ -177,7 +181,9 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh
let rollbackTables = switch inMemoryStore.rollBackEventIdentifier {
| Some(eventIdentifier) =>
[
DbFunctions.EntityHistory.deleteAllEntityHistoryAfterEventIdentifier,
DbFunctions.EntityHistory.deleteAllEntityHistoryAfterEventIdentifier(
~isUnorderedMultichainMode=config.isUnorderedMultichainMode,
),
DbFunctions.RawEvents.deleteAllRawEventsAfterEventIdentifier,
DbFunctions.DynamicContractRegistry.deleteAllDynamicContractRegistrationsAfterEventIdentifier,
]->Belt.Array.map(fn => fn(_, ~eventIdentifier))
Expand All @@ -204,52 +210,70 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh

module RollBack = {
exception DecodeError(S.error)
let rollBack = async (~chainId, ~blockTimestamp, ~blockNumber, ~logIndex) => {
let reorgData = switch await DbFunctions.sql->DbFunctions.EntityHistory.getRollbackDiff(
~chainId,
~blockTimestamp,
~blockNumber,
) {
| Ok(v) => v
| Error(exn) =>
exn
->DecodeError
->ErrorHandling.mkLogAndRaise(~msg="Failed to get rollback diff from entity history")
}

let rollBack = async (
~chainId,
~blockTimestamp,
~blockNumber,
~logIndex,
~isUnorderedMultichainMode,
) => {
let rollBackEventIdentifier: Types.eventIdentifier = {
chainId,
blockTimestamp,
blockNumber,
logIndex,
}

let inMemStore = InMemoryStore.makeWithRollBackEventIdentifier(Some(rollBackEventIdentifier))

//Don't save the rollback diffs to history table
let shouldSaveHistory = false
let inMemStore = InMemoryStore.make(~rollBackEventIdentifier)

reorgData->Belt.Array.forEach(e => {
switch e {
//Where previousEntity is Some,
//set the value with the eventIdentifier that set that value initially
{{#each entities as | entity |}}
| {previousEntity: Some({entity: {{entity.name.capitalized}}(entity), eventIdentifier}), entityId} =>
inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set(
Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory),
~shouldSaveHistory,
)
{{/each}}
//Where previousEntity is None,
//delete it with the eventIdentifier of the rollback event
{{#each entities as | entity |}}
| {previousEntity: None, entityType: {{entity.name.capitalized}}, entityId} =>
inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set(
Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory),
~shouldSaveHistory,
await Utils.Array.awaitEach(Entities.allEntities, async entityMod => {
let module(Entity) = entityMod
let entityMod =
entityMod->(
Utils.magic: module(Entities.InternalEntity) => module(Entities.Entity with
type t = 'entity
)
)
{{/each}}
}

let diff = await DbFunctions.sql->DbFunctions.EntityHistory.getRollbackDiff(
isUnorderedMultichainMode
? UnorderedMultichain({
reorgChainId: chainId,
safeBlockNumber: blockNumber,
})
: OrderedMultichain({
safeBlockTimestamp: blockTimestamp,
reorgChainId: chainId,
safeBlockNumber: blockNumber,
}),
~entityMod,
)

let entityTable = inMemStore.entities->InMemoryStore.EntityTables.get(entityMod)

diff->Belt.Array.forEach(historyRow => {
let eventIdentifier: Types.eventIdentifier = {
chainId: historyRow.current.chain_id,
blockNumber: historyRow.current.block_number,
logIndex: historyRow.current.log_index,
blockTimestamp: historyRow.current.block_timestamp,
}
switch historyRow.entityData {
| Set(entity) =>
entityTable->InMemoryTable.Entity.set(
Set(entity)->Types.mkEntityUpdate(
~eventIdentifier,
~entityId=entity->Entities.getEntityId,
),
~shouldSaveHistory=false,
)
| Delete({id}) =>
entityTable->InMemoryTable.Entity.set(
Delete->Types.mkEntityUpdate(~eventIdentifier, ~entityId=id),
~shouldSaveHistory=false,
)
}
})
})

inMemStore
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ let rec makeWithInMemoryStore: InMemoryStore.t => t = (inMemoryStore: InMemorySt
makeStoreOperatorEntity(
~inMemoryStore,
~makeMockDb=makeWithInMemoryStore,
~getStore=db => db.{{entity.name.uncapitalized}},
~getStore=db => db.entities->InMemoryStore.EntityTables.get(module(Entities.{{entity.name.capitalized}})),
~getKey=({id}) => id,
)
},
Expand Down Expand Up @@ -297,11 +297,15 @@ A function composer for simulating the writing of an inMemoryStore to the extern
Runs all set and delete operations currently cached in an inMemory store against the mockDb
*/
let executeRowsEntity = (
type entity,
mockDb: t,
~inMemoryStore: InMemoryStore.t,
~getInMemTable: InMemoryStore.t => InMemoryTable.Entity.t<'entity>,
~getKey: 'entity => 'key,
~entityMod: module(Entities.Entity with type t = entity),
~getKey: entity => 'key,
) => {
let getInMemTable = (inMemoryStore: InMemoryStore.t) =>
inMemoryStore.entities->InMemoryStore.EntityTables.get(entityMod)

let inMemTable = getInMemTable(inMemoryStore)

inMemTable.table
Expand Down Expand Up @@ -377,7 +381,7 @@ let writeFromMemoryStore = (mockDb: t, ~inMemoryStore: InMemoryStore.t) => {
{{#each entities as | entity |}}
mockDb->executeRowsEntity(
~inMemoryStore,
~getInMemTable=self => {self.{{entity.name.uncapitalized}}},
~entityMod=module(Entities.{{entity.name.capitalized}}),
~getKey=entity => entity.id,
)
{{/each}}
Expand Down
12 changes: 2 additions & 10 deletions codegenerator/cli/templates/dynamic/codegen/src/Types.res.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ type entityUpdateAction<'entityType> =

type entityUpdate<'entityType> = {
eventIdentifier: eventIdentifier,
shouldSaveHistory: bool,
entityId: id,
entityUpdateAction: entityUpdateAction<'entityType>,
}

let mkEntityUpdate = (~shouldSaveHistory=true, ~eventIdentifier, ~entityId, entityUpdateAction) => {
let mkEntityUpdate = (~eventIdentifier, ~entityId, entityUpdateAction) => {
entityId,
shouldSaveHistory,
eventIdentifier,
entityUpdateAction,
}
Expand All @@ -77,17 +75,11 @@ type entityValueAtStartOfBatch<'entityType> =
| NotSet // The entity isn't in the DB yet
| AlreadySet('entityType)

type existingValueInDb<'entityType> =
| Retrieved(entityValueAtStartOfBatch<'entityType>)
// NOTE: We use an postgres function solve the issue of this entities previous value not being known.
| Unknown

type updatedValue<'entityType> = {
// Initial value within a batch
initial: existingValueInDb<'entityType>,
latest: entityUpdate<'entityType>,
history: array<entityUpdate<'entityType>>,
}
@genType
type inMemoryStoreRowEntity<'entityType> =
| Updated(updatedValue<'entityType>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,25 @@ let getEntityParamsDecoder = (entityName: Enums.EntityType.t) =>
{{/each}}
}
let allTables: array<table> = [
{{#each entities as |entity|}}
{{entity.name.capitalized}}.table,
{{/each}}
]
let allEntities: array<module(InternalEntity)> =
[
let allEntityHistory: array<EntityHistory.t<EntityHistory.entityInternal>> = [
{{#each entities as |entity|}}
{{entity.name.capitalized}}.entityHistory->EntityHistory.castInternal,
module({{entity.name.capitalized}}),
{{/each}}
]
]->(Utils.magic: array<module(Entity)> => array<module(InternalEntity)>)
let allTables: array<table> = allEntities->Belt.Array.map(entityMod => {
let module(Entity) = entityMod
Entity.table
})
let allEntityHistory: array<
EntityHistory.t<EntityHistory.entityInternal>,
> = allEntities->Belt.Array.map(entityMod => {
let module(Entity) = entityMod
Entity.entityHistory->EntityHistory.castInternal
})
let schema = Schema.make(allTables)
Expand Down
Loading

0 comments on commit 59918ee

Please sign in to comment.