-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix inconsistent processed events with many dynamic contracts #399
base: main
Are you sure you want to change the base?
Conversation
@@ -130,7 +130,7 @@ let registerDynamicContracts = ( | |||
} else { | |||
let newPartition = FetchState.make( | |||
~partitionId=partitions->Array.length, | |||
~startBlock, | |||
~startBlock=dynamicContractRegistration.registeringEventBlockNumber, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a fix for the first issue. I had to fix the second one with a huge refactoring to be able to reproduce and find the little first one 😁
if register.latestFetchedBlock.blockNumber > nextRegister.latestFetchedBlock.blockNumber { | ||
let logger = Logging.createChild( | ||
~params={ | ||
"context": "Merging Dynamic Contract Registers", | ||
"currentRegister": { | ||
"id": register->getRegisterId, | ||
"latestFetchedBlock": register.latestFetchedBlock.blockNumber, | ||
"addresses": register.contractAddressMapping->ContractAddressingMap.getAllAddresses, | ||
}, | ||
"nextRegister": { | ||
"id": nextRegister->getRegisterId, | ||
"latestFetchedBlock": nextRegister.latestFetchedBlock.blockNumber, | ||
"addresses": nextRegister.registerType->isRootRegister | ||
? "Root"->Utils.magic | ||
: nextRegister.contractAddressMapping->ContractAddressingMap.getAllAddresses, | ||
}, | ||
}, | ||
) | ||
NextRegisterIsLessThanCurrent->ErrorHandling.mkLogAndRaise( | ||
~msg="Unexpected: Dynamic contract register latest fetched block is greater than next register when it should be equal", | ||
~logger, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not possible anymore
let rollbackRegister = ( | ||
register: register, | ||
~lastScannedBlock, | ||
~firstChangeEvent: blockNumberAndLogIndex, | ||
~parent: option<Parent.t>=?, | ||
) => { | ||
let handleParent = updated => | ||
switch parent { | ||
| Some(parent) => parent->Parent.joinChild(updated) | ||
| None => updated | ||
} | ||
|
||
switch self.registerType { | ||
//Case 1 Root register that has only fetched up to a confirmed valid block number | ||
//Should just return itself unchanged | ||
| RootRegister if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => | ||
self->handleParent | ||
//Case 2 Dynamic register that has only fetched up to a confirmed valid block number | ||
//Should just return itself, with the next register rolled back recursively | ||
| DynamicContractRegister({id, nextRegister}) | ||
if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => | ||
nextRegister->rollbackRegister( | ||
~lastScannedBlock, | ||
~firstChangeEvent, | ||
~parent=self->Parent.make(~dynamicContractId=id, ~parent), | ||
) | ||
|
||
//Case 3 Root register that has fetched further than the confirmed valid block number | ||
//Should prune its queue and set its latest fetched block data to the latest known confirmed block | ||
| RootRegister => | ||
{ | ||
...self, | ||
fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent(~firstChangeEvent), | ||
latestFetchedBlock: lastScannedBlock, | ||
} | ||
->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) | ||
->handleParent | ||
//Case 4 DynamicContract register that has fetched further than the confirmed valid block number | ||
//Should prune its queue, set its latest fetched blockdata + pruned queue | ||
//And recursivle prune the nextRegister | ||
| DynamicContractRegister({id, nextRegister}) => | ||
if register.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber { | ||
Some(register) | ||
} else { | ||
let updatedWithRemovedDynamicContracts = | ||
self->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) | ||
|
||
register->pruneDynamicContractAddressesFromFirstChangeEvent(~firstChangeEvent) | ||
if updatedWithRemovedDynamicContracts.contractAddressMapping->ContractAddressingMap.isEmpty { | ||
//If the contractAddressMapping is empty after pruning dynamic contracts, then this | ||
//is a dead register. Simly return its next register rolled back | ||
nextRegister->rollbackRegister(~lastScannedBlock, ~firstChangeEvent, ~parent?) | ||
//If the contractAddressMapping is empty after pruning dynamic contracts, | ||
// then this is a dead register. | ||
None | ||
} else { | ||
//If there are still values in the contractAddressMapping, we should keep the register but | ||
//prune queues and next register | ||
let updated = { | ||
//If there are still values in the contractAddressMapping, | ||
//we should keep the register but prune queues | ||
Some({ | ||
...updatedWithRemovedDynamicContracts, | ||
fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent( | ||
fetchedEventQueue: register.fetchedEventQueue->pruneQueueFromFirstChangeEvent( | ||
~firstChangeEvent, | ||
), | ||
latestFetchedBlock: lastScannedBlock, | ||
} | ||
nextRegister->rollbackRegister( | ||
~lastScannedBlock, | ||
~firstChangeEvent, | ||
~parent=updated->Parent.make(~dynamicContractId=id, ~parent), | ||
) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is absolutely the same, but it is so much simpler without a recursion.
@@ -123,22 +139,9 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { | |||
fetchState0, | |||
{ | |||
partitionId: 1, | |||
baseRegister: { | |||
registerType: RootRegister, | |||
latestFetchedBlock: {blockNumber: 0, blockTimestamp: 0}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the wrong test for the first issue. After the fix it became:
latestFetchedBlock: {blockNumber: 9, blockTimestamp: 0},
Terminology:
We have two fetching modes:
There were two main issues:
To fix the second issue, I've change the ordered list-like structure for registers to an independent array, simplifying operations with them, and allowing to addition new dynamic registrations after the latestFetchBlock. Also, this will simplify the further improvement to fetch the dynamic contract registers in parallel. Currently, the fetch requests to catch up to the latest register are done sequentially.