diff --git a/blockchains/erc20/erc20_worker.js b/blockchains/erc20/erc20_worker.js index d0375ed6..2d778608 100644 --- a/blockchains/erc20/erc20_worker.js +++ b/blockchains/erc20/erc20_worker.js @@ -6,7 +6,7 @@ const { extendEventsWithPrimaryKey } = require('./lib/extend_events_key'); const { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } = require('./lib/contract_overwrite'); const { stableSort, readJsonFile } = require('./lib/util'); const BaseWorker = require('../../lib/worker_base'); -const { nextIntervalCalculator } = require('../eth/lib/next_interval_calculator'); +const { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } = require('../eth/lib/next_interval_calculator'); const Web3Wrapper = require('../eth/lib/web3_wrapper'); const { TimestampsCache } = require('./lib/timestamps_cache'); const { getPastEvents } = require('./lib/fetch_events'); @@ -98,13 +98,13 @@ class ERC20Worker extends BaseWorker { } async work() { + const workerContext = await analyzeWorkerContext(this); + setWorkerSleepTime(this, workerContext); + if (workerContext === NO_WORK_SLEEP) return []; + const interval = this.settings.EXPORT_BLOCKS_LIST ? this.getBlocksListInterval() : - await nextIntervalCalculator(this); - - if (!interval.success) { - return []; - } + nextIntervalCalculator(this); logger.info(`Fetching transfer events for interval ${interval.fromBlock}:${interval.toBlock}`); diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index 4be8493e..fd13da9c 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -9,7 +9,7 @@ const BaseWorker = require('../../lib/worker_base'); const Web3Wrapper = require('./lib/web3_wrapper'); const { decodeTransferTrace } = require('./lib/decode_transfers'); const { FeesDecoder } = require('./lib/fees_decoder'); -const { nextIntervalCalculator, analyzeWorkerProgress, setWorkerSleepTime } = require('./lib/next_interval_calculator'); +const { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } = require('./lib/next_interval_calculator'); const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder'); class ETHWorker extends BaseWorker { @@ -146,9 +146,9 @@ class ETHWorker extends BaseWorker { } async work() { - const workerContext = await analyzeWorkerProgress(this); - setWorkerSleepTime(this); - if (workerContext === 2) return []; + const workerContext = await analyzeWorkerContext(this); + setWorkerSleepTime(this, workerContext); + if (workerContext === NO_WORK_SLEEP) return []; const { fromBlock, toBlock } = nextIntervalCalculator(this); logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); diff --git a/blockchains/eth/lib/next_interval_calculator.js b/blockchains/eth/lib/next_interval_calculator.js index 1418503e..d6b45aec 100644 --- a/blockchains/eth/lib/next_interval_calculator.js +++ b/blockchains/eth/lib/next_interval_calculator.js @@ -1,36 +1,40 @@ +const WORK_NO_SLEEP = 0; +const WORK_SLEEP = 1; +const NO_WORK_SLEEP = 2; + /** * Returns the context in which the worker finds itself at a given moment: - * - * 0 : Exporting blocks that are behind the last confirmed block - * - * 1 : We've caught up to the last confirmed block. After a query to the node, we find out that there's a higher goal - * - * 2 : We've caught up to the last confirmed block. After a query to the node, we find out that we've caught up - * + * + * WORK_NO_SLEEP : Exporting blocks that are behind the last confirmed block + * + * WORK_SLEEP : We've caught up to the last confirmed block. After a query to the node, + * we find out that there's a higher goal + * + * NO_WORK_SLEEP : We've caught up to the last confirmed block. After a query to the node, + * we find out that we've caught up + * * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. * @returns {number} A number, which points to one of the above-given scenarios */ -async function analyzeWorkerProgress(worker) { - if (worker.lastExportedBlock < worker.lastConfirmedBlock) return 0; +async function analyzeWorkerContext(worker) { + if (worker.lastExportedBlock < worker.lastConfirmedBlock) return WORK_NO_SLEEP; const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS; if (newConfirmedBlock > worker.lastConfirmedBlock) { worker.lastConfirmedBlock = newConfirmedBlock; - return 1; + return WORK_SLEEP; } - return 2; + return NO_WORK_SLEEP; } /** * Function for setting the work loop's sleep time, after the end of the worker's work method. - * For the above given 0 and 1 scenarios, we'd want no sleep, because we have to work. - * For 2 we'd want to sleep, because we'd have caught up completely and should back off for some time. * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. * @param {number} context The scenario used for setting the sleep time */ function setWorkerSleepTime(worker, context) { - worker.sleepTimeMsec = (context === 2) ? worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC : 0; + worker.sleepTimeMsec = (context !== WORK_NO_SLEEP) ? worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000 : 0; } /** @@ -46,7 +50,8 @@ function nextIntervalCalculator(worker) { } module.exports = { + NO_WORK_SLEEP, setWorkerSleepTime, - analyzeWorkerProgress, + analyzeWorkerContext, nextIntervalCalculator }; diff --git a/test/eth/next_interval_calculator.spec.js b/test/eth/next_interval_calculator.spec.js index 85724291..9b8d9f3a 100644 --- a/test/eth/next_interval_calculator.spec.js +++ b/test/eth/next_interval_calculator.spec.js @@ -1,189 +1,96 @@ const eth_worker = require('../../blockchains/eth/eth_worker'); const constants = require('../../blockchains/eth/lib/constants'); -const { nextIntervalCalculator } = require('../../blockchains/eth/lib/next_interval_calculator'); +const { + nextIntervalCalculator, + analyzeWorkerContext, + setWorkerSleepTime } = require('../../blockchains/eth/lib/next_interval_calculator'); + const assert = require('assert'); class MockWeb3Wrapper { - constructor(blockNumber) { - this.blockNumber = blockNumber; - } - async getBlockNumber() { - return this.blockNumber; - } -} - -class MockIsCalledWeb3Wrapper { - constructor() { - this.isCalled = false; - } - async getBlockNumber() { - this.isCalled = true; - return null; - } + constructor(blockNumber) { + this.blockNumber = blockNumber; + } + async getBlockNumber() { + return this.blockNumber; + } } constants.CONFIRMATIONS = 3; -constants.BLOCK_INTERVAL = 100; - -describe('Check interval not going backwards', function () { - - it('Fetched interval should not go backwards even if Node reports old block numbers', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(100); - const result = await nextIntervalCalculator(worker); - - assert.deepStrictEqual(result.success, true); - assert.deepStrictEqual(result.fromBlock, 0); - assert.deepStrictEqual(result.toBlock, 100 - constants.CONFIRMATIONS); - - // Modify the last exported block as if we have consumed this interval - worker.lastExportedBlock = result.toBlock; - - // Mock the Node to report an old number - worker.web3Wrapper.blockNumber = 90; - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, false); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - - const resultThird = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultThird.success, false); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - - }); - - it('Fetched interval should not go backwards even if saved state is invalid', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(4); - - // Setup a situation where the exported block has exceeded the Node block - // Test is similar to the above but test that we already saved an old lastConfirmedBlock - worker.lastExportedBlock = 10; - worker.lastConfirmedBlock = 5; - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, false); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - - }); +constants.BLOCK_INTERVAL = 50; + +describe('analyzeWorkerContext', () => { + it('Returns "work no sleep" case', async () => { + const worker = new eth_worker.worker(constants); + worker.lastExportedBlock = 89; + worker.lastConfirmedBlock = 90; + + const scenario = await analyzeWorkerContext(worker); + assert.deepStrictEqual(scenario, 0); + }); + + it('Returns "work sleep" case', async () => { + const worker = new eth_worker.worker(constants); + worker.lastExportedBlock = 90; + worker.lastConfirmedBlock = 90; + worker.web3Wrapper = new MockWeb3Wrapper(100); + + const scenario = await analyzeWorkerContext(worker); + assert.deepStrictEqual(scenario, 1); + }); + + it('Returns "no work no sleep" case', async () => { + const worker = new eth_worker.worker(constants); + worker.lastExportedBlock = 100; + worker.lastConfirmedBlock = 100; + worker.web3Wrapper = new MockWeb3Wrapper(100); + + const scenario = await analyzeWorkerContext(worker); + assert.deepStrictEqual(scenario, 2); + }); }); -describe('Check logic when Node is ahead', function () { - it('Exporter should not wait for full BLOCK_INTERVAL', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(constants.BLOCK_INTERVAL - 1); - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, true); - assert.deepStrictEqual(resultSecond.fromBlock, 0); - assert.deepStrictEqual(resultSecond.toBlock, constants.BLOCK_INTERVAL - 1 - constants.CONFIRMATIONS); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - }); - - it('Exporter would not exceed BLOCK_INTERVAL', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(constants.CONFIRMATIONS + 10);; - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, true); - assert.deepStrictEqual(resultSecond.fromBlock, 0); - assert.deepStrictEqual(resultSecond.toBlock, 10); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - }); - - it('Interval is correct if Node is ahead', async function () { - const worker = new eth_worker.worker(constants); +describe('setWorkerSleepTime', () => { + it('sets sleep time accordingly when we\'re behind', () => { + const worker = new eth_worker.worker(constants); + setWorkerSleepTime(worker, 0); - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = 2; + assert.deepStrictEqual(worker.sleepTimeMsec, 0); + }); - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, true); - assert.deepStrictEqual(resultSecond.fromBlock, 2); - assert.deepStrictEqual(resultSecond.toBlock, 2); - }); + it('sets sleep time accordingly when we\'ve caught up to lastConfirmedBlock, but not Node head', () => { + const worker = new eth_worker.worker(constants); + setWorkerSleepTime(worker, 1); - it('No sleep time if Node is ahead', async function () { - const worker = new eth_worker.worker(constants); + assert.deepStrictEqual(worker.sleepTimeMsec, worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); + }); - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = 2; - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, true); - assert.deepStrictEqual(worker.sleepTimeMsec, 0); - }); - - it('Node not called if Node is ahead', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockIsCalledWeb3Wrapper(); - - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = 2; - - await nextIntervalCalculator(worker); - assert.deepStrictEqual(worker.web3Wrapper.isCalled, false); - }); - - it('Block interval is not exceeded', async function () { - const worker = new eth_worker.worker(constants); - - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = constants.BLOCK_INTERVAL + constants.CONFIRMATIONS + 10; - - const result = await nextIntervalCalculator(worker); - assert.deepStrictEqual(result.success, true); - assert.deepStrictEqual(result.fromBlock, 2); - assert.deepStrictEqual(result.toBlock, 1 + constants.BLOCK_INTERVAL); - }); + it('sets sleep time accordingly when we\'ve caught up', () => { + const worker = new eth_worker.worker(constants); + setWorkerSleepTime(worker, 2); + assert.deepStrictEqual(worker.sleepTimeMsec, worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); + }); }); -describe('Check logic when Node is not ahead', function () { - it('Check interval is correct if Node is behind', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(constants.CONFIRMATIONS + 10); - - worker.lastExportedBlock = 2; - worker.lastConfirmedBlock = 1; - - const resultSecond = await nextIntervalCalculator(worker); - assert.deepStrictEqual(resultSecond.success, true); - assert.deepStrictEqual(resultSecond.fromBlock, 3); - assert.deepStrictEqual(resultSecond.toBlock, 10); - }); - - it('Sleep time set if Node is not ahead', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(constants.CONFIRMATIONS + 10); - - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = 1; - - await nextIntervalCalculator(worker); - assert.deepStrictEqual(worker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); - }); - - it('Node gets called if Node is not ahead', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockIsCalledWeb3Wrapper(); - - worker.lastExportedBlock = 1; - worker.lastConfirmedBlock = 1; - - await nextIntervalCalculator(worker); - assert.deepStrictEqual(worker.web3Wrapper.isCalled, true); - }); - - it('Block interval is not exceeded', async function () { - const worker = new eth_worker.worker(constants); - worker.web3Wrapper = new MockWeb3Wrapper(constants.BLOCK_INTERVAL + constants.CONFIRMATIONS + 10); - - const result = await nextIntervalCalculator(worker); - assert.deepStrictEqual(result.success, true); - assert.deepStrictEqual(result.fromBlock, 0); - assert.deepStrictEqual(result.toBlock, constants.BLOCK_INTERVAL - 1); - }); - +describe('nextIntervalCalculator', () => { + it('returns interval according to the worker progress', () => { + const worker = new eth_worker.worker(constants); + worker.lastExportedBlock = 0; + worker.lastConfirmedBlock = 100; + + const { fromBlock, toBlock } = nextIntervalCalculator(worker); + assert.deepStrictEqual(fromBlock, 1); + assert.deepStrictEqual(toBlock, 50); + }); + + it('returns the interval accordingly when lastConfirmedBlock is close', () => { + const worker = new eth_worker.worker(constants); + worker.lastExportedBlock = 0; + worker.lastConfirmedBlock = 37; + + const { fromBlock, toBlock } = nextIntervalCalculator(worker); + assert.deepStrictEqual(fromBlock, 1); + assert.deepStrictEqual(toBlock, 37); + }); }); -