From e0fa5f18feb35bf293855eff677bf1a7221a166e Mon Sep 17 00:00:00 2001 From: Dmitry Zakharov Date: Mon, 23 Dec 2024 21:12:58 +0400 Subject: [PATCH] Refactor FetchState to have flat registers --- .../codegen/src/eventFetching/FetchState.res | 968 ++++++------------ .../eventFetching/PartitionedFetchState.res | 12 +- .../src/eventFetching/SourceManager.res | 9 +- .../codegen/src/globalState/GlobalState.res | 19 - 4 files changed, 351 insertions(+), 657 deletions(-) diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 3e94b3af9..2fcef6fc3 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -83,18 +83,19 @@ module DynamicContractsMap = { } } +/** +An id for a given register. Either the root or a dynamic contract register +with a dynamicContractId +*/ +type id = Root | DynamicContract(dynamicContractId) + /** A state that holds a queue of events and data regarding what to fetch next. -If there are dynamic contracts currently catching up to the root register, -this the register field will hold "DynamicContractRegister" with id of the registering -contract and either the root register or a chain of "DynamicContractRegisters" ordered -from earliest registering event to latest with the RootRegister at the end of the chain. - -As one dynamic contract register catches up to the fetched blocknumebr of the next, it will -merge itself into the next register and combine queries/addresses and queues until fully caught -up to the root. +There's always a root register and dynamic contract registers, +until they are not merged with each other when cought up to the fetched block number. */ -type registerData = { +type register = { + id: id, latestFetchedBlock: blockNumberAndTimestamp, contractAddressMapping: ContractAddressingMap.mapping, //Events ordered from latest to earliest @@ -105,81 +106,21 @@ type registerData = { firstEventBlockNumber: option, } -type rec register = { - registerType: registerType, - ...registerData, -} -and registerType = - | RootRegister - | DynamicContractRegister({id: EventUtils.eventIndex, nextRegister: register}) - type dynamicContractRegistration = { registeringEventBlockNumber: int, registeringEventLogIndex: int, registeringEventChain: ChainMap.Chain.t, dynamicContracts: array, } + type t = { partitionId: int, - baseRegister: register, + registers: array, pendingDynamicContracts: array, isFetchingAtHead: bool, -} - -module Parent = { - type fetchState = register - type rec t = { - dynamicContractId: dynamicContractId, - parent: option, - ...registerData, - } - - let make = ( - { - latestFetchedBlock, - contractAddressMapping, - fetchedEventQueue, - dynamicContracts, - firstEventBlockNumber, - }: fetchState, - ~dynamicContractId, - ~parent=None, - ): t => { - latestFetchedBlock, - contractAddressMapping, - fetchedEventQueue, - dynamicContracts, - firstEventBlockNumber, - dynamicContractId, - parent, - } - - let rec joinChild = ( - { - latestFetchedBlock, - contractAddressMapping, - fetchedEventQueue, - dynamicContracts, - firstEventBlockNumber, - dynamicContractId, - parent, - }: t, - child: fetchState, - ) => { - let joined: fetchState = { - registerType: DynamicContractRegister({id: dynamicContractId, nextRegister: child}), - latestFetchedBlock, - contractAddressMapping, - fetchedEventQueue, - dynamicContracts, - firstEventBlockNumber, - } - - switch parent { - | Some(parent) => parent->joinChild(joined) - | None => joined - } - } + // Fields computed by updateInternal + mostBehindRegister: register, + nextMostBehindRegister: option, } let shallowCopyRegister = (register: register) => { @@ -188,98 +129,71 @@ let shallowCopyRegister = (register: register) => { } let copy = (self: t) => { - let rec loop = (register: register, ~parent=?) => - switch register.registerType { - | RootRegister => - let copied = register->shallowCopyRegister - switch parent { - | Some(parent) => parent->Parent.joinChild(copied) - | None => copied - } - | DynamicContractRegister({id, nextRegister}) => - nextRegister->loop( - ~parent=register->shallowCopyRegister->Parent.make(~dynamicContractId=id, ~parent), - ) - } - - let baseRegister = loop(self.baseRegister) let pendingDynamicContracts = self.pendingDynamicContracts->Array.copy { partitionId: self.partitionId, - baseRegister, + registers: self.registers->Array.map(shallowCopyRegister), + mostBehindRegister: self.mostBehindRegister->shallowCopyRegister, + nextMostBehindRegister: self.nextMostBehindRegister->Option.map(shallowCopyRegister), pendingDynamicContracts, isFetchingAtHead: self.isFetchingAtHead, } } -/** + +let isRegisterBehind = (r1, r2: register) => + r1.latestFetchedBlock.blockNumber < r2.latestFetchedBlock.blockNumber + +/* Comapritor for two events from the same chain. No need for chain id or timestamp */ +/* +Returns the latest of two events on the same chain +*/ let getEventCmp = (event: Internal.eventItem) => { (event.blockNumber, event.logIndex) } -/** -Returns the latest of two events on the same chain -*/ let eventCmp = (a, b) => a->getEventCmp > b->getEventCmp -/** +/* Merges two event queues on a single event fetcher Pass the shorter list into A for better performance */ let mergeSortedEventList = (a, b) => Utils.Array.mergeSorted(eventCmp, a, b) -/** -Merges a node into its next registered branch. Combines contract address mappings and queues -*/ -let mergeIntoNextRegistered = (self: register) => { - switch self.registerType { - | DynamicContractRegister({nextRegister}) => - let fetchedEventQueue = mergeSortedEventList( - self.fetchedEventQueue, - nextRegister.fetchedEventQueue, - ) - let contractAddressMapping = ContractAddressingMap.combine( - self.contractAddressMapping, - nextRegister.contractAddressMapping, - ) +let mergeWithNextRegister = (register: register, ~next: register) => { + let fetchedEventQueue = mergeSortedEventList(register.fetchedEventQueue, next.fetchedEventQueue) + let contractAddressMapping = ContractAddressingMap.combine( + register.contractAddressMapping, + next.contractAddressMapping, + ) - let dynamicContracts = DynamicContractsMap.merge( - self.dynamicContracts, - nextRegister.dynamicContracts, - ) + let dynamicContracts = DynamicContractsMap.merge(register.dynamicContracts, next.dynamicContracts) - { - registerType: nextRegister.registerType, - fetchedEventQueue, - contractAddressMapping, - dynamicContracts, - firstEventBlockNumber: Utils.Math.minOptInt( - self.firstEventBlockNumber, - nextRegister.firstEventBlockNumber, + { + id: next.id, + fetchedEventQueue, + contractAddressMapping, + dynamicContracts, + firstEventBlockNumber: Utils.Math.minOptInt( + register.firstEventBlockNumber, + next.firstEventBlockNumber, + ), + latestFetchedBlock: { + // FIXME: Validate that they are the same + blockTimestamp: Pervasives.max( + register.latestFetchedBlock.blockTimestamp, + next.latestFetchedBlock.blockTimestamp, ), - latestFetchedBlock: { - blockTimestamp: Pervasives.max( - self.latestFetchedBlock.blockTimestamp, - nextRegister.latestFetchedBlock.blockTimestamp, - ), - blockNumber: Pervasives.max( - self.latestFetchedBlock.blockNumber, - nextRegister.latestFetchedBlock.blockNumber, - ), - }, - } - | RootRegister => self //already merged + blockNumber: Pervasives.max( + register.latestFetchedBlock.blockNumber, + next.latestFetchedBlock.blockNumber, + ), + }, } } -/** -An id for a given register. Either the root or a dynamic contract register -with a dynamicContractId -*/ -type id = Root | DynamicContract(dynamicContractId) - let registerIdToString = (id: id) => switch id { | Root => "root" @@ -287,16 +201,6 @@ let registerIdToString = (id: id) => `dynamic-${blockNumber->Int.toString}-${logIndex->Int.toString}` } -/** -Constructs id from a register -*/ -let getRegisterId = (self: register) => { - switch self.registerType { - | RootRegister => Root - | DynamicContractRegister({id}) => DynamicContract(id) - } -} - exception UnexpectedRegisterDoesNotExist(id) /** @@ -321,59 +225,20 @@ let updateRegister = ( } } -/** -Updates node at the given id with the values passed. -Errors if the node can't be found. -*/ -let rec updateInternal = ( - register: register, - ~id, - ~latestFetchedBlock, - ~reversedNewItems, - ~parent: option=?, -): result => { - let handleParent = (updated: register) => { - switch parent { - | Some(parent) => parent->Parent.joinChild(updated)->Ok - | None => updated->Ok - } - } - - switch (register.registerType, id) { - | (RootRegister, Root) => - register - ->updateRegister(~reversedNewItems, ~latestFetchedBlock) - ->handleParent - | (DynamicContractRegister({id}), DynamicContract(targetId)) if id == targetId => - register - ->updateRegister(~reversedNewItems, ~latestFetchedBlock) - ->handleParent - | (DynamicContractRegister({id: dynamicContractId, nextRegister}), id) => - nextRegister->updateInternal( - ~id, - ~latestFetchedBlock, - ~reversedNewItems, - ~parent=register->Parent.make(~dynamicContractId, ~parent), - ) - | (RootRegister, DynamicContract(_)) => Error(UnexpectedRegisterDoesNotExist(id)) - } -} - -/** -Inserts a dynamic contract register to the head of a given -register. It will then precede the given register in the chain -*/ -let addNewRegisterToHead = ( - self, +let makeDynamicContractRegister = ( ~registeringEventBlockNumber, ~registeringEventLogIndex, - ~contractAddressMapping, + ~dynamicContractRegistrations: array, ) => { let id: dynamicContractId = { blockNumber: registeringEventBlockNumber, logIndex: registeringEventLogIndex, } - let registerType = DynamicContractRegister({id, nextRegister: self}) + + let contractAddressMapping = + dynamicContractRegistrations + ->Array.map(d => (d.contractAddress, (d.contractType :> string))) + ->ContractAddressingMap.fromArray let dynamicContracts = DynamicContractsMap.empty->DynamicContractsMap.add( @@ -382,7 +247,7 @@ let addNewRegisterToHead = ( ) { - registerType, + id: DynamicContract(id), latestFetchedBlock: { blockNumber: registeringEventBlockNumber - 1, blockTimestamp: 0, @@ -394,49 +259,49 @@ let addNewRegisterToHead = ( } } -/** -Adds a new dynamic contract registration. It inserts the registration ordered in the -chain from earliest registered contract to latest. So if this is being called on a batch -of registrations its best to do this in order of latest to earliest to reduce recursions -of this function. -*/ -let rec addDynamicContractRegister = ( - self: register, - ~registeringEventBlockNumber, - ~registeringEventLogIndex, - ~dynamicContractRegistrations: array, - ~parent: option=?, -) => { - let handleParent = updated => - switch parent { - | Some(parent) => parent->Parent.joinChild(updated) - | None => updated +/* + Update fetchState, merge registers and recompute derived values + */ +let updateInternal = ( + fetchState: t, + ~registers=fetchState.registers, + ~pendingDynamicContracts=fetchState.pendingDynamicContracts, + ~isFetchingAtHead=fetchState.isFetchingAtHead, +): t => { + let registerByLatestBlock = Js.Dict.empty() + let add = register => { + let key = register.latestFetchedBlock.blockNumber->Js.Int.toString + let mergedRegister = switch registerByLatestBlock->Utils.Dict.dangerouslyGetNonOption(key) { + | Some(next) => register->mergeWithNextRegister(~next) + | None => register } + registerByLatestBlock->Js.Dict.set(key, mergedRegister) + } - let addToHead = updated => - updated - ->addNewRegisterToHead( - ~contractAddressMapping=dynamicContractRegistrations - ->Array.map(d => (d.contractAddress, (d.contractType :> string))) - ->ContractAddressingMap.fromArray, - ~registeringEventLogIndex, - ~registeringEventBlockNumber, - ) - ->handleParent - - let latestFetchedBlockNumber = registeringEventBlockNumber - 1 + registers->Array.forEach(add) - switch self.registerType { - | RootRegister => self->addToHead - | DynamicContractRegister(_) if latestFetchedBlockNumber <= self.latestFetchedBlock.blockNumber => - self->addToHead - | DynamicContractRegister({id: dynamicContractId, nextRegister}) => - nextRegister->addDynamicContractRegister( + pendingDynamicContracts->Array.forEach(({ + registeringEventBlockNumber, + registeringEventLogIndex, + dynamicContracts, + }) => { + // FIXME: Test there are no duplicates by id + makeDynamicContractRegister( ~registeringEventBlockNumber, ~registeringEventLogIndex, - ~dynamicContractRegistrations, - ~parent=self->Parent.make(~dynamicContractId, ~parent), - ) + ~dynamicContractRegistrations=dynamicContracts, + )->add + }) + + let registers = registerByLatestBlock->Js.Dict.values + { + partitionId: fetchState.partitionId, + pendingDynamicContracts: [], + // Js automatically sorts numeric dict keys + mostBehindRegister: registers->Js.Array2.unsafe_get(0), + nextMostBehindRegister: registers->Belt.Array.get(1), + registers, + isFetchingAtHead, } } @@ -446,104 +311,95 @@ contract registrations. These pending registrations are applied to the base regi query is called. */ let registerDynamicContract = ( - self: t, + fetchState: t, registration: dynamicContractRegistration, ~isFetchingAtHead, + ~endBlock, ) => { - ...self, - pendingDynamicContracts: self.pendingDynamicContracts->Array.concat([registration]), - isFetchingAtHead, -} - -let addDynamicContractRegisters = (baseRegister, pendingDynamicContracts) => { - pendingDynamicContracts->Array.reduce(baseRegister, ( - baseRegister, - {registeringEventBlockNumber, registeringEventLogIndex, dynamicContracts}, - ) => { - baseRegister->addDynamicContractRegister( - ~registeringEventBlockNumber, - ~registeringEventLogIndex, - ~dynamicContractRegistrations=dynamicContracts, - ) - }) -} - -exception NextRegisterIsLessThanCurrent - -let isRootRegister = registerType => - switch registerType { - | RootRegister => true - | DynamicContractRegister(_) => false + let updatedFetchState = { + ...fetchState, + pendingDynamicContracts: fetchState.pendingDynamicContracts->Array.concat([registration]), + isFetchingAtHead, + } + let isStoppedFetching = switch endBlock { + | Some(endBlock) => fetchState.mostBehindRegister.latestFetchedBlock.blockNumber >= endBlock + | None => false + } + if isStoppedFetching { + updatedFetchState->updateInternal + } else { + updatedFetchState } +} /** If a fetchState register has caught up to its next regisered node. Merge them and recurse. If no merging happens, None is returned */ -let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => { - let merged = isMerged ? Some(register) : None - switch register.registerType { - | RootRegister => merged - | DynamicContractRegister({nextRegister}) - if register.latestFetchedBlock.blockNumber < - nextRegister.latestFetchedBlock.blockNumber => merged - | DynamicContractRegister({nextRegister}) => - if register.latestFetchedBlock.blockNumber > nextRegister.latestFetchedBlock.blockNumber { - let logger = Logging.createChild( - ~params={ - "context": "Merging Dynamic Contract Registers", - "currentRegister": { - "id": register->getRegisterId, - "latestFetchedBlock": register.latestFetchedBlock.blockNumber, - "addresses": register.contractAddressMapping->ContractAddressingMap.getAllAddresses, - }, - "nextRegister": { - "id": nextRegister->getRegisterId, - "latestFetchedBlock": nextRegister.latestFetchedBlock.blockNumber, - "addresses": nextRegister.registerType->isRootRegister - ? "Root"->Utils.magic - : nextRegister.contractAddressMapping->ContractAddressingMap.getAllAddresses, - }, - }, - ) - NextRegisterIsLessThanCurrent->ErrorHandling.mkLogAndRaise( - ~msg="Unexpected: Dynamic contract register latest fetched block is greater than next register when it should be equal", - ~logger, - ) - } - // Recursively look for other merges - register->mergeIntoNextRegistered->pruneAndMergeNextRegistered(~isMerged=true) - } -} - /** Updates node at given id with given values and checks to see if it can be merged into its next register. Returns Error if the node with given id cannot be found (unexpected) newItems are ordered earliest to latest (as they are returned from the worker) */ -let update = ( - {baseRegister, pendingDynamicContracts, isFetchingAtHead, partitionId}: t, +let // let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => { +// let merged = isMerged ? Some(register) : None +// switch register.registerType { +// | RootRegister => merged +// | DynamicContractRegister({nextRegister}) +// if register.latestFetchedBlock.blockNumber < +// nextRegister.latestFetchedBlock.blockNumber => merged +// | DynamicContractRegister({nextRegister}) => +// if register.latestFetchedBlock.blockNumber > nextRegister.latestFetchedBlock.blockNumber { +// let logger = Logging.createChild( +// ~params={ +// "context": "Merging Dynamic Contract Registers", +// "currentRegister": { +// "id": register->getRegisterId, +// "latestFetchedBlock": register.latestFetchedBlock.blockNumber, +// "addresses": register.contractAddressMapping->ContractAddressingMap.getAllAddresses, +// }, +// "nextRegister": { +// "id": nextRegister->getRegisterId, +// "latestFetchedBlock": nextRegister.latestFetchedBlock.blockNumber, +// "addresses": nextRegister.registerType->isRootRegister +// ? "Root"->Utils.magic +// : nextRegister.contractAddressMapping->ContractAddressingMap.getAllAddresses, +// }, +// }, +// ) +// NextRegisterIsLessThanCurrent->ErrorHandling.mkLogAndRaise( +// ~msg="Unexpected: Dynamic contract register latest fetched block is greater than next register when it should be equal", +// ~logger, +// ) +// } +// // Recursively look for other merges +// register->mergeWithNextRegistered->pruneAndMergeNextRegistered(~isMerged=true) +// } +// } + +setFetchedItems = ( + fetchState: t, ~id, ~latestFetchedBlock: blockNumberAndTimestamp, ~newItems, ~currentBlockHeight, ): result => { - let isFetchingAtHead = isFetchingAtHead || currentBlockHeight <= latestFetchedBlock.blockNumber - - baseRegister - ->updateInternal(~id, ~latestFetchedBlock, ~reversedNewItems=newItems->Array.reverse) - ->Result.map(updatedRegister => { - let withNewDynamicContracts = - updatedRegister->addDynamicContractRegisters(pendingDynamicContracts) - let maybeMerged = withNewDynamicContracts->pruneAndMergeNextRegistered - { - partitionId, - baseRegister: maybeMerged->Option.getWithDefault(withNewDynamicContracts), - pendingDynamicContracts: [], - isFetchingAtHead, + let registers = fetchState.registers->Array.map(r => { + if r.id == id { + r->updateRegister(~latestFetchedBlock, ~reversedNewItems=newItems->Array.reverse) + } else { + r } }) + + fetchState + ->updateInternal( + ~registers, + ~isFetchingAtHead=fetchState.isFetchingAtHead || + currentBlockHeight <= latestFetchedBlock.blockNumber, + ) + ->Ok // FIXME: Check that it's missing } type nextQuery = { @@ -561,9 +417,10 @@ let getQueryLogger = ( ) => { let fetchStateRegister = fetchStateRegisterId->registerIdToString let allAddresses = contractAddressMapping->ContractAddressingMap.getAllAddresses - let addresses = allAddresses->Js.Array2.slice(~start=0, ~end_=3)->Array.map(addr => addr->Address.toString) + let addresses = + allAddresses->Js.Array2.slice(~start=0, ~end_=3)->Array.map(addr => addr->Address.toString) let restCount = allAddresses->Array.length - addresses->Array.length - if restCount > 0 { + if restCount > 0 { addresses->Js.Array2.push(`... and ${restCount->Int.toString} more`)->ignore } let params = { @@ -575,96 +432,41 @@ let getQueryLogger = ( Logging.createChildFrom(~logger, ~params) } -let minOfOption: (int, option) => int = (a: int, b: option) => { - switch (a, b) { - | (a, Some(b)) => min(a, b) - | (a, None) => a - } -} - -let getNextFromBlock = ({latestFetchedBlock}: register) => { - switch latestFetchedBlock.blockNumber { - | 0 => 0 - | latestFetchedBlockNumber => latestFetchedBlockNumber + 1 - } -} - type nextQueryOrDone = | NextQuery(nextQuery) | Done -/** -Applies pending dynamic contract registrations to the base register -Returns None if there are no pending dynamic contracts -and Some with the updated fetch state if there are pending dynamic contracts -*/ -let applyPendingDynamicContractRegistrations = (self: t) => { - switch self.pendingDynamicContracts { - | [] => None - | pendingDynamicContracts => - Some({ - ...self, - baseRegister: self.baseRegister->addDynamicContractRegisters(pendingDynamicContracts), - pendingDynamicContracts: [], - }) +let makeRegisterQuery = (register, ~partitionId, ~endBlock, ~nextRegister) => { + let fromBlock = switch register.latestFetchedBlock.blockNumber { + | 0 => 0 + | latestFetchedBlockNumber => latestFetchedBlockNumber + 1 } -} - -let mergeRegistersBeforeNextQuery = (self: t) => { - let mapMaybeMerge = (fetchState: t) => - fetchState.baseRegister - ->pruneAndMergeNextRegistered - ->Option.map(merged => { - ...fetchState, - baseRegister: merged, + switch endBlock { + | Some(endBlock) if fromBlock > endBlock => Done + | _ => + NextQuery({ + partitionId, + fetchStateRegisterId: register.id, + fromBlock, + toBlock: switch nextRegister { + | Some({latestFetchedBlock}) => Some(latestFetchedBlock.blockNumber) + | None => None + }, + contractAddressMapping: register.contractAddressMapping, }) - - //First apply pending dynamic contracts, then try and merge - //These steps should only happen before getNextQuery, to avoid in between states where a - //query is in flight and the underlying registers are changing - let maybeUpdatedFetchState = switch self->applyPendingDynamicContractRegistrations { - | Some(updatedWithDynamicContracts) => - //After adding the pending dynamic contracts, try and merge registers - switch updatedWithDynamicContracts->mapMaybeMerge { - //Pass through the merged value if it updated anything - | Some(merged) => Some(merged) - //Even if the merge returned none, the pending dynamic contracts should be applied - //as an updated - | None => Some(updatedWithDynamicContracts) - } - //If no dynamic contracts were added just try and merge - | None => self->mapMaybeMerge } - - maybeUpdatedFetchState->Option.getWithDefault(self) } /** Gets the next query either with a to block -of the nextRegistered latestBlockNumber to catch up and merge -or None if we don't care about an end block of a query +to catch up to another registery or without endBlock if all registries are merged */ -let getNextQuery = ({baseRegister, partitionId}: t, ~endBlock) => { - let fromBlock = getNextFromBlock(baseRegister) - switch (baseRegister.registerType, endBlock) { - | (RootRegister, Some(endBlock)) if fromBlock > endBlock => Done - | (RootRegister, _) => - NextQuery({ - partitionId, - fetchStateRegisterId: Root, - fromBlock, - toBlock: endBlock, - contractAddressMapping: baseRegister.contractAddressMapping, - }) - | (DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}), _) => - NextQuery({ - partitionId, - fetchStateRegisterId: DynamicContract(id), - fromBlock, - toBlock: Some(latestFetchedBlock.blockNumber), - contractAddressMapping: baseRegister.contractAddressMapping, - }) - } +let getNextQuery = ({partitionId, mostBehindRegister, nextMostBehindRegister}: t, ~endBlock) => { + mostBehindRegister->makeRegisterQuery( + ~partitionId, + ~endBlock, + ~nextRegister=nextMostBehindRegister, + ) } type itemWithPopFn = {item: Internal.eventItem, popItemOffQueue: unit => unit} @@ -717,53 +519,11 @@ let qItemLt = (a, b) => a->getCmpVal < b->getCmpVal Returns queue item WITHOUT the updated fetch state. Used for checking values not updating state */ -let getEarliestEventInRegister = (self: register) => { - switch self.fetchedEventQueue->Utils.Array.last { +let getEarliestEventInRegister = (register: register) => { + switch register.fetchedEventQueue->Utils.Array.last { | Some(head) => - Item({item: head, popItemOffQueue: () => self.fetchedEventQueue->Js.Array2.pop->ignore}) - | None => makeNoItem(self) - } -} - -/** -Recurses through all registers and finds the register with the earliest queue item, -then returns its id. -*/ -let rec findRegisterIdWithEarliestQueueItem = (~currentEarliestRegister=?, register: register) => { - let currentEarliestRegister = switch currentEarliestRegister { - | None => register - | Some(currentEarliestRegister) => - if ( - register - ->getEarliestEventInRegister - ->qItemLt(currentEarliestRegister->getEarliestEventInRegister) - ) { - register - } else { - currentEarliestRegister - } - } - - switch register.registerType { - | RootRegister => currentEarliestRegister->getRegisterId - | DynamicContractRegister({nextRegister}) => - nextRegister->findRegisterIdWithEarliestQueueItem(~currentEarliestRegister) - } -} - -/** -Given a register id, pop a queue item off of that register and return the entire updated -fetch state with that item. - -Recurses through registers and Errors if ID does not exist -*/ -let rec popQItemAtRegisterId = (register: register, ~id) => { - switch register.registerType { - | RootRegister - | DynamicContractRegister(_) if id == register->getRegisterId => - register->getEarliestEventInRegister->Ok - | DynamicContractRegister({nextRegister}) => nextRegister->popQItemAtRegisterId(~id) - | RootRegister => Error(UnexpectedRegisterDoesNotExist(id)) + Item({item: head, popItemOffQueue: () => register.fetchedEventQueue->Js.Array2.pop->ignore}) + | None => makeNoItem(register) } } @@ -773,26 +533,36 @@ Gets the earliest queueItem from thgetNodeEarliestEventWithUpdatedQueue. Finds the earliest queue item across all registers and then returns that queue item with an update fetch state. */ -let getEarliestEvent = (self: t) => { - let earliestItemInRegisters = { - let registerWithEarliestQItem = self.baseRegister->findRegisterIdWithEarliestQueueItem - //Can safely unwrap here since the id is returned from self and so is guarenteed to exist - self.baseRegister->popQItemAtRegisterId(~id=registerWithEarliestQItem)->Utils.unwrapResultExn +let getEarliestEvent = (fetchState: t) => { + let earliestItemInRegisters = switch fetchState.registers { + | [r] => r->getEarliestEventInRegister + | registers => { + let item = ref(registers->Js.Array2.unsafe_get(0)->getEarliestEventInRegister) + for idx in 1 to registers->Js.Array2.length - 1 { + let register = registers->Js.Array2.unsafe_get(idx) + let registerItem = register->getEarliestEventInRegister + if registerItem->qItemLt(item.contents) { + item := registerItem + } + } + item.contents + } } - if self.pendingDynamicContracts->Utils.Array.isEmpty { + if fetchState.pendingDynamicContracts->Utils.Array.isEmpty { //In the case where there are no pending dynamic contracts, return the earliest item //from the registers earliestItemInRegisters } else { //In the case where there are pending dynamic contracts, construct the earliest queue item from //the pending dynamic contracts - let earliestPendingDynamicContractBlockNumber = self.pendingDynamicContracts->Array.reduce( - (self.pendingDynamicContracts->Js.Array2.unsafe_get(0)).registeringEventBlockNumber, - (accumBlockNumber, dynamicContractRegistration) => { - min(accumBlockNumber, dynamicContractRegistration.registeringEventBlockNumber) - }, - ) + let earliestPendingDynamicContractBlockNumber = + fetchState.pendingDynamicContracts->Array.reduce( + (fetchState.pendingDynamicContracts->Js.Array2.unsafe_get(0)).registeringEventBlockNumber, + (accumBlockNumber, dynamicContractRegistration) => { + min(accumBlockNumber, dynamicContractRegistration.registeringEventBlockNumber) + }, + ) let earliestItemInPendingDynamicContracts = NoItem({ blockTimestamp: 0, @@ -841,8 +611,8 @@ let make = ( accum->DynamicContractsMap.addAddress(dynamicContractId, contractAddress) }) - let baseRegister = { - registerType: RootRegister, + let rootRegister = { + id: Root, latestFetchedBlock: { blockTimestamp: 0, // Here's a bug that startBlock: 1 won't work @@ -856,7 +626,9 @@ let make = ( { partitionId, - baseRegister, + registers: [rootRegister], + mostBehindRegister: rootRegister, + nextMostBehindRegister: None, pendingDynamicContracts: [], isFetchingAtHead, } @@ -865,16 +637,15 @@ let make = ( /** Calculates the cummulative queue sizes in all registers */ -let rec registerQueueSize = (register: register, ~accum=0) => { - let accum = register.fetchedEventQueue->Array.length + accum - switch register.registerType { - | RootRegister => accum - | DynamicContractRegister({nextRegister}) => nextRegister->registerQueueSize(~accum) +let queueSize = ({registers}: t) => { + let size = ref(0) + for idx in 0 to registers->Js.Array2.length - 1 { + let register = registers->Js.Array2.unsafe_get(idx) + size := size.contents + register.fetchedEventQueue->Array.length } + size.contents } -let queueSize = (self: t) => self.baseRegister->registerQueueSize - /** Check the max queue size of the tip of the tree. @@ -884,9 +655,9 @@ fetching registration If there are pending dynamic contracts, we always need to allow the next query */ -let isReadyForNextQuery = ({pendingDynamicContracts, baseRegister}: t, ~maxQueueSize) => +let isReadyForNextQuery = ({pendingDynamicContracts} as fetchState: t, ~maxQueueSize) => pendingDynamicContracts->Utils.Array.isEmpty - ? baseRegister.fetchedEventQueue->Array.length < maxQueueSize + ? fetchState.mostBehindRegister.fetchedEventQueue->Array.length < maxQueueSize : true let warnIfAttemptedAddressRegisterOnDifferentContracts = ( @@ -910,38 +681,6 @@ let warnIfAttemptedAddressRegisterOnDifferentContracts = ( } } -let rec checkBaseRegisterContainsRegisteredContract = ( - register: register, - ~contractName, - ~contractAddress, - ~chainId, -) => { - switch register.contractAddressMapping->ContractAddressingMap.getContractNameFromAddress( - ~contractAddress, - ) { - | Some(existingContractName) => - if existingContractName != contractName { - warnIfAttemptedAddressRegisterOnDifferentContracts( - ~contractAddress, - ~contractName, - ~existingContractName, - ~chainId, - ) - } - true - | _ => - switch register.registerType { - | RootRegister => false - | DynamicContractRegister({nextRegister}) => - nextRegister->checkBaseRegisterContainsRegisteredContract( - ~contractName, - ~contractAddress, - ~chainId, - ) - } - } -} - /** Recurses through registers and determines whether a contract has already been registered with the given name and address @@ -952,11 +691,23 @@ let checkContainsRegisteredContractAddress = ( ~contractAddress, ~chainId, ) => { - self.baseRegister->checkBaseRegisterContainsRegisteredContract( - ~contractName, - ~contractAddress, - ~chainId, - ) || + self.registers->Array.some(r => { + switch r.contractAddressMapping->ContractAddressingMap.getContractNameFromAddress( + ~contractAddress, + ) { + | Some(existingContractName) => + if existingContractName != contractName { + warnIfAttemptedAddressRegisterOnDifferentContracts( + ~contractAddress, + ~contractName, + ~existingContractName, + ~chainId, + ) + } + true + | None => false + } + }) || self.pendingDynamicContracts->Array.some(({dynamicContracts}) => dynamicContracts->Array.some(dcr => { let exists = dcr.contractAddress == contractAddress @@ -976,26 +727,23 @@ let checkContainsRegisteredContractAddress = ( /** * Returns the latest block number fetched for the lowest fetcher queue (ie the earliest un-fetched dynamic contract) */ -let getLatestFullyFetchedBlock = (self: t) => { +let getLatestFullyFetchedBlock = ({mostBehindRegister, pendingDynamicContracts}: t) => { + let latestFullyFetchedBlock = ref(mostBehindRegister.latestFetchedBlock) + //Consider pending dynamic contracts when calculating the latest fully fetched block //Since they are now registered lazily on query or update of the fetchstate, not when //the register function is called - let minPendingDynamicContracts = self.pendingDynamicContracts->Belt.Array.reduce(None, ( - acc, - contract, - ) => { + pendingDynamicContracts->Js.Array2.forEach(contract => { let {registeringEventBlockNumber} = contract - minOfOption(registeringEventBlockNumber - 1, acc)->Some + if registeringEventBlockNumber < latestFullyFetchedBlock.contents.blockNumber { + latestFullyFetchedBlock := { + blockNumber: registeringEventBlockNumber, + blockTimestamp: 0, + } + } }) - switch (self.baseRegister.latestFetchedBlock, minPendingDynamicContracts) { - | ({blockNumber}, Some(pendingDynamicContractBlockNumber)) - if pendingDynamicContractBlockNumber < blockNumber => { - blockNumber: pendingDynamicContractBlockNumber, - blockTimestamp: 0, - } - | (baseRegisterLatest, _) => baseRegisterLatest - } + latestFullyFetchedBlock.contents } let pruneQueueFromFirstChangeEvent = ( @@ -1028,156 +776,122 @@ let pruneDynamicContractAddressesFromFirstChangeEvent = ( /** Rolls back registers to the given valid block */ -let rec rollbackRegister = ( - self: register, +let rollbackRegister = ( + register: register, ~lastScannedBlock, ~firstChangeEvent: blockNumberAndLogIndex, - ~parent: option=?, ) => { - let handleParent = updated => - switch parent { - | Some(parent) => parent->Parent.joinChild(updated) - | None => updated - } - - switch self.registerType { - //Case 1 Root register that has only fetched up to a confirmed valid block number - //Should just return itself unchanged - | RootRegister if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => - self->handleParent - //Case 2 Dynamic register that has only fetched up to a confirmed valid block number - //Should just return itself, with the next register rolled back recursively - | DynamicContractRegister({id, nextRegister}) - if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => - nextRegister->rollbackRegister( - ~lastScannedBlock, - ~firstChangeEvent, - ~parent=self->Parent.make(~dynamicContractId=id, ~parent), - ) - - //Case 3 Root register that has fetched further than the confirmed valid block number - //Should prune its queue and set its latest fetched block data to the latest known confirmed block - | RootRegister => - { - ...self, - fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent(~firstChangeEvent), - latestFetchedBlock: lastScannedBlock, - } - ->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) - ->handleParent - //Case 4 DynamicContract register that has fetched further than the confirmed valid block number - //Should prune its queue, set its latest fetched blockdata + pruned queue - //And recursivle prune the nextRegister - | DynamicContractRegister({id, nextRegister}) => + if register.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber { + Some(register) + } else { let updatedWithRemovedDynamicContracts = - self->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) - + register->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) if updatedWithRemovedDynamicContracts.contractAddressMapping->ContractAddressingMap.isEmpty { - //If the contractAddressMapping is empty after pruning dynamic contracts, then this - //is a dead register. Simly return its next register rolled back - nextRegister->rollbackRegister(~lastScannedBlock, ~firstChangeEvent, ~parent?) + //If the contractAddressMapping is empty after pruning dynamic contracts, + // then this is a dead register. + None } else { - //If there are still values in the contractAddressMapping, we should keep the register but - //prune queues and next register - let updated = { + //If there are still values in the contractAddressMapping, + //we should keep the register but prune queues + Some({ ...updatedWithRemovedDynamicContracts, - fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent( + fetchedEventQueue: register.fetchedEventQueue->pruneQueueFromFirstChangeEvent( ~firstChangeEvent, ), latestFetchedBlock: lastScannedBlock, - } - nextRegister->rollbackRegister( - ~lastScannedBlock, - ~firstChangeEvent, - ~parent=updated->Parent.make(~dynamicContractId=id, ~parent), - ) + }) } } } -let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { - let baseRegister = self.baseRegister->rollbackRegister(~lastScannedBlock, ~firstChangeEvent) +let rollback = (fetchState: t, ~lastScannedBlock, ~firstChangeEvent) => { + let registers = + fetchState.registers->Array.keepMap(r => + r->rollbackRegister(~lastScannedBlock, ~firstChangeEvent) + ) let pendingDynamicContracts = - self.pendingDynamicContracts->Array.keep(({ + fetchState.pendingDynamicContracts->Array.keep(({ registeringEventBlockNumber, registeringEventLogIndex, }) => (registeringEventBlockNumber, registeringEventLogIndex) < (firstChangeEvent.blockNumber, firstChangeEvent.logIndex) ) - { - ...self, - pendingDynamicContracts, - baseRegister, - } + + fetchState->updateInternal(~pendingDynamicContracts, ~registers) } /** * Returns a boolean indicating whether the fetch state is actively indexing * used for comparing event queues in the chain manager */ -let isActivelyIndexing = ({baseRegister} as fetchState: t, ~endBlock) => { - // nesting to limit additional unnecessary computation - switch (baseRegister.registerType, endBlock) { - | (RootRegister, Some(endBlock)) => - let isPastEndblock = getLatestFullyFetchedBlock(fetchState).blockNumber >= endBlock - if isPastEndblock { - baseRegister->registerQueueSize > 0 - } else { - true +let isActivelyIndexing = ( + {mostBehindRegister, pendingDynamicContracts} as fetchState: t, + ~endBlock, +) => { + if pendingDynamicContracts->Utils.Array.isEmpty { + switch endBlock { + | Some(endBlock) => + let isPastEndblock = mostBehindRegister.latestFetchedBlock.blockNumber >= endBlock + if isPastEndblock { + fetchState->queueSize > 0 + } else { + true + } + | None => true } - | _ => true + } else { + true } } -let getNumContracts = (self: t) => { - let rec loop = (register: register, ~accum=0) => { - let accum = accum + register.contractAddressMapping->ContractAddressingMap.addressCount - switch register.registerType { - | RootRegister => accum - | DynamicContractRegister({nextRegister}) => nextRegister->loop(~accum) - } +// FIXME: Pending +let getNumContracts = ({registers}: t) => { + let sum = ref(0) + for idx in 0 to registers->Js.Array2.length - 1 { + let register = registers->Js.Array2.unsafe_get(idx) + sum := sum.contents + register.contractAddressMapping->ContractAddressingMap.addressCount } - loop(self.baseRegister) + sum.contents } -/** +/* Helper functions for debugging and printing */ -module DebugHelpers = { - let registerToString = register => - switch register { - | RootRegister => "root" - | DynamicContractRegister({id: {blockNumber, logIndex}}) => - `DC-${blockNumber->Int.toString}-${logIndex->Int.toString}` - } - - let rec getQueueSizesInternal = (register: register, ~accum) => { - let next = (register.registerType->registerToString, register.fetchedEventQueue->Array.length) - let accum = list{next, ...accum} - switch register.registerType { - | RootRegister => accum - | DynamicContractRegister({nextRegister}) => nextRegister->getQueueSizesInternal(~accum) - } - } - - let getQueueSizes = (self: t) => - self.baseRegister->getQueueSizesInternal(~accum=list{})->List.toArray->Js.Dict.fromArray - - let rec numberRegistered = (~accum=0, self: register) => { - let accum = accum + 1 - switch self.registerType { - | RootRegister => accum - | DynamicContractRegister({nextRegister}) => nextRegister->numberRegistered(~accum) - } - } - - let rec getRegisterAddressMaps = (self: register, ~accum=[]) => { - accum->Js.Array2.push(self.contractAddressMapping.nameByAddress)->ignore - switch self.registerType { - | RootRegister => accum - | DynamicContractRegister({nextRegister}) => nextRegister->getRegisterAddressMaps(~accum) - } - } -} +// module DebugHelpers = { +// let registerToString = register => +// switch register.id { +// | Root => "root" +// | DynamicContract({blockNumber, logIndex}) => +// `DC-${blockNumber->Int.toString}-${logIndex->Int.toString}` +// } + +// let rec getQueueSizesInternal = (register: register, ~accum) => { +// let next = (register->registerToString, register.fetchedEventQueue->Array.length) +// let accum = list{next, ...accum} +// switch register.registerType { +// | RootRegister => accum +// | DynamicContractRegister({nextRegister}) => nextRegister->getQueueSizesInternal(~accum) +// } +// } + +// let getQueueSizes = (self: t) => +// self.baseRegister->getQueueSizesInternal(~accum=list{})->List.toArray->Js.Dict.fromArray + +// let rec numberRegistered = (~accum=0, self: register) => { +// let accum = accum + 1 +// switch self.registerType { +// | RootRegister => accum +// | DynamicContractRegister({nextRegister}) => nextRegister->numberRegistered(~accum) +// } +// } + +// let rec getRegisterAddressMaps = (self: register, ~accum=[]) => { +// accum->Js.Array2.push(self.contractAddressMapping.nameByAddress)->ignore +// switch self.registerType { +// | RootRegister => accum +// | DynamicContractRegister({nextRegister}) => nextRegister->getRegisterAddressMaps(~accum) +// } +// } +// } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index b9305c327..992093711 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -125,6 +125,7 @@ let registerDynamicContracts = ( newestPartition->FetchState.registerDynamicContract( dynamicContractRegistration, ~isFetchingAtHead, + ~endBlock, ) partitions->Utils.Array.setIndexImmutable(newestPartitionIndex, updated) } else { @@ -160,7 +161,12 @@ let update = (self: t, ~id: id, ~latestFetchedBlock, ~newItems, ~currentBlockHei switch self.partitions[id.partitionId] { | Some(partition) => partition - ->FetchState.update(~id=id.fetchStateId, ~latestFetchedBlock, ~newItems, ~currentBlockHeight) + ->FetchState.setFetchedItems( + ~id=id.fetchStateId, + ~latestFetchedBlock, + ~newItems, + ~currentBlockHeight, + ) ->Result.map(updatedPartition => { Prometheus.PartitionBlockFetched.set( ~blockNumber=latestFetchedBlock.blockNumber, @@ -188,7 +194,7 @@ let getReadyPartitions = ( let maxPartitionQueueSize = maxPerChainQueueSize / numPartitions allPartitions->Js.Array2.filter(fetchState => { !(fetchingPartitions->Utils.Set.has(fetchState.partitionId)) && - fetchState->FetchState.isReadyForNextQuery(~maxQueueSize=maxPartitionQueueSize) + fetchState->FetchState.isReadyForNextQuery(~maxQueueSize=maxPartitionQueueSize) }) } @@ -257,7 +263,7 @@ let isFetchingAtHead = ({partitions}: t) => { let getFirstEventBlockNumber = ({partitions}: t) => { partitions->Array.reduce(None, (accum, partition) => { - Utils.Math.minOptInt(accum, partition.baseRegister.firstEventBlockNumber) + Utils.Math.minOptInt(accum, partition.mostBehindRegister.firstEventBlockNumber) }) } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res index 21df9c91c..97f10f4bf 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res @@ -57,7 +57,6 @@ let fetchBatch = async ( ~waitForNewBlock, ~onNewBlock, ~maxPerChainQueueSize, - ~setMergedPartitions, ~stateId, ) => { if stateId < sourceManager.currentStateId { @@ -84,14 +83,9 @@ let fetchBatch = async ( ~fetchingPartitions, ) - let mergedPartitions = Js.Dict.empty() let hasQueryWaitingForNewBlock = ref(false) let queries = readyPartitions->Array.keepMap(fetchState => { - let mergedFetchState = fetchState->FetchState.mergeRegistersBeforeNextQuery - if mergedFetchState !== fetchState { - mergedPartitions->Js.Dict.set(fetchState.partitionId->(Utils.magic: int => string), mergedFetchState) - } - switch mergedFetchState->FetchState.getNextQuery(~endBlock) { + switch fetchState->FetchState.getNextQuery(~endBlock) { | Done => None | NextQuery(nextQuery) => switch allPartitionsFetchingState->Belt.Array.get(fetchState.partitionId) { @@ -120,7 +114,6 @@ let fetchBatch = async ( } } }) - setMergedPartitions(mergedPartitions) switch (queries, currentBlockHeight) { | ([], _) diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 9aaa7647f..21e62c820 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -104,7 +104,6 @@ type action = | StartIndexingAfterPreRegister | SetCurrentlyProcessing(bool) | SetIsInReorgThreshold(bool) - | SetUpdatedPartitions(chain, dict) | UpdateQueues(ChainMap.t, arbitraryEventQueue) | SetSyncedChains | SuccessExit @@ -602,23 +601,6 @@ let actionReducer = (state: t, action: action) => { Logging.info("Reorg threshold reached") } ({...state, chainManager: {...state.chainManager, isInReorgThreshold}}, []) - | SetUpdatedPartitions(chain, updatedPartitions) => - let updatedPartitionIds = updatedPartitions->Js.Dict.keys - if updatedPartitionIds->Utils.Array.isEmpty { - (state, []) - } else { - updateChainFetcher(currentChainFetcher => { - let partitionsCopy = currentChainFetcher.partitionedFetchState.partitions->Js.Array2.copy - updatedPartitionIds->Js.Array2.forEach(partitionId => { - let partition = updatedPartitions->Js.Dict.unsafeGet(partitionId) - partitionsCopy->Js.Array2.unsafe_set(partitionId->(Utils.magic: string => int), partition) - }) - { - ...currentChainFetcher, - partitionedFetchState: {...currentChainFetcher.partitionedFetchState, partitions: partitionsCopy}, - } - }, ~chain, ~state) - } | SetSyncedChains => { let shouldExit = EventProcessing.EventsProcessed.allChainsEventsProcessedToEndblock( state.chainManager.chainFetchers, @@ -849,7 +831,6 @@ let checkAndFetchForChain = ( ~isPreRegisteringDynamicContracts=state.chainManager->ChainManager.isPreRegisteringDynamicContracts, ), ~maxPerChainQueueSize=state.maxPerChainQueueSize, - ~setMergedPartitions=partitions => dispatchAction(SetUpdatedPartitions(chain, partitions)), ~stateId=state.id, ) }