Skip to content

Commit

Permalink
Fix stateId handling and uncomment tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DZakh committed Jan 6, 2025
1 parent efb8205 commit c4be087
Show file tree
Hide file tree
Showing 6 changed files with 964 additions and 1,463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type blockNumberAndTimestamp = {

type blockNumberAndLogIndex = {blockNumber: int, logIndex: int}

type status = {mutable isFetching: bool}
type status = {mutable fetchingStateId: option<int>}

/**
A state that holds a queue of events and data regarding what to fetch next.
Expand Down Expand Up @@ -125,7 +125,7 @@ let mergeIntoPartition = (register: register, ~target: register, ~maxAddrInParti
Some({
id: register.id,
status: {
isFetching: false,
fetchingStateId: None,
},
fetchedEventQueue: [],
contractAddressMapping: restContractAddressMapping,
Expand All @@ -144,7 +144,7 @@ let mergeIntoPartition = (register: register, ~target: register, ~maxAddrInParti
{
id: target.id,
status: {
isFetching: false,
fetchingStateId: None,
},
fetchedEventQueue: mergeSortedEventList(register.fetchedEventQueue, target.fetchedEventQueue),
contractAddressMapping: mergedContractAddressMapping,
Expand All @@ -168,7 +168,7 @@ let addItemsToPartition = (
{
...register,
status: {
isFetching: false,
fetchingStateId: None,
},
latestFetchedBlock,
fetchedEventQueue: Array.concat(reversedNewItems, register.fetchedEventQueue),
Expand Down Expand Up @@ -247,7 +247,7 @@ let makePartition = (
{
id: partitionIndex->Int.toString,
status: {
isFetching: false,
fetchingStateId: None,
},
latestFetchedBlock,
contractAddressMapping,
Expand Down Expand Up @@ -431,12 +431,12 @@ type nextQuery =
| NothingToQuery
| Ready(array<query>)

let startFetchingQueries = ({partitions}: t, ~queries: array<query>) => {
let startFetchingQueries = ({partitions}: t, ~queries: array<query>, ~stateId) => {
queries->Array.forEach(q => {
switch partitions->Js.Array2.find(p => p.id === q->queryPartitionId) {
// Shouldn't be mutated to false anymore
// Shouldn't be mutated to None anymore
// The status will be immutably set to the initial one when we handle response
| Some(p) => p.status.isFetching = true
| Some(p) => p.status.fetchingStateId = Some(stateId)
| None => Js.Exn.raiseError("Unexpected case: Couldn't find partition for the fetching query")
}
})
Expand All @@ -448,6 +448,7 @@ let getNextQuery = (
~concurrencyLimit,
~maxQueueSize,
~currentBlockHeight,
~stateId,
) => {
if currentBlockHeight === 0 {
WaitingForNewBlock
Expand All @@ -461,13 +462,24 @@ let getNextQuery = (
let mostBehindMergingPartition = ref(None)
let mergingPartitionTarget = ref(None)

let checkIsFetchingPartition = p => {
switch p.status.fetchingStateId {
| Some(fetchingStateId) => stateId <= fetchingStateId
| None => false
}
}

for idx in 0 to partitions->Js.Array2.length - 1 {
let p = partitions->Js.Array2.unsafe_get(idx)

let isFetching = checkIsFetchingPartition(p)

if isFetching {
hasFetchingPartition := true
}

if p.contractAddressMapping->ContractAddressingMap.addressCount >= maxAddrInPartition {
fullPartitions->Array.push(p)
if p.status.isFetching {
hasFetchingPartition := true
}
} else {
mergingPartitions->Array.push(p)

Expand Down Expand Up @@ -500,8 +512,7 @@ let getNextQuery = (
| None => p
}->Some

if p.status.isFetching {
hasFetchingPartition := true
if isFetching {
areMergingPartitionsFetching := true
}
}
Expand All @@ -513,7 +524,7 @@ let getNextQuery = (

let registerPartitionQuery = (p, ~checkQueueSize, ~mergeTarget=?) => {
if (
p.status.isFetching->not && (
p->checkIsFetchingPartition->not && (
checkQueueSize ? p.fetchedEventQueue->Array.length < maxPartitionQueueSize : true
)
) {
Expand Down Expand Up @@ -867,7 +878,7 @@ let rollbackPartition = (
dynamicContracts,
contractAddressMapping,
status: {
isFetching: false,
fetchingStateId: None,
},
fetchedEventQueue,
latestFetchedBlock: shouldRollbackFetched ? lastScannedBlock : partition.latestFetchedBlock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@ type t = {
mutable isWaitingForNewBlock: bool,
// Should take into consideration partitions fetching for previous states (before rollback)
mutable fetchingPartitionsCount: int,
// Keep track on the current state id
// to work with correct state during rollbacks & preRegistration
mutable currentStateId: int,
}

let make = (~maxPartitionConcurrency, ~endBlock, ~logger) => {
logger,
endBlock,
maxPartitionConcurrency,
isWaitingForNewBlock: false,
currentStateId: 0,
fetchingPartitionsCount: 0,
}

Expand All @@ -38,46 +34,40 @@ let fetchNext = async (
~maxPerChainQueueSize,
~stateId,
) => {
if stateId < sourceManager.currentStateId {
()
} else {
if stateId != sourceManager.currentStateId {
sourceManager.currentStateId = stateId
}
let {logger, endBlock, maxPartitionConcurrency} = sourceManager
let {logger, endBlock, maxPartitionConcurrency} = sourceManager

switch fetchState->FetchState.getNextQuery(
~endBlock,
~concurrencyLimit={
maxPartitionConcurrency - sourceManager.fetchingPartitionsCount
},
~maxQueueSize=maxPerChainQueueSize,
~currentBlockHeight,
) {
| ReachedMaxConcurrency
| NothingToQuery => ()
| WaitingForNewBlock =>
if !sourceManager.isWaitingForNewBlock {
sourceManager.isWaitingForNewBlock = true
let currentBlockHeight = await waitForNewBlock(~currentBlockHeight, ~logger)
sourceManager.isWaitingForNewBlock = false
onNewBlock(~currentBlockHeight)
}
| Ready(queries) => {
fetchState->FetchState.startFetchingQueries(~queries)
sourceManager.fetchingPartitionsCount =
sourceManager.fetchingPartitionsCount + queries->Array.length
let _ =
await queries
->Array.map(q => {
let promise = q->executeQuery
let _ = promise->Promise.thenResolve(_ => {
sourceManager.fetchingPartitionsCount = sourceManager.fetchingPartitionsCount - 1
})
promise
switch fetchState->FetchState.getNextQuery(
~endBlock,
~concurrencyLimit={
maxPartitionConcurrency - sourceManager.fetchingPartitionsCount
},
~maxQueueSize=maxPerChainQueueSize,
~currentBlockHeight,
~stateId,
) {
| ReachedMaxConcurrency
| NothingToQuery => ()
| WaitingForNewBlock =>
if !sourceManager.isWaitingForNewBlock {
sourceManager.isWaitingForNewBlock = true
let currentBlockHeight = await waitForNewBlock(~currentBlockHeight, ~logger)
sourceManager.isWaitingForNewBlock = false
onNewBlock(~currentBlockHeight)
}
| Ready(queries) => {
fetchState->FetchState.startFetchingQueries(~queries, ~stateId)
sourceManager.fetchingPartitionsCount =
sourceManager.fetchingPartitionsCount + queries->Array.length
let _ =
await queries
->Array.map(q => {
let promise = q->executeQuery
let _ = promise->Promise.thenResolve(_ => {
sourceManager.fetchingPartitionsCount = sourceManager.fetchingPartitionsCount - 1
})
->Promise.all
}
promise
})
->Promise.all
}
}
}
2 changes: 1 addition & 1 deletion scenarios/test_codegen/test/ChainManager_test.res
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ describe("determineNextEvent", () => {
blockNumber: 0,
},
status: {
isFetching: false,
fetchingStateId: None,
},
contractAddressMapping: ContractAddressingMap.make(),
fetchedEventQueue: item->Option.mapWithDefault([], v => [v]),
Expand Down
Loading

0 comments on commit c4be087

Please sign in to comment.