Entity History Rework (#310)
* Implement entity history tables for each entity

* WIP: implement per-table insert history item function with tests

* Add entity history schema creator
JonoPrest authored Nov 18, 2024
1 parent 46d00e8 commit 881a193
Showing 3 changed files with 473 additions and 1 deletion.
243 changes: 243 additions & 0 deletions codegenerator/cli/templates/dynamic/codegen/src/db/Entities.res.hbs
@@ -23,6 +23,240 @@ let isIndex = true
@genType
type whereOperations<'entity, 'fieldType> = {eq: 'fieldType => promise<array<'entity>>}
module EntityHistory = {
type historyFieldsGeneral<'a> = {
chain_id: 'a,
block_timestamp: 'a,
block_number: 'a,
log_index: 'a,
}

type historyFields = historyFieldsGeneral<int>

type historyRow<'entity> = {
current: historyFields,
previous: option<historyFields>,
entityData: 'entity,
}

type previousHistoryFields = historyFieldsGeneral<option<int>>

let previousHistoryFieldsSchema = S.object(s => {
chain_id: s.field("previous_entity_history_chain_id", S.null(S.int)),
block_timestamp: s.field("previous_entity_history_block_timestamp", S.null(S.int)),
block_number: s.field("previous_entity_history_block_number", S.null(S.int)),
log_index: s.field("previous_entity_history_log_index", S.null(S.int)),
})

let currentHistoryFieldsSchema = S.object(s => {
chain_id: s.field("entity_history_chain_id", S.int),
block_timestamp: s.field("entity_history_block_timestamp", S.int),
block_number: s.field("entity_history_block_number", S.int),
log_index: s.field("entity_history_log_index", S.int),
})

let makeHistoryRowSchema: S.t<'entity> => S.t<historyRow<'entity>> = entitySchema =>
S.object(s => {
{
"current": s.flatten(currentHistoryFieldsSchema),
"previous": s.flatten(previousHistoryFieldsSchema),
"entityData": s.flatten(entitySchema),
}
})->S.transform(s => {
parser: v => {
current: v["current"],
previous: switch v["previous"] {
| {
chain_id: Some(chain_id),
block_timestamp: Some(block_timestamp),
block_number: Some(block_number),
log_index: Some(log_index),
} =>
Some({
chain_id,
block_timestamp,
block_number,
log_index,
})
| {chain_id: None, block_timestamp: None, block_number: None, log_index: None} => None
| _ => s.fail("Unexpected mix of null and non-null values in previous history fields")
},
entityData: v["entityData"],
},
serializer: v =>
{
"current": v.current,
"entityData": v.entityData,
"previous": switch v.previous {
| Some({chain_id, block_timestamp, block_number, log_index}) => {
chain_id: Some(chain_id),
block_timestamp: Some(block_timestamp),
block_number: Some(block_number),
log_index: Some(log_index),
}
| None => {
chain_id: None,
block_timestamp: None,
block_number: None,
log_index: None,
}
},
},
})
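
  // Editor's illustrative sketch (not part of this commit): round-tripping a
  // flattened DB row through the combined schema. `tokenSchema` and the raw row
  // are hypothetical, and `S.parseAnyWith` is assumed from rescript-schema
  // (the name of the parse entry point varies across versions).
  let tokenSchema = S.object(s => {"id": s.field("id", S.string)})
  switch %raw(`{
    entity_history_chain_id: 1,
    entity_history_block_timestamp: 1700000000,
    entity_history_block_number: 100,
    entity_history_log_index: 0,
    previous_entity_history_chain_id: null,
    previous_entity_history_block_timestamp: null,
    previous_entity_history_block_number: null,
    previous_entity_history_log_index: null,
    id: "token-1"
  }`)->S.parseAnyWith(makeHistoryRowSchema(tokenSchema)) {
  | Ok({previous: None}) => Js.log("first recorded change for this id")
  | Ok(_) => Js.log("row carries a previous change set")
  | Error(e) => Js.log(e)
  }
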
type t<'entity> = {
table: table,
createInsertFnQuery: string,
// insertFn: historyRow<'entity> => promise<unit>,
}

type entityInternal

external castInternal: t<'entity> => t<entityInternal> = "%identity"
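
// (Editor's note: the `%identity` cast above erases the `'entity` type
// parameter so that histories for differently-typed entities can be collected
// in a single `array<t<entityInternal>>`, as in `allEntityHistory` below.)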

let fromTable = (table: table): t<'entity> => {
let entity_history_block_timestamp = "entity_history_block_timestamp"
let entity_history_chain_id = "entity_history_chain_id"
let entity_history_block_number = "entity_history_block_number"
let entity_history_log_index = "entity_history_log_index"

//NB: Ordered by hierarchy of event ordering
let currentChangeFieldNames = [
entity_history_block_timestamp,
entity_history_chain_id,
entity_history_block_number,
entity_history_log_index,
]

let currentHistoryFields =
currentChangeFieldNames->Belt.Array.map(fieldName =>
mkField(fieldName, Integer, ~isPrimaryKey)
)

let previousChangeFieldNames =
currentChangeFieldNames->Belt.Array.map(fieldName => "previous_" ++ fieldName)

let previousHistoryFields =
previousChangeFieldNames->Belt.Array.map(fieldName =>
mkField(fieldName, Integer, ~isNullable)
)

let id = "id"

let dataFields = table.fields->Belt.Array.keepMap(field =>
switch field {
| Field(field) =>
switch field.fieldName {
//id is not nullable and should be part of the pk
| "id" => {...field, fieldName: id, isPrimaryKey: true}->Field->Some
//db_write_timestamp can be removed for this. TODO: remove this when we deprecate
//automatic db_write_timestamp creation
| "db_write_timestamp" => None
| _ =>
{
...field,
isNullable: true, //All entity fields are nullable in the history table
isIndex: false, //No need to index any additional entity data fields in entity history
}
->Field
->Some
}

| DerivedFrom(_) => None
}
)

let dataFieldNames = dataFields->Belt.Array.map(field => field->getFieldName)

let originTableName = table.tableName
let historyTableName = originTableName ++ "_history"
//ignore composite indices
let table = mkTable(
historyTableName,
~fields=Belt.Array.concatMany([currentHistoryFields, previousHistoryFields, dataFields]),
)
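
    // Editor's note (illustrative): for a hypothetical entity table `token`
    // with fields `id` and `owner`, the table built above is "token_history":
    // its primary key spans the four entity_history_* change columns plus `id`,
    // the previous_* copies of the change columns are nullable, and `owner`
    // becomes a nullable, unindexed data column.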

let createInsertFnQuery = {
let insertFnName = `"insert_${table.tableName}"`
let historyRowArg = "history_row"
let historyTablePath = `"public"."${historyTableName}"`
let originTablePath = `"public"."${originTableName}"`

let previousHistoryFieldsAreNullStr =
previousChangeFieldNames
->Belt.Array.map(fieldName => `${historyRowArg}.${fieldName} IS NULL`)
->Js.Array2.joinWith(" OR ")

let currentChangeFieldNamesCommaSeparated = currentChangeFieldNames->Js.Array2.joinWith(", ")

let dataFieldNamesDoubleQuoted = dataFieldNames->Belt.Array.map(fieldName => `"${fieldName}"`)
let dataFieldNamesCommaSeparated = dataFieldNamesDoubleQuoted->Js.Array2.joinWith(", ")

let allFieldNames = Belt.Array.concatMany([
currentChangeFieldNames,
previousChangeFieldNames,
dataFieldNamesDoubleQuoted,
])

`CREATE OR REPLACE FUNCTION ${insertFnName}(${historyRowArg} ${historyTablePath})
RETURNS void AS $$
DECLARE
v_previous_record RECORD;
v_origin_record RECORD;
BEGIN
-- Check if previous values are not provided
IF ${previousHistoryFieldsAreNullStr} THEN
-- Find the most recent record for the same id
SELECT ${currentChangeFieldNamesCommaSeparated} INTO v_previous_record
FROM ${historyTablePath}
WHERE ${id} = ${historyRowArg}.${id}
ORDER BY ${currentChangeFieldNames
->Belt.Array.map(fieldName => fieldName ++ " DESC")
->Js.Array2.joinWith(", ")}
LIMIT 1;

-- If a previous record exists, use its values
IF FOUND THEN
${Belt.Array.zip(currentChangeFieldNames, previousChangeFieldNames)
->Belt.Array.map(((currentFieldName, previousFieldName)) => {
`${historyRowArg}.${previousFieldName} := v_previous_record.${currentFieldName};`
})
->Js.Array2.joinWith(" ")}
ELSE
-- Check if a value for the id exists in the origin table and if so, insert a history row for it.
SELECT ${dataFieldNamesCommaSeparated} FROM ${originTablePath} WHERE id = ${historyRowArg}.${id} INTO v_origin_record;
IF FOUND THEN
INSERT INTO ${historyTablePath} (${currentChangeFieldNamesCommaSeparated}, ${dataFieldNamesCommaSeparated})
-- SET the current change data fields to 0 since we don't know what they were
-- and it doesn't matter provided they are less than any new values
VALUES (${currentChangeFieldNames
->Belt.Array.map(_ => "0")
->Js.Array2.joinWith(", ")}, ${dataFieldNames
->Belt.Array.map(fieldName => `v_origin_record."${fieldName}"`)
->Js.Array2.joinWith(", ")});

${previousChangeFieldNames
->Belt.Array.map(previousFieldName => {
`${historyRowArg}.${previousFieldName} := 0;`
})
->Js.Array2.joinWith(" ")}
END IF;
END IF;
END IF;

INSERT INTO ${historyTablePath} (${allFieldNames->Js.Array2.joinWith(", ")})
VALUES (${allFieldNames
->Belt.Array.map(fieldName => `${historyRowArg}.${fieldName}`)
->Js.Array2.joinWith(", ")});
END;
$$ LANGUAGE plpgsql;
`
}

{table, createInsertFnQuery}
}
}
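
// Editor's illustrative sketch (not part of this commit): the generated
// function takes a composite-typed row, so for a hypothetical "token_history"
// table (columns in declaration order: timestamp, chain id, block number, log
// index, their four previous_* copies, then id and owner) it could be invoked
// as:
//
//   SELECT "insert_token_history"(
//     ROW(1700000000, 1, 100, 0, NULL, NULL, NULL, NULL, 'token-1', '0xabc')
//       ::"public"."token_history"
//   );
//
// With the previous_* values NULL, the function backfills them from the latest
// history row for that id, or from the origin table (zeroing the change
// columns) before inserting.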

{{#each entities as |entity|}}
module {{entity.name.capitalized}} = {
let key = "{{entity.name.original}}"
@@ -86,6 +320,8 @@ module {{entity.name.capitalized}} = {
],
{{/if}}
)

let entityHistory = table->EntityHistory.fromTable
}

{{/each}}
@@ -109,6 +345,13 @@ let allTables: array<table> = [
{{entity.name.capitalized}}.table,
{{/each}}
]

let allEntityHistory: array<EntityHistory.t<EntityHistory.entityInternal>> = [
{{#each entities as |entity|}}
{{entity.name.capitalized}}.entityHistory->EntityHistory.castInternal,
{{/each}}
]

let schema = Schema.make(allTables)

@get
@@ -446,8 +446,9 @@ let runUpMigrations = async (~shouldExit) => {
)
})

let allEntityHistoryTables = Entities.allEntityHistory->Belt.Array.map(table => table.table)
//Create all tables with indices
await [TablesStatic.allTables, Entities.allTables]
await [TablesStatic.allTables, Entities.allTables, allEntityHistoryTables]
->Belt.Array.concatMany
->awaitEach(async table => {
await creatTableIfNotExists(DbFunctions.sql, table)->handleFailure(
Expand All @@ -458,6 +459,12 @@ let runUpMigrations = async (~shouldExit) => {
)
})

await Entities.allEntityHistory->awaitEach(async entityHistory => {
await sql
->Postgres.unsafe(entityHistory.createInsertFnQuery)
->handleFailure(~msg=`EE800: Error creating ${entityHistory.table.tableName} insert function`)
})
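
// Editor's note (illustrative, not part of this commit): once created, such a
// function could be exercised through the same unsafe escape hatch, e.g. for a
// hypothetical "token_history" table:
// await sql->Postgres.unsafe(
//   `SELECT "insert_token_history"(ROW(1700000000, 1, 100, 0, NULL, NULL, NULL, NULL, 'token-1', '0xabc')::"public"."token_history")`,
// )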

//Create extra entity history tables
await EntityHistory.createEntityHistoryTableFunctions()->handleFailure(
~msg=`EE800: Error creating entity history table`,
