Skip to content

Commit

Permalink
Multiple fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
DZakh committed Dec 23, 2024
1 parent e0fa5f1 commit d155d82
Showing 1 changed file with 47 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,19 @@ let shallowCopyRegister = (register: register) => {

let copy = (self: t) => {
let pendingDynamicContracts = self.pendingDynamicContracts->Array.copy
let registers = self.registers->Array.map(shallowCopyRegister)
{
partitionId: self.partitionId,
registers: self.registers->Array.map(shallowCopyRegister),
mostBehindRegister: self.mostBehindRegister->shallowCopyRegister,
nextMostBehindRegister: self.nextMostBehindRegister->Option.map(shallowCopyRegister),
registers,
// Should use the reference to copied value
mostBehindRegister: registers
->Js.Array2.find(r => r.id == self.mostBehindRegister.id)
->Option.getExn,
nextMostBehindRegister: switch self.nextMostBehindRegister {
| Some(nextMostBehindRegister) =>
registers->Js.Array2.find(r => r.id == nextMostBehindRegister.id)->Option.getExn->Some
| None => None
},
pendingDynamicContracts,
isFetchingAtHead: self.isFetchingAtHead,
}
Expand Down Expand Up @@ -171,6 +179,10 @@ let mergeWithNextRegister = (register: register, ~next: register) => {

let dynamicContracts = DynamicContractsMap.merge(register.dynamicContracts, next.dynamicContracts)

if register.latestFetchedBlock.blockNumber !== next.latestFetchedBlock.blockNumber {
Js.Exn.raiseError("Invalid state: Merged registers should belong to the same block")
}

{
id: next.id,
fetchedEventQueue,
Expand All @@ -180,17 +192,7 @@ let mergeWithNextRegister = (register: register, ~next: register) => {
register.firstEventBlockNumber,
next.firstEventBlockNumber,
),
latestFetchedBlock: {
// FIXME: Validate that they are the same
blockTimestamp: Pervasives.max(
register.latestFetchedBlock.blockTimestamp,
next.latestFetchedBlock.blockTimestamp,
),
blockNumber: Pervasives.max(
register.latestFetchedBlock.blockNumber,
next.latestFetchedBlock.blockNumber,
),
},
latestFetchedBlock: next.latestFetchedBlock,
}
}

Expand Down Expand Up @@ -285,12 +287,18 @@ let updateInternal = (
registeringEventLogIndex,
dynamicContracts,
}) => {
// FIXME: Test there are no duplicates by id
makeDynamicContractRegister(
let register = makeDynamicContractRegister(
~registeringEventBlockNumber,
~registeringEventLogIndex,
~dynamicContractRegistrations=dynamicContracts,
)->add
)
// FIXME: Make it work with the case
if registers->Js.Array2.some(r => r.id == register.id) {
Js.Exn.raiseError(
"Invalid state: Register with the same id already exists for dynamic contract",
)
}
register->add
})

let registers = registerByLatestBlock->Js.Dict.values
Expand Down Expand Up @@ -332,74 +340,41 @@ let registerDynamicContract = (
}
}

/**
If a fetchState register has caught up to its next regisered node. Merge them and recurse.
If no merging happens, None is returned
*/
/**
/*
Updates node at given id with given values and checks to see if it can be merged into its next register.
Returns Error if the node with given id cannot be found (unexpected)
newItems are ordered earliest to latest (as they are returned from the worker)
*/
let // let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => {
// let merged = isMerged ? Some(register) : None
// switch register.registerType {
// | RootRegister => merged
// | DynamicContractRegister({nextRegister})
// if register.latestFetchedBlock.blockNumber <
// nextRegister.latestFetchedBlock.blockNumber => merged
// | DynamicContractRegister({nextRegister}) =>
// if register.latestFetchedBlock.blockNumber > nextRegister.latestFetchedBlock.blockNumber {
// let logger = Logging.createChild(
// ~params={
// "context": "Merging Dynamic Contract Registers",
// "currentRegister": {
// "id": register->getRegisterId,
// "latestFetchedBlock": register.latestFetchedBlock.blockNumber,
// "addresses": register.contractAddressMapping->ContractAddressingMap.getAllAddresses,
// },
// "nextRegister": {
// "id": nextRegister->getRegisterId,
// "latestFetchedBlock": nextRegister.latestFetchedBlock.blockNumber,
// "addresses": nextRegister.registerType->isRootRegister
// ? "Root"->Utils.magic
// : nextRegister.contractAddressMapping->ContractAddressingMap.getAllAddresses,
// },
// },
// )
// NextRegisterIsLessThanCurrent->ErrorHandling.mkLogAndRaise(
// ~msg="Unexpected: Dynamic contract register latest fetched block is greater than next register when it should be equal",
// ~logger,
// )
// }
// // Recursively look for other merges
// register->mergeWithNextRegistered->pruneAndMergeNextRegistered(~isMerged=true)
// }
// }

setFetchedItems = (
let setFetchedItems = (
fetchState: t,
~id,
~latestFetchedBlock: blockNumberAndTimestamp,
~newItems,
~currentBlockHeight,
): result<t, exn> => {
let isMissing = ref(true)

let registers = fetchState.registers->Array.map(r => {
if r.id == id {
isMissing := false
r->updateRegister(~latestFetchedBlock, ~reversedNewItems=newItems->Array.reverse)
} else {
r
}
})

fetchState
->updateInternal(
~registers,
~isFetchingAtHead=fetchState.isFetchingAtHead ||
currentBlockHeight <= latestFetchedBlock.blockNumber,
)
->Ok // FIXME: Check that it's missing
if isMissing.contents {
Error(UnexpectedRegisterDoesNotExist(id))
} else {
fetchState
->updateInternal(
~registers,
~isFetchingAtHead=fetchState.isFetchingAtHead ||
currentBlockHeight <= latestFetchedBlock.blockNumber,
)
->Ok
}
}

type nextQuery = {
Expand Down Expand Up @@ -448,10 +423,10 @@ let makeRegisterQuery = (register, ~partitionId, ~endBlock, ~nextRegister) => {
partitionId,
fetchStateRegisterId: register.id,
fromBlock,
toBlock: switch nextRegister {
| Some({latestFetchedBlock}) => Some(latestFetchedBlock.blockNumber)
| None => None
},
toBlock: Utils.Math.minOptInt(
nextRegister->Option.map(r => r.latestFetchedBlock.blockNumber),
endBlock,
),
contractAddressMapping: register.contractAddressMapping,
})
}
Expand Down Expand Up @@ -656,7 +631,7 @@ fetching registration
If there are pending dynamic contracts, we always need to allow the next query
*/
let isReadyForNextQuery = ({pendingDynamicContracts} as fetchState: t, ~maxQueueSize) =>
pendingDynamicContracts->Utils.Array.isEmpty
pendingDynamicContracts->Utils.Array.isEmpty // FIXME: This is won't work anymore
? fetchState.mostBehindRegister.fetchedEventQueue->Array.length < maxQueueSize
: true

Expand Down Expand Up @@ -846,7 +821,7 @@ let isActivelyIndexing = (
}
}

// FIXME: Pending
// FIXME: Should include pending contracts
let getNumContracts = ({registers}: t) => {
let sum = ref(0)
for idx in 0 to registers->Js.Array2.length - 1 {
Expand Down

0 comments on commit d155d82

Please sign in to comment.