Skip to content

Commit

Permalink
feat: implement setting to throttle block consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
anxolin committed Jul 19, 2024
1 parent 2c1e3a1 commit 5394c5a
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions src/services/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ export class ChainContext {
readonly dryRun: boolean;
readonly watchdogTimeout: number;
readonly addresses?: string[];
readonly processEveryNumBlocks: number;

private sync: ChainSync = ChainSync.SYNCING;
static chains: Chains = {};

Expand Down Expand Up @@ -106,6 +108,7 @@ export class ChainContext {
this.deploymentBlock = deploymentBlock;
this.pageSize = pageSize ?? PAGE_SIZE_DEFAULT;
this.dryRun = dryRun;
this.processEveryNumBlocks = options.processEveryNumBlocks ?? 1;
this.watchdogTimeout = watchdogTimeout ?? WATCHDOG_TIMEOUT_DEFAULT_SECS;
this.addresses = owners;

Expand Down Expand Up @@ -303,17 +306,29 @@ export class ChainContext {
* 2. Check if any orders want to create discrete orders.
*/
private async runBlockWatcher(lastProcessedBlock: providers.Block) {
const { provider, registry, chainId, watchdogTimeout } = this;
const {
provider,
registry,
chainId,
watchdogTimeout,
processEveryNumBlocks,
} = this;
const log = getLogger("chainContext:runBlockWatcher", chainId.toString());
// Watch for new blocks
log.info(`👀 Start block watcher`);
log.debug(`Watchdog timeout: ${watchdogTimeout} seconds`);
let lastBlockReceived = lastProcessedBlock;
provider.on("block", async (blockNumber: number) => {
try {
const block = await provider.getBlock(blockNumber);
// Decide if we should process this block
const shouldProcessBlock = blockNumber % processEveryNumBlocks === 0;
log.debug(
`New block ${blockNumber}. ${
shouldProcessBlock ? "Processing" : "Skipping"
}`
);

log.debug(`New block ${blockNumber}`);
const block = await provider.getBlock(blockNumber);

// Set the block time metric
const _blockTime = block.timestamp - lastBlockReceived.timestamp;
Expand Down Expand Up @@ -341,6 +356,7 @@ export class ChainContext {
await processBlockAndPersist({
context: this,
blockNumber,
shouldProcessBlock,
events,
log,
provider,
Expand Down Expand Up @@ -430,6 +446,7 @@ export class ChainContext {
async function processBlock(
context: ChainContext,
block: providers.Block,
shouldProcessBlock: boolean,
events: ConditionalOrderCreatedEvent[],
blockNumberOverride?: number,
blockTimestampOverride?: number
Expand Down Expand Up @@ -463,24 +480,26 @@ async function processBlock(
}
}

// run action
const result = await checkForAndPlaceOrder(
context,
block,
blockNumberOverride,
blockTimestampOverride
)
.then(() => true)
.catch(() => {
hasErrors = true;
log.error(`Error running "checkForAndPlaceOrder" action`);
return false;
});
log.debug(
`Result of "checkForAndPlaceOrder" action for block ${
block.number
}: ${_formatResult(result)}`
);
// Check programmatic orders and place orders if necessary
if (shouldProcessBlock) {
const result = await checkForAndPlaceOrder(
context,
block,
blockNumberOverride,
blockTimestampOverride
)
.then(() => true)
.catch(() => {
hasErrors = true;
log.error(`Error running "checkForAndPlaceOrder" action`);
return false;
});
log.debug(
`Result of "checkForAndPlaceOrder" action for block ${
block.number
}: ${_formatResult(result)}`
);
}

timer();
if (hasErrors) {
Expand Down Expand Up @@ -512,6 +531,7 @@ async function persistLastProcessedBlock(params: {
async function processBlockAndPersist(params: {
context: ChainContext;
blockNumber: number;
shouldProcessBlock: boolean;
events: ConditionalOrderCreatedEvent[];
currentBlock?: providers.Block;
log: LoggerWithMethods;
Expand All @@ -523,6 +543,7 @@ async function processBlockAndPersist(params: {
await processBlock(
context,
block,
shouldProcessBlock,
events,
currentBlock?.number,
currentBlock?.timestamp
Expand Down

0 comments on commit 5394c5a

Please sign in to comment.