Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating next interval calculator file #180

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions blockchains/erc20/erc20_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { extendEventsWithPrimaryKey } = require('./lib/extend_events_key');
const { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } = require('./lib/contract_overwrite');
const { stableSort, readJsonFile } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const { nextIntervalCalculator } = require('../eth/lib/next_interval_calculator');
const { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } = require('../eth/lib/next_interval_calculator');
const Web3Wrapper = require('../eth/lib/web3_wrapper');
const { TimestampsCache } = require('./lib/timestamps_cache');
const { getPastEvents } = require('./lib/fetch_events');
Expand Down Expand Up @@ -98,13 +98,13 @@ class ERC20Worker extends BaseWorker {
}

async work() {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const interval = this.settings.EXPORT_BLOCKS_LIST ?
this.getBlocksListInterval() :
await nextIntervalCalculator(this);

if (!interval.success) {
return [];
}
nextIntervalCalculator(this);

logger.info(`Fetching transfer events for interval ${interval.fromBlock}:${interval.toBlock}`);

Expand Down
18 changes: 9 additions & 9 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } = require('./lib/next_interval_calculator');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');

class ETHWorker extends BaseWorker {
Expand Down Expand Up @@ -146,14 +146,14 @@ class ETHWorker extends BaseWorker {
}

async work() {
const result = await nextIntervalCalculator(this);
if (!result.success) {
return [];
}
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(result.fromBlock, result.toBlock);
const events = this.transformPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);
const { fromBlock, toBlock } = nextIntervalCalculator(this);
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
Expand All @@ -164,7 +164,7 @@ class ETHWorker extends BaseWorker {
this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = result.toBlock;
this.lastExportedBlock = toBlock;

return events;
}
Expand Down
92 changes: 48 additions & 44 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
@@ -1,55 +1,59 @@
function isNewBlockAvailable(worker) {
return worker.lastExportedBlock < worker.lastConfirmedBlock;
}
const WORK_NO_SLEEP = 0;
const WORK_SLEEP = 1;
const NO_WORK_SLEEP = 2;

/**
* Return the next interval to be fetched.
* NOTE: this method modifies the member variables of its argument
* Returns the context in which the worker finds itself at a given moment:
*
* WORK_NO_SLEEP : Exporting blocks that are behind the last confirmed block
*
* WORK_SLEEP : We've caught up to the last confirmed block. After a query to the node,
* we find out that there's a higher goal
*
* @param {*} worker A worker object, member variables would be modified
* @returns An object like so:
* {
* success: Boolean,
* fromBlock: Integer,
* toBlock: Integer
* }
* NO_WORK_SLEEP : We've caught up to the last confirmed block. After a query to the node,
* we find out that we've caught up
*
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {number} A number, which points to one of the above-given scenarios
*/
async function nextIntervalCalculator(worker) {
// Check if we need to ask the Node for new Head block. This is an optimization to skip this call when the exporter
// is behind the last seen Head anyways.
const firstNewBlockCheck = isNewBlockAvailable(worker);
if (!firstNewBlockCheck) {
// On the previous cycle we closed the gap to the head of the blockchain.
// Check if there are new blocks now.
const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS;
if (newConfirmedBlock > worker.lastConfirmedBlock) {
// The Node has progressed
worker.lastConfirmedBlock = newConfirmedBlock;
}
async function analyzeWorkerContext(worker) {
if (worker.lastExportedBlock < worker.lastConfirmedBlock) return WORK_NO_SLEEP;

const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS;
if (newConfirmedBlock > worker.lastConfirmedBlock) {
worker.lastConfirmedBlock = newConfirmedBlock;
return WORK_SLEEP;
}

if (firstNewBlockCheck || isNewBlockAvailable(worker)) {
// If data was available without asking with Node, we are catching up and should come back straight away
if (firstNewBlockCheck) {
worker.sleepTimeMsec = 0;
}
else {
// If data became available only after asking the Node, we are close to the Head, come back later
worker.sleepTimeMsec = worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
}
return NO_WORK_SLEEP;
}

return {
success: true,
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}
else {
// The Node has not progressed
worker.sleepTimeMsec = worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return { success: false };
}
/**
* Function for setting the work loop's sleep time, after the end of the worker's work method.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @param {number} context The scenario used for setting the sleep time
*/
function setWorkerSleepTime(worker, context) {
worker.sleepTimeMsec = (context !== WORK_NO_SLEEP) ? worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000 : 0;
}

/**
* Function for calculating the next interval to be used in the worker's methods for querying the node.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {object} The interval, derived from the progress of the worker
*/
function nextIntervalCalculator(worker) {
return {
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}

module.exports = {
WORK_SLEEP,
NO_WORK_SLEEP,
WORK_NO_SLEEP,
setWorkerSleepTime,
analyzeWorkerContext,
nextIntervalCalculator
};
Loading