Skip to content

Commit

Permalink
Fix duplicated processing of events on Fuel indexer restarts (#272)
Browse files Browse the repository at this point in the history
* Get rid of Caml_obj.compare usage for filters

* Clean up blockRangeFetchStats

* Reverse fetched items only once

* Apply processingFilters in ChainFetcher instead of chain workers

* Add simple test

* Fixes
  • Loading branch information
DZakh authored Oct 17, 2024
1 parent e3950c0 commit 25cb567
Show file tree
Hide file tree
Showing 14 changed files with 521 additions and 520 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
open Belt

//A filter should return true if the event should be kept and isValid should return
//false when the filter should be removed/cleaned up
type processingFilter = {
filter: Types.eventBatchQueueItem => bool,
isValid: (~fetchState: FetchState.t) => bool,
}

type addressToDynContractLookup = dict<TablesStatic.DynamicContractRegistry.t>
type t = {
logger: Pino.t,
Expand All @@ -16,7 +23,7 @@ type t = {
lastBlockScannedHashes: ReorgDetection.LastBlockScannedHashes.t,
//An optional list of filters to apply on event queries
//Used for reorgs and restarts
eventFilters: option<FetchState.eventFilters>,
processingFilters: option<array<processingFilter>>,
//Currently this state applies to all chains simultaneously but it may be possible to,
//in the future, have a per chain state and allow individual chains to start indexing as
//soon as the pre registration is done
Expand All @@ -37,7 +44,7 @@ let make = (
~timestampCaughtUpToHeadOrEndblock,
~numEventsProcessed,
~numBatchesFetched,
~eventFilters,
~processingFilters,
~maxAddrInPartition,
~dynamicContractPreRegistration,
): t => {
Expand All @@ -64,7 +71,7 @@ let make = (
timestampCaughtUpToHeadOrEndblock,
numEventsProcessed,
numBatchesFetched,
eventFilters,
processingFilters,
partitionsCurrentlyFetching: Belt.Set.Int.empty,
dynamicContractPreRegistration,
}
Expand Down Expand Up @@ -100,7 +107,7 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => {
~numEventsProcessed=0,
~numBatchesFetched=0,
~logger,
~eventFilters=None,
~processingFilters=None,
~dynamicContractRegistrations=[],
~maxAddrInPartition,
~dynamicContractPreRegistration,
Expand All @@ -123,25 +130,25 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio
let (
startBlock: int,
isPreRegisteringDynamicContracts: bool,
eventFilters: option<FetchState.eventFilters>,
processingFilters: option<array<processingFilter>>,
) = switch latestProcessedEvent {
| Some(event) =>
//start from the same block but filter out any events already processed
let eventFilters = list{
// Start from the same block but filter out any events already processed
let processingFilters = [
{
FetchState.filter: qItem => {
filter: qItem => {
//Only keep events greater than the last processed event
(qItem.chain->ChainMap.Chain.toChainId, qItem.blockNumber, qItem.logIndex) >
(event.chainId, event.blockNumber, event.logIndex)
},
isValid: (~fetchState, ~chain as _) => {
isValid: (~fetchState) => {
//the filter can be cleaned up as soon as the fetch state block is ahead of the latestProcessedEvent blockNumber
FetchState.getLatestFullyFetchedBlock(fetchState).blockNumber <= event.blockNumber
},
},
}
]

(event.blockNumber, event.isPreRegisteringDynamicContracts, Some(eventFilters))
(event.blockNumber, event.isPreRegisteringDynamicContracts, Some(processingFilters))
| None => (chainConfig.startBlock, preRegisterDynamicContracts, None)
}

Expand Down Expand Up @@ -251,7 +258,7 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio
~numEventsProcessed=numEventsProcessed->Option.getWithDefault(0),
~numBatchesFetched=0,
~logger,
~eventFilters,
~processingFilters,
~maxAddrInPartition,
~dynamicContractPreRegistration,
)
Expand All @@ -262,40 +269,44 @@ Adds an event filter that will be passed to worker on query
isValid is a function that determines when the filter
should be cleaned up
*/
let addEventFilter = (self: t, ~filter, ~isValid) => {
let eventFilters =
self.eventFilters
->Option.getWithDefault(list{})
->List.add({filter, isValid})
->Some
{...self, eventFilters}
let addProcessingFilter = (self: t, ~filter, ~isValid) => {
let processingFilter: processingFilter = {filter, isValid}
{
...self,
processingFilters: switch self.processingFilters {
| Some(processingFilters) => Some(processingFilters->Array.concat([processingFilter]))
| None => Some([processingFilter])
},
}
}

let cleanUpEventFilters = (self: t) => {
switch self.eventFilters {
//Only spread if there are eventFilters
| None => self
let applyProcessingFilters = (
items: array<Types.eventBatchQueueItem>,
~processingFilters: array<processingFilter>,
) =>
items->Array.keep(item => {
processingFilters->Js.Array2.every(processingFilter => processingFilter.filter(item))
})

//Run the clean up condition "isNoLongerValid" against fetchState on each eventFilter and remove
//any that meet the cleanup condition
| Some(eventFilters) => {
...self,
eventFilters: switch eventFilters->List.keep(eventFilter =>
self.fetchState->PartitionedFetchState.eventFilterIsValid(
~eventFilter,
~chain=self.chainConfig.chain,
)
) {
| list{} => None
| eventFilters => eventFilters->Some
},
}
//Run the clean up condition "isNoLongerValid" against fetchState on each eventFilter and remove
//any that meet the cleanup condition
let cleanUpProcessingFilters = (
processingFilters: array<processingFilter>,
~fetchState as {partitions}: PartitionedFetchState.t,
) => {
switch processingFilters->Array.keep(processingFilter =>
partitions->List.reduce(false, (accum, partition) => {
accum || processingFilter.isValid(~fetchState=partition)
})
) {
| [] => None
| filters => Some(filters)
}
}

/**
Updates of fetchState and cleans up event filters. Should be used whenever updating fetchState
to ensure eventFilters are always valid.
to ensure processingFilters are always valid.
Returns Error if the node with given id cannot be found (unexpected)
*/
let updateFetchState = (
Expand All @@ -306,18 +317,30 @@ let updateFetchState = (
~fetchedEvents,
~currentBlockHeight,
) => {
let newItems = switch self.processingFilters {
| None => fetchedEvents
| Some(processingFilters) => fetchedEvents->applyProcessingFilters(~processingFilters)
}

self.fetchState
->PartitionedFetchState.update(
~id,
~latestFetchedBlock={
blockNumber: latestFetchedBlockNumber,
blockTimestamp: latestFetchedBlockTimestamp,
},
~fetchedEvents,
~newItems,
~currentBlockHeight,
)
->Result.map(fetchState => {
{...self, fetchState}->cleanUpEventFilters
{
...self,
fetchState,
processingFilters: switch self.processingFilters {
| Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~fetchState)
| None => None
},
}
})
}

Expand All @@ -331,13 +354,8 @@ Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than th
an invalid state.
*/
let getNextQuery = (self: t, ~maxPerChainQueueSize) => {
//Chain Fetcher should have already cleaned up stale event filters by the time this
//is called but just ensure its cleaned before getting the next query
let cleanedChainFetcher = self->cleanUpEventFilters

cleanedChainFetcher.fetchState->PartitionedFetchState.getNextQueriesOrThrow(
~eventFilters=?cleanedChainFetcher.eventFilters,
~currentBlockHeight=cleanedChainFetcher.currentBlockHeight,
self.fetchState->PartitionedFetchState.getNextQueriesOrThrow(
~currentBlockHeight=self.currentBlockHeight,
~maxPerChainQueueSize,
~partitionsCurrentlyFetching=self.partitionsCurrentlyFetching,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,20 @@ events.
let updateRegister = (
self: t,
~latestFetchedBlock,
//Events ordered earliest to latest
~newFetchedEvents: array<Types.eventBatchQueueItem>,
//Events ordered latest to earliest
~reversedNewItems: array<Types.eventBatchQueueItem>,
~isFetchingAtHead,
) => {
let firstEventBlockNumber = switch self.firstEventBlockNumber {
| Some(n) => Some(n)
| None => newFetchedEvents[0]->Option.map(v => v.blockNumber)
| None => reversedNewItems->Utils.Array.last->Option.map(v => v.blockNumber)
}
{
...self,
isFetchingAtHead,
latestFetchedBlock,
firstEventBlockNumber,
fetchedEventQueue: Array.concat(newFetchedEvents->Array.reverse, self.fetchedEventQueue),
fetchedEventQueue: Array.concat(reversedNewItems, self.fetchedEventQueue),
}
}

Expand All @@ -305,7 +305,7 @@ let rec updateInternal = (
register: t,
~id,
~latestFetchedBlock,
~newFetchedEvents,
~reversedNewItems,
~isFetchingAtHead,
~parent: option<Parent.t>=?,
): result<t, exn> => {
Expand All @@ -319,17 +319,17 @@ let rec updateInternal = (
switch (register.registerType, id) {
| (RootRegister(_), Root) =>
register
->updateRegister(~newFetchedEvents, ~latestFetchedBlock, ~isFetchingAtHead)
->updateRegister(~reversedNewItems, ~latestFetchedBlock, ~isFetchingAtHead)
->handleParent
| (DynamicContractRegister(id, _nextRegistered), DynamicContract(targetId)) if id == targetId =>
register
->updateRegister(~newFetchedEvents, ~latestFetchedBlock, ~isFetchingAtHead)
->updateRegister(~reversedNewItems, ~latestFetchedBlock, ~isFetchingAtHead)
->handleParent
| (DynamicContractRegister(dynamicContractId, nextRegistered), id) =>
nextRegistered->updateInternal(
~id,
~latestFetchedBlock,
~newFetchedEvents,
~reversedNewItems,
~isFetchingAtHead,
~parent=register->Parent.make(~dynamicContractId, ~parent),
)
Expand Down Expand Up @@ -357,42 +357,31 @@ let rec pruneAndMergeNextRegistered = (self: t, ~isMerged=false) => {
Updates node at given id with given values and checks to see if it can be merged into its next register.
Returns Error if the node with given id cannot be found (unexpected)
fetchedEvents are ordered earliest to latest (as they are returned from the worker)
newItems are ordered earliest to latest (as they are returned from the worker)
*/
let update = (self: t, ~id, ~latestFetchedBlock, ~fetchedEvents, ~currentBlockHeight): result<
let update = (self: t, ~id, ~latestFetchedBlock, ~newItems, ~currentBlockHeight): result<
t,
exn,
> => {
let isFetchingAtHead =
currentBlockHeight <= latestFetchedBlock.blockNumber ? true : self.isFetchingAtHead
self
->updateInternal(~id, ~latestFetchedBlock, ~newFetchedEvents=fetchedEvents, ~isFetchingAtHead)
->updateInternal(
~id,
~latestFetchedBlock,
~reversedNewItems=newItems->Array.reverse,
~isFetchingAtHead,
)
->Result.map(result => pruneAndMergeNextRegistered(result)->Option.getWithDefault(result))
}

//A filter should return true if the event should be kept and isValid should return
//false when the filter should be removed/cleaned up
type eventFilter = {
filter: Types.eventBatchQueueItem => bool,
isValid: (~fetchState: t, ~chain: ChainMap.Chain.t) => bool,
}

type eventFilters = list<eventFilter>
let applyFilters = (eventBatchQueueItem, ~eventFilters) =>
eventFilters->List.reduce(true, (acc, eventFilter) =>
acc && eventBatchQueueItem->eventFilter.filter
)

type nextQuery = {
fetchStateRegisterId: id,
//used to id the partition of the fetchstate
partitionId: int,
fromBlock: int,
toBlock: int,
contractAddressMapping: ContractAddressingMap.mapping,
//Used to filter events where its not possible to filter in the query
//eg. event should be above a logIndex in a block or above a timestamp
eventFilters?: eventFilters,
}

let getQueryLogger = (
Expand Down Expand Up @@ -443,7 +432,6 @@ Constructs `nextQuery` from a given node
let getNextQueryFromNode = (
{registerType, latestFetchedBlock, contractAddressMapping}: t,
~toBlock,
~eventFilters,
~partitionId,
) => {
let (id, endBlock) = switch registerType {
Expand All @@ -461,7 +449,6 @@ let getNextQueryFromNode = (
fromBlock,
toBlock,
contractAddressMapping,
?eventFilters,
}
}

Expand Down Expand Up @@ -493,19 +480,15 @@ Or with a toBlock of the nextRegistered latestBlockNumber to catch up and merge
Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than the current as this would be
an invalid state.
*/
let getNextQuery = (~eventFilters=?, ~currentBlockHeight, ~partitionId, self: t) => {
let getNextQuery = (self: t, ~currentBlockHeight, ~partitionId) => {
let maybeMerged = self->pruneAndMergeNextRegistered
let self = maybeMerged->Option.getWithDefault(self)

let nextQuery = switch self.registerType {
| RootRegister({endBlock}) =>
self->getNextQueryFromNode(
~toBlock={minOfOption(currentBlockHeight, endBlock)},
~eventFilters,
~partitionId,
)
self->getNextQueryFromNode(~toBlock={minOfOption(currentBlockHeight, endBlock)}, ~partitionId)
| DynamicContractRegister(_, {latestFetchedBlock}) =>
self->getNextQueryFromNode(~toBlock=latestFetchedBlock.blockNumber, ~eventFilters, ~partitionId)
self->getNextQueryFromNode(~toBlock=latestFetchedBlock.blockNumber, ~partitionId)
}

switch nextQuery {
Expand Down
Loading

0 comments on commit 25cb567

Please sign in to comment.