Skip to content

Commit

Permalink
[5] Remove old entity_history code and add history pruning (#338)
Browse files Browse the repository at this point in the history
* Implement entity history tables for each entity

* Wip implement per table insert history item function with tests

* Add entity history schema creator

* Add eval function for inserting entity history

* Fix spelling of variable

* Move EntityHistory to its own file

* Add EntityHistoryRowAction enum

* Handle delete entities with enum flag

* Use new entity history tables for saving entity history

* Add serial to entity history

* Make InMemomoryStore static

* Move InMemomoryStore to static templates dir

* Add new rollback diffing queries

* Implement delete rolled back entity history

* Implement multichain rollback filters

* Implement unordered and ordered multichain versions of rollbacks

* Make tests compile

* Fix broken magic in memory table function

* Fix double quoted dynamic table name in query

* Add new test expectations

* Refactor simpler in memory table entity history state

* Fix diff and delete entity history functions

* Fix tests

* Fix inverted rollback check

* Implement entity history pruning

* Remove all code related to previous entity_history table

* Remove union schemas for float and int

* Fix tests

* Improve comment about entity history check

Co-authored-by: Dmitry Zakharov <[email protected]>

---------

Co-authored-by: Dmitry Zakharov <[email protected]>
  • Loading branch information
JonoPrest and DZakh authored Nov 18, 2024
1 parent 59918ee commit 563737b
Show file tree
Hide file tree
Showing 20 changed files with 259 additions and 804 deletions.
18 changes: 17 additions & 1 deletion codegenerator/cli/npm/envio/src/Utils.res
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Helper to check if a value exists in an array
} else {
let head = array->Js.Array2.slice(~start=0, ~end_=index)
let tail = array->Belt.Array.sliceToEnd(index + 1)
[...head, ...tail]
Belt.Array.concat(head, tail)
}
}

Expand Down Expand Up @@ -182,6 +182,22 @@ Helper to check if a value exists in an array
*/
@send
external spliceInPlace: (array<'a>, ~pos: int, ~remove: int) => array<'a> = "splice"

/**
Interleaves an array with a separator
interleave([1, 2, 3], 0) -> [1, 0, 2, 0, 3]
*/
let interleave = (arr: array<'a>, separator: 'a) => {
let interleaved = []
arr->Js.Array2.forEachi((v, i) => {
interleaved->Js.Array2.push(v)->ignore
if i < arr->Array.length - 1 {
interleaved->Js.Array2.push(separator)->ignore
}
})
interleaved
}
}

module String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ let makeEntityHandlerContext = (
~logger,
~getKey,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
): entityHandlerContext<entity> => {
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityMod)
let shouldSaveHistory =
RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold)
{
set: entity => {
inMemTable->InMemoryTable.Entity.set(
Expand Down Expand Up @@ -179,7 +177,7 @@ let getHandlerContext = (
context,
~inMemoryStore: InMemoryStore.t,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
) => {
let {eventBatchQueueItem, logger} = context

Expand All @@ -194,7 +192,7 @@ let getHandlerContext = (
~getKey=entity => entity.id,
~logger,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
),
{{/each}}
}
Expand All @@ -215,9 +213,9 @@ let getHandlerArgs = (
~inMemoryStore,
~loaderReturn,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
) => {
Types.HandlerTypes.event: contextEnv.eventBatchQueueItem.event,
context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer, ~isInReorgThreshold),
context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer, ~shouldSaveHistory),
loaderReturn,
}
33 changes: 30 additions & 3 deletions codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ let executeDbFunctionsEntity = (
promises->Promise.all->Promise.thenResolve(_ => ())
}

let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold, ~config) => {
let executeBatch = async (
sql,
~inMemoryStore: InMemoryStore.t,
~isInReorgThreshold,
~safeChainIdAndBlockNumberArray,
~config,
) => {
let entityDbExecutionComposer =
config->Config.shouldSaveHistory(~isInReorgThreshold)
? executeSetEntityWithHistory
Expand Down Expand Up @@ -190,8 +196,26 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh
| None => []
}

let pruneEntityHistory = if (
isInReorgThreshold &&
config->Config.shouldPruneHistory &&
safeChainIdAndBlockNumberArray->Belt.Array.length > 0
) {
Entities.allEntities->Belt.Array.map(entityMod => {
let module(Entity) = entityMod

DbFunctions.EntityHistory.pruneStaleEntityHistory(
_,
~entityName=Entity.name,
~safeChainIdAndBlockNumberArray,
)
})
} else {
[]
}

let res = await sql->Postgres.beginSql(sql => {
Belt.Array.concat(
Belt.Array.concatMany([
//Rollback tables need to happen first in the traction
rollbackTables,
[
Expand All @@ -202,7 +226,10 @@ let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThresh
set{{entity.name.capitalized}}s,
{{/each}}
],
)->Belt.Array.map(dbFunc => sql->dbFunc)
//History pruning needs to happen last in the transaction
//It deletes all unneeded history rows outside of the reorg threshold
pruneEntityHistory,
])->Belt.Array.map(dbFunc => sql->dbFunc)
})

res
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ module EventFunctions = {
~loadLayer,
~loaderHandler,
~logger,
~isInReorgThreshold=false,
~shouldSaveHistory=false,
) {
| Ok(_) => ()
| Error(e) => e->ErrorHandling.logAndRaise
Expand Down
15 changes: 11 additions & 4 deletions codegenerator/cli/templates/static/codegen/src/EventProcessing.res
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ let runEventHandler = (
~inMemoryStore,
~logger,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
) => {
open ErrorHandling.ResultPropogateEnv
runAsyncEnv(async () => {
Expand All @@ -274,7 +274,7 @@ let runEventHandler = (
~loaderReturn,
~inMemoryStore,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory,
),
) {
| exception exn =>
Expand Down Expand Up @@ -317,7 +317,7 @@ let runHandler = async (
~inMemoryStore,
~logger,
~loadLayer,
~isInReorgThreshold,
~shouldSaveHistory=config->Config.shouldSaveHistory(~isInReorgThreshold),
)
| None => Ok()
}
Expand Down Expand Up @@ -535,6 +535,7 @@ let getDynamicContractRegistrations = (
~inMemoryStore,
~isInReorgThreshold=false,
~config,
~safeChainIdAndBlockNumberArray=[], //No need to prune history for dynamic contract pre registration
) {
| exception exn =>
exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate
Expand All @@ -556,6 +557,7 @@ let processEventBatch = (
~checkContractIsRegistered,
~loadLayer,
~config,
~safeChainIdAndBlockNumberArray,
) => {
let logger = Logging.createChild(
~params={
Expand Down Expand Up @@ -602,7 +604,12 @@ let processEventBatch = (

let elapsedTimeAfterProcess = timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis

switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold, ~config) {
switch await DbFunctions.sql->IO.executeBatch(
~inMemoryStore,
~isInReorgThreshold,
~config,
~safeChainIdAndBlockNumberArray,
) {
| exception exn =>
exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate
| () => ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,12 @@ module Float = {
@genType
type t = float

external fromStringUnsafe: string => float = "Number"

let schema = S.union([
S.float,
//This is needed to parse entity history json fields
S.string->S.transform(_s => {
parser: string => string->fromStringUnsafe,
serializer: Utils.magic,
}),
])->S.setName("GqlDbCustomTypes.Float")
let schema = S.float->S.setName("GqlDbCustomTypes.Float")
}

// Schema allows parsing strings or numbers to ints
// this is needed for entity_history field where we on the first copy we encode all numbers as strings
// to avoid loss of precision. Otherwise we always serialize to int
module Int = {
@genType
type t = int

external fromStringUnsafe: string => int = "Number"

let schema = S.union([
S.int,
//This is needed to parse entity history json fields
S.string->S.transform(_s => {
parser: string => string->fromStringUnsafe,
serializer: Utils.magic,
}),
])->S.setName("GqlDbCustomTypes.Int")
let schema = S.int->S.setName("GqlDbCustomTypes.Int")
}
76 changes: 48 additions & 28 deletions codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,6 @@ module DynamicContractRegistry = {
}

module EntityHistory = {
//Given chainId, blockTimestamp, blockNumber
//Delete all rows where chain_id = chainId and block_timestamp < blockTimestamp and block_number < blockNumber
//But keep 1 row that is satisfies this condition and has the most recent block_number
@module("./DbFunctionsImplementation.js")
external deleteAllEntityHistoryOnChainBeforeThreshold: (
Postgres.sql,
~chainId: int,
~blockNumberThreshold: int,
~blockTimestampThreshold: int,
) => promise<unit> = "deleteAllEntityHistoryOnChainBeforeThreshold"

@module("./DbFunctionsImplementation.js")
external batchSetInternal: (
Postgres.sql,
~entityHistoriesToSet: array<Js.Json.t>,
) => promise<unit> = "batchInsertEntityHistory"

type dynamicSqlQuery
module UnorderedMultichain = {
@module("./DbFunctionsImplementation.js")
Expand Down Expand Up @@ -303,6 +286,36 @@ module EntityHistory = {
~getFirstChangeSerial: Postgres.sql => dynamicSqlQuery,
) => promise<unit> = "deleteRolledBackEntityHistory"

type chainIdAndBlockNumber = {
chainId: int,
blockNumber: int,
}

@module("./DbFunctionsImplementation.js")
external pruneStaleEntityHistoryInternal: (
Postgres.sql,
~entityName: Enums.EntityType.t,
~safeChainIdAndBlockNumberArray: array<chainIdAndBlockNumber>,
) => promise<unit> = "pruneStaleEntityHistory"

let pruneStaleEntityHistory = async (sql, ~entityName, ~safeChainIdAndBlockNumberArray) => {
try await sql->pruneStaleEntityHistoryInternal(
~entityName,
~safeChainIdAndBlockNumberArray,
) catch {
| exn =>
exn->ErrorHandling.mkLogAndRaise(
~msg=`Failed to prune stale entity history`,
~logger=Logging.createChild(
~params={
"entityName": entityName,
"safeChainIdAndBlockNumberArray": safeChainIdAndBlockNumberArray,
},
),
)
}
}

module Args = {
type t =
| OrderedMultichain({safeBlockTimestamp: int, reorgChainId: int, safeBlockNumber: int})
Expand Down Expand Up @@ -493,17 +506,24 @@ module EntityHistory = {
firstChangeEventPerChain
}

let copyTableToEntityHistory = (sql, ~sourceTableName: Enums.EntityType.t): promise<unit> => {
sql->Postgres.unsafe(`SELECT copy_table_to_entity_history('${(sourceTableName :> string)}');`)
}

let copyAllEntitiesToEntityHistory = sql => {
sql->Postgres.beginSql(sql => {
Enums.EntityType.variants->Belt.Array.map(entityType => {
sql->copyTableToEntityHistory(~sourceTableName=entityType)
let hasRows = async () => {
let all =
await Entities.allEntities
->Belt.Array.map(async entityMod => {
let module(Entity) = entityMod
try await General.hasRows(sql, ~table=Entity.entityHistory.table) catch {
| exn =>
exn->ErrorHandling.mkLogAndRaise(
~msg=`Failed to check if entity history table has rows`,
~logger=Logging.createChild(
~params={
"entityName": Entity.name,
},
),
)
}
})
})
->Promise.all
all->Belt.Array.some(v => v)
}

let hasRows = () => General.hasRows(sql, ~table=TablesStatic.EntityHistory.table)
}
Loading

0 comments on commit 563737b

Please sign in to comment.