Skip to content

Commit

Permalink
Fix blocked partition (#369)
Browse files Browse the repository at this point in the history
* Fix stuck partition case

* Add tests for stuck partition fix

* Improve getEarliestEvent function

* Remove it_only for single test

Co-authored-by: Dmitry Zakharov <[email protected]>

* Fix test after pending dynamic contract makes comparison

* Optimise reduce function for pending dynamic contracts

Co-authored-by: Dmitry Zakharov <[email protected]>

---------

Co-authored-by: Dmitry Zakharov <[email protected]>
  • Loading branch information
JonoPrest and DZakh authored Nov 28, 2024
1 parent 71cfc01 commit e4e5867
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,36 @@ Finds the earliest queue item across all registers and then returns that
queue item with an update fetch state.
*/
let getEarliestEvent = (self: t) => {
let registerWithEarliestQItem = self.baseRegister->findRegisterIdWithEarliestQueueItem
//Can safely unwrap here since the id is returned from self and so is guarenteed to exist
self.baseRegister->popQItemAtRegisterId(~id=registerWithEarliestQItem)->Utils.unwrapResultExn
let earliestItemInRegisters = {
let registerWithEarliestQItem = self.baseRegister->findRegisterIdWithEarliestQueueItem
//Can safely unwrap here since the id is returned from self and so is guarenteed to exist
self.baseRegister->popQItemAtRegisterId(~id=registerWithEarliestQItem)->Utils.unwrapResultExn
}

if self.pendingDynamicContracts->Utils.Array.isEmpty {
//In the case where there are no pending dynamic contracts, return the earliest item
//from the registers
earliestItemInRegisters
} else {
//In the case where there are pending dynamic contracts, construct the earliest queue item from
//the pending dynamic contracts
let earliestPendingDynamicContractBlockNumber = self.pendingDynamicContracts->Array.reduce(
(self.pendingDynamicContracts->Js.Array2.unsafe_get(0)).registeringEventBlockNumber,
(accumBlockNumber, dynamicContractRegistration) => {
min(accumBlockNumber, dynamicContractRegistration.registeringEventBlockNumber)
},
)

let earliestItemInPendingDynamicContracts = NoItem({
blockTimestamp: 0,
blockNumber: earliestPendingDynamicContractBlockNumber - 1,
})

//Compare the earliest item in the pending dynamic contracts with the earliest item in the registers
earliestItemInPendingDynamicContracts->qItemLt(earliestItemInRegisters)
? earliestItemInPendingDynamicContracts
: earliestItemInRegisters
}
}

let makeInternal = (
Expand Down Expand Up @@ -843,9 +870,13 @@ Check the max queue size of the tip of the tree.
Don't use the cummulative queue sizes because otherwise there
could be a deadlock. With a very small buffer size of the actively
fetching registration
If there are pending dynamic contracts, we always need to allow the next query
*/
let isReadyForNextQuery = (self: t, ~maxQueueSize) =>
self.baseRegister.fetchedEventQueue->Array.length < maxQueueSize
let isReadyForNextQuery = ({pendingDynamicContracts, baseRegister}: t, ~maxQueueSize) =>
pendingDynamicContracts->Utils.Array.isEmpty
? baseRegister.fetchedEventQueue->Array.length < maxQueueSize
: true

let rec checkBaseRegisterContainsRegisteredContract = (
register: register,
Expand Down
168 changes: 168 additions & 0 deletions scenarios/test_codegen/test/lib_tests/FetchState_test.res
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,174 @@ describe("FetchState.fetchState", () => {
Assert.deepEqual(earliestQueueItem, mockEvent(~blockNumber=1, ~logIndex=1))
})

it("getEarliestEvent accounts for pending dynamicContracts", () => {
let baseRegister = {
latestFetchedBlock: {
blockNumber: 500,
blockTimestamp: 500 * 15,
},
contractAddressMapping: ContractAddressingMap.fromArray([
(mockAddress1, (Gravatar :> string)),
]),
firstEventBlockNumber: None,
dynamicContracts: DynamicContractsMap.empty,
fetchedEventQueue: [
mockEvent(~blockNumber=106, ~logIndex=1),
mockEvent(~blockNumber=105),
mockEvent(~blockNumber=101, ~logIndex=2),
],
registerType: RootRegister({endBlock: None}),
}
let dynamicContractRegistration = {
registeringEventBlockNumber: 100,
registeringEventLogIndex: 0,
registeringEventChain: ChainMap.Chain.makeUnsafe(~chainId=1),
dynamicContracts: [],
}

let fetchState = {
...baseRegister->makeMockFetchState,
pendingDynamicContracts: [dynamicContractRegistration],
}
let earliestQueueItem = fetchState->getEarliestEvent

Assert.deepEqual(
earliestQueueItem,
NoItem({
blockNumber: dynamicContractRegistration.registeringEventBlockNumber - 1,
blockTimestamp: 0,
}),
~message="Should account for pending dynamicContracts earliest registering event",
)
})

it("isReadyForNextQuery standard", () => {
let baseRegister = {
latestFetchedBlock: {
blockNumber: 500,
blockTimestamp: 500 * 15,
},
contractAddressMapping: ContractAddressingMap.fromArray([
(mockAddress1, (Gravatar :> string)),
]),
firstEventBlockNumber: None,
dynamicContracts: DynamicContractsMap.empty,
fetchedEventQueue: [
mockEvent(~blockNumber=6, ~logIndex=1),
mockEvent(~blockNumber=5),
mockEvent(~blockNumber=1, ~logIndex=2),
],
registerType: RootRegister({endBlock: None}),
}
let fetchState = baseRegister->makeMockFetchState

Assert.ok(
fetchState->isReadyForNextQuery(~maxQueueSize=10),
~message="Should be ready for next query when under max queue size",
)

Assert.ok(
!(fetchState->isReadyForNextQuery(~maxQueueSize=3)),
~message="Should not be ready for next query when at max queue size",
)
})

it(
"isReadyForNextQuery when cummulatively over max queue size but dynamic contract is under",
() => {
let rootRegister = {
latestFetchedBlock: {
blockNumber: 500,
blockTimestamp: 500 * 15,
},
contractAddressMapping: ContractAddressingMap.fromArray([
(mockAddress1, (Gravatar :> string)),
]),
firstEventBlockNumber: None,
dynamicContracts: DynamicContractsMap.empty,
fetchedEventQueue: [
mockEvent(~blockNumber=6, ~logIndex=1),
mockEvent(~blockNumber=5),
mockEvent(~blockNumber=4, ~logIndex=2),
],
registerType: RootRegister({endBlock: None}),
}

let baseRegister = {
registerType: DynamicContractRegister({
id: {blockNumber: 100, logIndex: 0},
nextRegister: rootRegister,
}),
latestFetchedBlock: {
blockNumber: 500,
blockTimestamp: 500 * 15,
},
contractAddressMapping: ContractAddressingMap.fromArray([
(mockAddress2, (Gravatar :> string)),
]),
firstEventBlockNumber: None,
dynamicContracts: DynamicContractsMap.empty,
fetchedEventQueue: [
mockEvent(~blockNumber=3, ~logIndex=2),
mockEvent(~blockNumber=2),
mockEvent(~blockNumber=1, ~logIndex=1),
],
}

let fetchState = baseRegister->makeMockFetchState

Assert.equal(fetchState->queueSize, 6, ~message="Should have 6 items total in queue")

Assert.ok(
fetchState->isReadyForNextQuery(~maxQueueSize=5),
~message="Should be ready for next query when base register is under max queue size",
)
},
)

it("isReadyForNextQuery when containing pending dynamic contracts", () => {
let baseRegister = {
latestFetchedBlock: {
blockNumber: 500,
blockTimestamp: 500 * 15,
},
contractAddressMapping: ContractAddressingMap.fromArray([
(mockAddress1, (Gravatar :> string)),
]),
firstEventBlockNumber: None,
dynamicContracts: DynamicContractsMap.empty,
fetchedEventQueue: [
mockEvent(~blockNumber=6, ~logIndex=1),
mockEvent(~blockNumber=5),
mockEvent(~blockNumber=1, ~logIndex=2),
],
registerType: RootRegister({endBlock: None}),
}
let dynamicContractRegistration = {
registeringEventBlockNumber: 100,
registeringEventLogIndex: 0,
registeringEventChain: ChainMap.Chain.makeUnsafe(~chainId=1),
dynamicContracts: [],
}

let fetchStateWithoutPendingDynamicContracts = baseRegister->makeMockFetchState

Assert.ok(
!(fetchStateWithoutPendingDynamicContracts->isReadyForNextQuery(~maxQueueSize=3)),
~message="Should not be ready for next query when base register is at the max queue size",
)

let fetchStateWithPendingDynamicContracts = {
...fetchStateWithoutPendingDynamicContracts,
pendingDynamicContracts: [dynamicContractRegistration],
}

Assert.ok(
fetchStateWithPendingDynamicContracts->isReadyForNextQuery(~maxQueueSize=3),
~message="Should be ready for next query when base register is at the max queue size but contains pending dynamic contracts",
)
})

it("getNextQuery", () => {
let latestFetchedBlock = getBlockData(~blockNumber=500)
let root = {
Expand Down

0 comments on commit e4e5867

Please sign in to comment.