Skip to content

Commit

Permalink
Introduce mutable fetch state (#385)
Browse files Browse the repository at this point in the history
* Improve getQueryLogger perf

* Remove unused rpc events filter

* Move maxPartitionConcurrency check after all the filtering is done

* Clean up maxQueriesNumber

* Clean up FetchState.make function

* Start moving fetching state to mutable

* Add mutable fetch state and fully test fetchBatch logic

* Fix most of the tests

* Finish fixing tests

* Fix typo
  • Loading branch information
DZakh authored Dec 13, 2024
1 parent 131e7b8 commit 5605da1
Show file tree
Hide file tree
Showing 16 changed files with 928 additions and 451 deletions.
32 changes: 1 addition & 31 deletions codegenerator/cli/templates/static/codegen/src/EventFetching.res
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,6 @@ let makeCombinedEventFilterQuery = (
})
}

let applyConditionalFunction = (value: 'a, condition: bool, callback: 'a => 'b) => {
condition ? callback(value) : value
}

let queryEventsWithCombinedFilter = async (
~contractInterfaceManager,
~fromBlock,
~toBlock,
~minFromBlockLogIndex=0,
~provider,
~logger: Pino.t,
): array<Ethers.log> => {
let combinedFilterRes = await makeCombinedEventFilterQuery(
~provider,
~contractInterfaceManager,
~fromBlock,
~toBlock,
~logger,
)

combinedFilterRes->applyConditionalFunction(minFromBlockLogIndex > 0, arrLogs => {
arrLogs->Belt.Array.keep(log => {
log.blockNumber > fromBlock ||
(log.blockNumber == fromBlock && log.logIndex >= minFromBlockLogIndex)
})
})
}

type eventBatchQuery = {
logs: array<Ethers.log>,
finalExecutedBlockInterval: int,
Expand All @@ -108,7 +80,6 @@ let getNextPage = async (
~fromBlock,
~toBlock,
~initialBlockInterval,
~minFromBlockLogIndex=0,
~syncConfig as sc: Config.syncConfig,
~provider,
~logger,
Expand All @@ -134,11 +105,10 @@ let getNextPage = async (
let nextToBlock =
Pervasives.min(upperBoundToBlock, toBlock)->Pervasives.max(fromBlockRef.contents) //Defensively ensure we never query a target block below fromBlock
let logsPromise =
queryEventsWithCombinedFilter(
makeCombinedEventFilterQuery(
~contractInterfaceManager,
~fromBlock=fromBlockRef.contents,
~toBlock=nextToBlock,
~minFromBlockLogIndex=fromBlockRef.contents == fromBlock ? minFromBlockLogIndex : 0,
~provider,
~logger,
)->Promise.thenResolve(logs => (logs, nextToBlock - fromBlockRef.contents + 1))
Expand Down
5 changes: 0 additions & 5 deletions codegenerator/cli/templates/static/codegen/src/EventUtils.res
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ type eventIndex = {
logIndex: int,
}

let eventIndexSchema = S.object(s => {
blockNumber: s.field("blockNumber", S.int),
logIndex: s.field("logIndex", S.int),
})

// takes blockNumber, logIndex and packs them into a number with
//32 bits, 16 bits and 16 bits respectively
let packEventIndex = (~blockNumber, ~logIndex) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ type addressToDynContractLookup = dict<TablesStatic.DynamicContractRegistry.t>
type t = {
logger: Pino.t,
fetchState: PartitionedFetchState.t,
sourceManager: SourceManager.t,
chainConfig: Config.chainConfig,
//The latest known block of the chain
currentBlockHeight: int,
isWaitingForNewBlock: bool,
partitionsCurrentlyFetching: PartitionedFetchState.partitionIndexSet,
timestampCaughtUpToHeadOrEndblock: option<Js.Date.t>,
dbFirstEventBlockNumber: option<int>,
latestProcessedBlock: option<int>,
Expand Down Expand Up @@ -64,17 +63,16 @@ let make = (
{
logger,
chainConfig,
sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~logger),
lastBlockScannedHashes,
currentBlockHeight: 0,
isWaitingForNewBlock: false,
fetchState,
dbFirstEventBlockNumber,
latestProcessedBlock,
timestampCaughtUpToHeadOrEndblock,
numEventsProcessed,
numBatchesFetched,
processingFilters,
partitionsCurrentlyFetching: Belt.Set.Int.empty,
dynamicContractPreRegistration,
}
}
Expand Down Expand Up @@ -358,18 +356,6 @@ let updateFetchState = (
})
}

/**
Gets the next queries for all most behind partitions not exceeding queue size
Applies any event filters found in the chain fetcher
*/
let getNextQueries = (self: t, ~maxPerChainQueueSize) => {
self.fetchState->PartitionedFetchState.getNextQueries(
~maxPerChainQueueSize,
~partitionsCurrentlyFetching=self.partitionsCurrentlyFetching,
)
}

/**
Gets the latest item on the front of the queue and returns updated fetcher
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
open Belt

type t = {
chainFetchers: ChainMap.t<ChainFetcher.t>,
//Holds arbitrary events that were added when a batch ended processing early
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ with a dynamicContractId
*/
type id = Root | DynamicContract(dynamicContractId)

let idSchema = S.union([
S.literal(Root),
S.schema(s => DynamicContract(s.matches(EventUtils.eventIndexSchema))),
])
let registerIdToString = (id: id) =>
switch id {
| Root => "root"
| DynamicContract({blockNumber, logIndex}) =>
`dynamic-${blockNumber->Int.toString}-${logIndex->Int.toString}`
}

/**
Constructs id from a register
Expand Down Expand Up @@ -554,27 +556,12 @@ let getQueryLogger = (
{fetchStateRegisterId, fromBlock, toBlock, contractAddressMapping}: nextQuery,
~logger,
) => {
let fetchStateRegister = switch fetchStateRegisterId {
| Root => "root"
| DynamicContract({blockNumber, logIndex}) =>
`dynamic-${blockNumber->Int.toString}-${logIndex->Int.toString}`
}
let addressesAll = contractAddressMapping->ContractAddressingMap.getAllAddresses
let (displayAddr, restCount) = addressesAll->Array.reduce(([], 0), (
(currentDisplay, restCount),
addr,
) => {
if currentDisplay->Array.length < 3 {
(Array.concat(currentDisplay, [addr->Address.toString]), restCount)
} else {
(currentDisplay, restCount + 1)
}
})

let addresses = if restCount > 0 {
displayAddr->Array.concat([`... and ${restCount->Int.toString} more`])
} else {
displayAddr
let fetchStateRegister = fetchStateRegisterId->registerIdToString
let allAddresses = contractAddressMapping->ContractAddressingMap.getAllAddresses
let addresses = allAddresses->Js.Array2.slice(~start=0, ~end_=3)->Array.map(addr => addr->Address.toString)
let restCount = allAddresses->Array.length - addresses->Array.length
if restCount > 0 {
addresses->Js.Array2.push(`... and ${restCount->Int.toString} more`)->ignore
}
let params = {
"fromBlock": fromBlock,
Expand Down Expand Up @@ -816,26 +803,20 @@ let getEarliestEvent = (self: t) => {
}
}

let makeInternal = (
~registerType,
/**
Instantiates a fetch state with root register
*/
let make = (
~staticContracts,
~dynamicContractRegistrations: array<TablesStatic.DynamicContractRegistry.t>,
~startBlock,
~endBlock,
~isFetchingAtHead,
~logger,
~logger as _,
): t => {
let contractAddressMapping = ContractAddressingMap.make()

staticContracts->Belt.Array.forEach(((contractName, address)) => {
Logging.childTrace(
logger,
{
"msg": "adding contract address",
"contractName": contractName,
"address": address,
},
)

contractAddressMapping->ContractAddressingMap.addAddress(~name=contractName, ~address)
})

Expand All @@ -858,7 +839,7 @@ let makeInternal = (
})

let baseRegister = {
registerType,
registerType: RootRegister({endBlock: endBlock}),
latestFetchedBlock: {
blockTimestamp: 0,
// Here's a bug that startBlock: 1 won't work
Expand All @@ -877,11 +858,6 @@ let makeInternal = (
}
}

/**
Instantiates a fetch state with root register
*/
let makeRoot = (~endBlock) => makeInternal(~registerType=RootRegister({endBlock: endBlock}), ...)

/**
Calculates the cummulative queue sizes in all registers
*/
Expand Down
Loading

0 comments on commit 5605da1

Please sign in to comment.