diff --git a/blockchains/bnb/bnb_worker.js b/blockchains/bnb/bnb_worker.js index 9e2c716c..d1c21c2e 100644 --- a/blockchains/bnb/bnb_worker.js +++ b/blockchains/bnb/bnb_worker.js @@ -79,7 +79,7 @@ class BNBWorker extends BaseWorker { } // The API returns most recent transactions first. Take block number from the first one. - this.lastBufferedBlock = fetchTransactions[0].blockHeight; + this.lastExportedBlock = fetchTransactions[0].blockHeight; if (!this.bnbTradesMode) { fetchTransactions = await fetch_transactions.replaceParentTransactionsWithChildren(this.queue, @@ -88,7 +88,7 @@ class BNBWorker extends BaseWorker { resultTransactions = getTransactionsWithKeys(fetchTransactions); - this.lastBufferedBlock = resultTransactions[resultTransactions.length - 1].blockHeight; + this.lastExportedBlock = resultTransactions[resultTransactions.length - 1].blockHeight; } // The upper limit of the load rate is enforced by p-queue. @@ -118,7 +118,7 @@ class BNBWorker extends BaseWorker { */ getLastProcessedPosition() { return { - blockNumber: this.lastBufferedBlock, + blockNumber: this.lastExportedBlock, timestampReached: this.bnbTransactionsFetcher.getIntervalFetchEnd(), fetchRangeMsec: this.bnbTransactionsFetcher.getMsecInFetchRange() }; @@ -151,7 +151,7 @@ class BNBWorker extends BaseWorker { this.bnbTransactionsFetcher = new BNBTransactionsFetcher(lastProcessedPosition.timestampReached, fetchRangeMsec, this.bnbTradesMode); - this.lastBufferedBlock = lastProcessedPosition.blockNumber; + this.lastExportedBlock = lastProcessedPosition.blockNumber; return lastProcessedPosition; } diff --git a/blockchains/cardano/cardano_worker.js b/blockchains/cardano/cardano_worker.js index 39dc0127..a2fdc48c 100644 --- a/blockchains/cardano/cardano_worker.js +++ b/blockchains/cardano/cardano_worker.js @@ -170,7 +170,7 @@ class CardanoWorker extends BaseWorker { } async work() { - const fromBlock = this.lastBufferedBlock + 1; + const fromBlock = this.lastExportedBlock + 1; if (fromBlock >= this.lastConfirmedBlock - 2) { // We are up to date with the blockchain (aka 'current mode'). Sleep longer after finishing this loop. // The last confirmed block may be partial and would not be exported. Allow for one block gap. @@ -192,7 +192,7 @@ class CardanoWorker extends BaseWorker { } let transactions = null; - if (this.lastBufferedBlock < 0) { + if (this.lastExportedBlock < 0) { transactions = await this.getGenesisTransactions(); if (transactions.length === 0) { throw new Error('Error getting Cardano genesis transactions'); @@ -213,7 +213,7 @@ class CardanoWorker extends BaseWorker { transactions[i].primaryKey = this.lastPrimaryKey + i + 1; } - this.lastBufferedBlock = transactions[transactions.length - 1].block.number; + this.lastExportedBlock = transactions[transactions.length - 1].block.number; this.lastPrimaryKey += transactions.length; return transactions; diff --git a/blockchains/erc20/erc20_worker.js b/blockchains/erc20/erc20_worker.js index 42978a84..d0375ed6 100644 --- a/blockchains/erc20/erc20_worker.js +++ b/blockchains/erc20/erc20_worker.js @@ -77,14 +77,14 @@ class ERC20Worker extends BaseWorker { } getBlocksListInterval() { - if (this.lastBufferedBlock === -1 && this.blocksList.length > 0) { + if (this.lastExportedBlock === -1 && this.blocksList.length > 0) { return { success: true, fromBlock: this.blocksList[0][0], toBlock: this.blocksList[0][1] }; } - while (this.blocksList.length > 0 && this.lastBufferedBlock >= this.blocksList[0][1]) { + while (this.blocksList.length > 0 && this.lastExportedBlock >= this.blocksList[0][1]) { this.blocksList.shift(); } if (this.blocksList.length === 0) { @@ -139,7 +139,7 @@ class ERC20Worker extends BaseWorker { this.lastPrimaryKey = events[events.length - 1].primaryKey; } - this.lastBufferedBlock = interval.toBlock; + this.lastExportedBlock = interval.toBlock; const resultEvents = events.concat(overwritten_events); // If overwritten events have been generated, they need to be merged into the original events diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index 42cfaac3..7b561933 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -9,7 +9,7 @@ const BaseWorker = require('../../lib/worker_base'); const Web3Wrapper = require('./lib/web3_wrapper'); const { decodeTransferTrace } = require('./lib/decode_transfers'); const { FeesDecoder } = require('./lib/fees_decoder'); -const { nextIntervalCalculator } = require('./lib/next_interval_calculator'); +const { nextIntervalCalculator, isNewBlockAvailable } = require('./lib/next_interval_calculator'); const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder'); class ETHWorker extends BaseWorker { @@ -40,8 +40,7 @@ class ETHWorker extends BaseWorker { fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock), toBlock: this.web3Wrapper.parseNumberToHex(toBlock) }]) - .then((data) => this.parseEthInternalTrx(data['result'])) - .catch((err) => { throw err; }); + .then((data) => this.parseEthInternalTrx(data['result'])); } async fetchBlocks(fromBlock, toBlock) { @@ -146,14 +145,15 @@ class ETHWorker extends BaseWorker { return result; } - async work(key) { - const result = await nextIntervalCalculator(this); - if (!result.success) return []; - this.lastBufferedBlock = result.toBlock; - - logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`); - const [traces, blocks, receipts] = await this.fetchData(result.fromBlock, result.toBlock); - const events = this.transformPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts); + async work() { + if (!(await isNewBlockAvailable(this))) { + return []; + } + const { fromBlock, toBlock } = nextIntervalCalculator(this); + this.lastExportedBlock = toBlock; + logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); + const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); + const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); if (events.length > 0) { stableSort(events, transactionOrder); @@ -164,7 +164,7 @@ class ETHWorker extends BaseWorker { this.lastPrimaryKey += events.length; } - return [key, events]; + return events; } async init() { diff --git a/blockchains/eth/lib/next_interval_calculator.js b/blockchains/eth/lib/next_interval_calculator.js index 27e4af57..8c603de3 100644 --- a/blockchains/eth/lib/next_interval_calculator.js +++ b/blockchains/eth/lib/next_interval_calculator.js @@ -1,31 +1,40 @@ /** - * A function that returns the appropriate interval, - * depending on the progress that the worker's made. * If the exporter's caught up, we check for a new block. We then check whether the Node * returns a valid block (sometimes the Node returns an early block, like 3 for example). * We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration * the function will return the appropriate array of intervals. - * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. - * @returns {Array} The corresponding interval + * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. + * @returns {boolean} Boolean variable set to true or false, depending on whether there's a new block or not */ -async function nextIntervalCalculator(worker) { - if (worker.lastBufferedBlock >= worker.lastConfirmedBlock) { +async function isNewBlockAvailable(worker) { + if (worker.lastExportedBlock >= worker.lastConfirmedBlock) { const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS; if (worker.lastConfirmedBlock < newConfirmedBlock) { worker.lastConfirmedBlock = newConfirmedBlock; } worker.sleepTimeMsec = worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; - return { success: false }; + + return false; } + return true; +} + +/** + * A function that returns the appropriate interval, + * depending on the progress that the worker's made. + * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. + * @returns {object} The corresponding interval + */ +function nextIntervalCalculator(worker) { worker.sleepTimeMsec = 0; return { - success: true, - fromBlock: worker.lastBufferedBlock + 1, - toBlock: Math.min(worker.lastBufferedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock) + fromBlock: worker.lastExportedBlock + 1, + toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock) }; } module.exports = { + isNewBlockAvailable, nextIntervalCalculator }; diff --git a/blockchains/matic/matic_worker.js b/blockchains/matic/matic_worker.js index 2cbc0793..7604af58 100644 --- a/blockchains/matic/matic_worker.js +++ b/blockchains/matic/matic_worker.js @@ -33,7 +33,7 @@ class MaticWorker extends BaseWorker { this.lastPrimaryKey = events[events.length - 1].primaryKey; } - this.lastBufferedBlock = result.toBlock; + this.lastExportedBlock = result.toBlock; return events; } diff --git a/blockchains/receipts/receipts_worker.js b/blockchains/receipts/receipts_worker.js index b0e7fcdc..38bacb5a 100644 --- a/blockchains/receipts/receipts_worker.js +++ b/blockchains/receipts/receipts_worker.js @@ -91,7 +91,7 @@ class ReceiptsWorker extends BaseWorker { } async work() { - if (this.lastConfirmedBlock === this.lastBufferedBlock) { + if (this.lastConfirmedBlock === this.lastExportedBlock) { this.sleepTimeMsec = this.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; const newConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS; @@ -103,13 +103,13 @@ class ReceiptsWorker extends BaseWorker { this.sleepTimeMsec = 0; } - const toBlock = Math.min(this.lastBufferedBlock + this.settings.BLOCK_INTERVAL, this.lastConfirmedBlock); - const fromBlock = this.lastBufferedBlock + 1; + const toBlock = Math.min(this.lastExportedBlock + this.settings.BLOCK_INTERVAL, this.lastConfirmedBlock); + const fromBlock = this.lastExportedBlock + 1; logger.info(`Fetching receipts for interval ${fromBlock}:${toBlock}`); const receipts = await this.getReceiptsForBlocks(fromBlock, toBlock); - this.lastBufferedBlock = toBlock; + this.lastExportedBlock = toBlock; return receipts; } diff --git a/blockchains/utxo/utxo_worker.js b/blockchains/utxo/utxo_worker.js index 9316896d..847eef7f 100644 --- a/blockchains/utxo/utxo_worker.js +++ b/blockchains/utxo/utxo_worker.js @@ -88,7 +88,7 @@ class UtxoWorker extends BaseWorker { } async work() { - if (this.lastConfirmedBlock === this.lastBufferedBlock) { + if (this.lastConfirmedBlock === this.lastExportedBlock) { this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []); @@ -102,10 +102,10 @@ class UtxoWorker extends BaseWorker { this.sleepTimeMsec = 0; } - const numConcurrentRequests = Math.min(this.MAX_CONCURRENT_REQUESTS, this.lastConfirmedBlock - this.lastBufferedBlock); - const requests = Array.from({ length: numConcurrentRequests }, (_, i) => this.fetchBlock(this.lastBufferedBlock + 1 + i)); + const numConcurrentRequests = Math.min(this.MAX_CONCURRENT_REQUESTS, this.lastConfirmedBlock - this.lastExportedBlock); + const requests = Array.from({ length: numConcurrentRequests }, (_, i) => this.fetchBlock(this.lastExportedBlock + 1 + i)); const blocks = await Promise.all(requests); - this.lastBufferedBlock += blocks.length; + this.lastExportedBlock += blocks.length; return blocks; } } diff --git a/blockchains/xrp/xrp_worker.js b/blockchains/xrp/xrp_worker.js index cad21794..dcd24e2c 100644 --- a/blockchains/xrp/xrp_worker.js +++ b/blockchains/xrp/xrp_worker.js @@ -149,7 +149,7 @@ class XRPWorker extends BaseWorker { } async work() { - if (this.lastConfirmedBlock === this.lastBufferedBlock) { + if (this.lastConfirmedBlock === this.lastExportedBlock) { this.sleepTimeMsec = this.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; const lastValidatedLedger = await this.connectionSend(this.connections[0], { command: 'ledger', @@ -165,8 +165,8 @@ class XRPWorker extends BaseWorker { } else { this.sleepTimeMsec = 0; } - const toBlock = Math.min(this.lastBufferedBlock + this.settings.SEND_BATCH_SIZE, this.lastConfirmedBlock); - let fromBlock = this.lastBufferedBlock + 1; + const toBlock = Math.min(this.lastExportedBlock + this.settings.SEND_BATCH_SIZE, this.lastConfirmedBlock); + let fromBlock = this.lastExportedBlock + 1; const requests = []; let transfers = []; @@ -182,7 +182,7 @@ class XRPWorker extends BaseWorker { }); this.checkAllTransactionsValid(ledgers); - this.lastBufferedBlock = toBlock; + this.lastExportedBlock = toBlock; if (ledgers.length > 0) { this.lastPrimaryKey = ledgers[ledgers.length - 1].primaryKey; } diff --git a/index.js b/index.js index c937b666..c3b51cd6 100644 --- a/index.js +++ b/index.js @@ -78,25 +78,21 @@ class Main { metrics.currentBlock.set(this.worker.lastConfirmedBlock); metrics.requestsCounter.inc(this.worker.getNewRequestsCount()); metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime); - metrics.lastExportedBlock.set(this.worker.lastBufferedBlock); + metrics.lastExportedBlock.set(this.worker.lastExportedBlock); } async waitOnStoreEvents() { const bufferCopy = this.taskManager.retrieveCompleted(); await this.exporter.storeEvents(bufferCopy); - this.lastProcessedPosition = this.taskManager.lastProcessedPosition; + this.lastProcessedPosition = { + primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey, + blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber + }; } async workLoop() { while (this.shouldWork) { - if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) { - const currentKey = this.lastWorkerKey; - this.taskManager.pushToQueue(() => this.worker.work(currentKey).catch(err => { - logger.error(err.stack); - this.shouldWork = false; - })); - this.lastWorkerKey++; - } + if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) this.taskManager.pushToQueue(this.worker.work); this.worker.lastRequestStartTime = new Date(); this.worker.lastExportTime = Date.now(); diff --git a/integration_test/src/binance-native-token.spec.js b/integration_test/src/binance-native-token.spec.js index c61381c0..08e76642 100644 --- a/integration_test/src/binance-native-token.spec.js +++ b/integration_test/src/binance-native-token.spec.js @@ -19,7 +19,7 @@ describe('BEP20 worker test', function () { }; const bep20Worker = new worker.worker(settings); await bep20Worker.init(); - bep20Worker.lastBufferedBlock = 19999999; + bep20Worker.lastExportedBlock = 19999999; let expectedDataPosition = 0; for (let i = 0; i < 5; ++i) { diff --git a/integration_test/src/ethereum-erc20-exact-contracts.spec.js b/integration_test/src/ethereum-erc20-exact-contracts.spec.js index 7c4d07ca..7a409c8a 100644 --- a/integration_test/src/ethereum-erc20-exact-contracts.spec.js +++ b/integration_test/src/ethereum-erc20-exact-contracts.spec.js @@ -25,7 +25,7 @@ describe('BEP20 worker test', function () { }; const erc20Worker = new worker.worker(settings); await erc20Worker.init(new MockExporter()); - erc20Worker.lastBufferedBlock = 9999999; + erc20Worker.lastExportedBlock = 9999999; let expectedDataPosition = 0; for (let i = 0; i < 10; ++i) { diff --git a/integration_test/src/matic.spec.js b/integration_test/src/matic.spec.js index 5bb864ea..b5dc7da5 100644 --- a/integration_test/src/matic.spec.js +++ b/integration_test/src/matic.spec.js @@ -23,7 +23,7 @@ describe('Matic worker test', function () { }; const maticWorker = new worker.worker(settings); await maticWorker.init(new MockExporter()); - maticWorker.lastBufferedBlock = 49999999; + maticWorker.lastExportedBlock = 49999999; let expectedDataPosition = 0; for (let i = 0; i < 4; ++i) { diff --git a/integration_test/src/optimism-erc20.spec.js b/integration_test/src/optimism-erc20.spec.js index 17154ef0..8aa40cb0 100644 --- a/integration_test/src/optimism-erc20.spec.js +++ b/integration_test/src/optimism-erc20.spec.js @@ -24,7 +24,7 @@ describe('Optimism worker test', function () { }; const erc20Worker = new worker.worker(settings); await erc20Worker.init(new MockExporter()); - erc20Worker.lastBufferedBlock = 99999999; + erc20Worker.lastExportedBlock = 99999999; let expectedDataPosition = 0; for (let i = 0; i < 10; ++i) { diff --git a/lib/task_manager.js b/lib/task_manager.js index 607c3f9c..29925e17 100644 --- a/lib/task_manager.js +++ b/lib/task_manager.js @@ -1,8 +1,9 @@ class TaskManager { constructor() { - this.map = {}; + this.taskData = {}; this.queue; this.buffer = []; + this.taskIndex = 0; this.lastPushedToBuffer = 0; } @@ -17,49 +18,32 @@ class TaskManager { this.queue.clear(); } - #generateDefensiveCopy() { - const bufferCopy = []; - for (const data of this.buffer) bufferCopy.push(data); - return bufferCopy; - } - - #shiftOriginalBuffer(shiftsCount) { - while (shiftsCount >= 0) { - this.buffer.shift(); - shiftsCount--; - } - } - retrieveCompleted() { - const bufferCopy = this.#generateDefensiveCopy(); - let shiftsCount = bufferCopy.length - 1; - this.#shiftOriginalBuffer(shiftsCount); - + const bufferCopy = []; + while (this.buffer.length > 0) bufferCopy.push(this.buffer.shift()); return bufferCopy; } #pushAllEligable() { - while (this.map[this.lastPushedToBuffer]) { - for (const data of this.map[this.lastPushedToBuffer]) { - this.buffer.push(data); - this.lastProcessedPosition = { - primaryKey: this.buffer[this.buffer.length - 1].primaryKey, - blockNumber: this.buffer[this.buffer.length - 1].blockNumber - }; - } - - delete this.map[this.lastPushedToBuffer]; + while (this.taskData[this.lastPushedToBuffer]) { + for (const data of this.taskData[this.lastPushedToBuffer]) this.buffer.push(data); + delete this.taskData[this.lastPushedToBuffer]; this.lastPushedToBuffer++; } } handleNewData([key, newTransformedData]) { - this.map[key] = newTransformedData; + this.taskData[key] = newTransformedData; this.#pushAllEligable(); } - pushToQueue(task) { - this.queue.add(task); + pushToQueue(workTask) { + this.queue.add(() => { + const result = workTask(); + const currIndex = this.taskIndex; + return [currIndex, result]; + }); + this.taskIndex++; } } diff --git a/lib/worker_base.js b/lib/worker_base.js index 05170e82..13ebdb31 100644 --- a/lib/worker_base.js +++ b/lib/worker_base.js @@ -7,7 +7,7 @@ class WorkerBase { // part of data, we set lastExportTime to current time. this.lastExportTime = Date.now(); this.lastConfirmedBlock = -1; - this.lastBufferedBlock = -1; + this.lastExportedBlock = -1; this.lastRequestStartTime; this.lastPrimaryKey = 0; this.sleepTimeMsec = 0; @@ -43,7 +43,7 @@ class WorkerBase { */ getLastProcessedPosition() { return { - blockNumber: this.lastBufferedBlock, + blockNumber: this.lastExportedBlock, primaryKey: this.lastPrimaryKey }; } @@ -66,7 +66,7 @@ class WorkerBase { }; logger.info(`Initialized exporter with initial position ${JSON.stringify(lastProcessedPosition)}`); } - this.lastBufferedBlock = lastProcessedPosition.blockNumber; + this.lastExportedBlock = lastProcessedPosition.blockNumber; this.lastPrimaryKey = lastProcessedPosition.primaryKey; return lastProcessedPosition; diff --git a/test/cardano/worker.spec.js b/test/cardano/worker.spec.js index 3312c273..b989301c 100644 --- a/test/cardano/worker.spec.js +++ b/test/cardano/worker.spec.js @@ -180,7 +180,7 @@ describe('workLoopTest', function () { }); it('test sleep is set once we are caught up', async function () { - cardanoWorker.lastBufferedBlock = await cardanoWorker.getCurrentBlock() - constants.CONFIRMATIONS; + cardanoWorker.lastExportedBlock = await cardanoWorker.getCurrentBlock() - constants.CONFIRMATIONS; await cardanoWorker.work(); assert.strictEqual(cardanoWorker.sleepTimeMsec, constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000); }); diff --git a/test/erc20/worker.spec.js b/test/erc20/worker.spec.js index a3b22cb4..20cd632e 100644 --- a/test/erc20/worker.spec.js +++ b/test/erc20/worker.spec.js @@ -77,7 +77,7 @@ describe('Test ERC20 worker', function () { await worker.init(mockExporter); worker.lastConfirmedBlock = 1; - worker.lastBufferedBlock = 0; + worker.lastExportedBlock = 0; const result = await worker.work(); @@ -113,7 +113,7 @@ describe('Test ERC20 worker', function () { } )); worker.lastConfirmedBlock = 1; - worker.lastBufferedBlock = 0; + worker.lastExportedBlock = 0; const result = await worker.work(); @@ -149,7 +149,7 @@ describe('Test ERC20 worker', function () { } )); worker.lastConfirmedBlock = 1; - worker.lastBufferedBlock = 0; + worker.lastExportedBlock = 0; // In this mode the primary key is 1 more than the 'original' event correctedEventWithPrimaryKey.primaryKey += 1; @@ -187,7 +187,7 @@ describe('Test ERC20 worker', function () { } )); worker.lastConfirmedBlock = 1; - worker.lastBufferedBlock = 0; + worker.lastExportedBlock = 0; // In this mode the primary key is 1 more than the 'original' event correctedEventWithPrimaryKey.primaryKey += 1; @@ -200,7 +200,7 @@ describe('Test ERC20 worker', function () { constants.EXPORT_BLOCKS_LIST = true; const worker = new erc20_worker.worker(constants, new MockWeb3Wrapper(), new MockEthClient()); worker.blocksList = [[1, 10], [11, 20], [21, 30]]; - worker.lastBufferedBlock = -1; + worker.lastExportedBlock = -1; const result = worker.getBlocksListInterval(); @@ -212,7 +212,7 @@ describe('Test ERC20 worker', function () { constants.EXPORT_BLOCKS_LIST = true; const worker = new erc20_worker.worker(constants, new MockWeb3Wrapper(), new MockEthClient()); worker.blocksList = [[1, 10], [11, 20], [21, 30]]; - worker.lastBufferedBlock = 20; + worker.lastExportedBlock = 20; const result = worker.getBlocksListInterval(); @@ -224,7 +224,7 @@ describe('Test ERC20 worker', function () { constants.EXPORT_BLOCKS_LIST = true; const worker = new erc20_worker.worker(constants, new MockWeb3Wrapper(), new MockEthClient()); worker.blocksList = [[5, 10], [11, 20], [21, 30]]; - worker.lastBufferedBlock = 10; + worker.lastExportedBlock = 10; const result = worker.getBlocksListInterval(); @@ -236,7 +236,7 @@ describe('Test ERC20 worker', function () { constants.EXPORT_BLOCKS_LIST = true; const worker = new erc20_worker.worker(constants, new MockWeb3Wrapper(), new MockEthClient()); worker.blocksList = [[5, 10], [11, 20], [21, 30]]; - worker.lastBufferedBlock = 30; + worker.lastExportedBlock = 30; const result = worker.getBlocksListInterval(); @@ -248,7 +248,7 @@ describe('Test ERC20 worker', function () { constants.EXPORT_BLOCKS_LIST = true; const worker = new erc20_worker.worker(constants, new MockWeb3Wrapper(), new MockEthClient()); worker.blocksList = [[21, 30]]; - worker.lastBufferedBlock = 30; + worker.lastExportedBlock = 30; const result = worker.getBlocksListInterval(); diff --git a/test/eth/next_interval_calculator.spec.js b/test/eth/next_interval_calculator.spec.js index 5d7fff67..508bd58a 100644 --- a/test/eth/next_interval_calculator.spec.js +++ b/test/eth/next_interval_calculator.spec.js @@ -43,7 +43,7 @@ describe('Check interval not going backwards', function () { assert.deepStrictEqual(resultSecond.success, true); assert.deepStrictEqual(resultSecond.fromBlock, 0); assert.deepStrictEqual(resultSecond.toBlock, 97); - worker.lastBufferedBlock = resultSecond.toBlock; + worker.lastExportedBlock = resultSecond.toBlock; const resultThird = await nextIntervalCalculator(worker); assert.deepStrictEqual(resultThird.success, false); @@ -56,7 +56,7 @@ describe('Check interval not going backwards', function () { // Setup a situation where the exported block has exceeded the Node block // Test is similar to the above but test that we already saved an old lastConfirmedBlock - worker.lastBufferedBlock = 10; + worker.lastExportedBlock = 10; worker.lastConfirmedBlock = 5; const resultSecond = await nextIntervalCalculator(worker); @@ -97,7 +97,7 @@ describe('Check logic when Node is ahead', function () { it('Interval is correct if Node is ahead', async function () { const worker = new eth_worker.worker(constants); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = 2; const resultSecond = await nextIntervalCalculator(worker); @@ -109,7 +109,7 @@ describe('Check logic when Node is ahead', function () { it('No sleep time if Node is ahead', async function () { const worker = new eth_worker.worker(constants); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = 2; const resultSecond = await nextIntervalCalculator(worker); @@ -121,7 +121,7 @@ describe('Check logic when Node is ahead', function () { const worker = new eth_worker.worker(constants); worker.web3Wrapper = new MockIsCalledWeb3Wrapper(); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = 2; await nextIntervalCalculator(worker); @@ -131,7 +131,7 @@ describe('Check logic when Node is ahead', function () { it('Block interval is not exceeded', async function () { const worker = new eth_worker.worker(constants); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = constants.BLOCK_INTERVAL + constants.CONFIRMATIONS + 10; const result = await nextIntervalCalculator(worker); @@ -146,7 +146,7 @@ describe('Check logic when Node is not ahead', function () { const worker = new eth_worker.worker(constants); worker.web3Wrapper = new MockWeb3Wrapper(constants.CONFIRMATIONS + 10); - worker.lastBufferedBlock = 2; + worker.lastExportedBlock = 2; worker.lastConfirmedBlock = 1; const resultFirst = await nextIntervalCalculator(worker); @@ -163,7 +163,7 @@ describe('Check logic when Node is not ahead', function () { const worker = new eth_worker.worker(constants); worker.web3Wrapper = new MockWeb3Wrapper(constants.CONFIRMATIONS + 10); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = 1; await nextIntervalCalculator(worker); @@ -174,7 +174,7 @@ describe('Check logic when Node is not ahead', function () { const worker = new eth_worker.worker(constants); worker.web3Wrapper = new MockIsCalledWeb3Wrapper(); - worker.lastBufferedBlock = 1; + worker.lastExportedBlock = 1; worker.lastConfirmedBlock = 1; await nextIntervalCalculator(worker); diff --git a/test/eth/worker.spec.js b/test/eth/worker.spec.js index 63aaf9a6..f91f4f5b 100644 --- a/test/eth/worker.spec.js +++ b/test/eth/worker.spec.js @@ -46,7 +46,7 @@ describe('Test worker', function () { it('test primary key assignment', async function () { // Overwrite variables and methods that the 'work' method would use internally. worker.lastConfirmedBlock = 1; - worker.lastBufferedBlock = 0; + worker.lastExportedBlock = 0; worker.fetchData = async function () { return []; }; diff --git a/test/lib/task_manager.spec.js b/test/lib/task_manager.spec.js index 279fd2a6..85d62678 100644 --- a/test/lib/task_manager.spec.js +++ b/test/lib/task_manager.spec.js @@ -5,7 +5,7 @@ describe('TaskManager', () => { it('constructor initializes corresponding variables', () => { const taskManager = new TaskManager(); - assert.deepStrictEqual(taskManager.map, {}); + assert.deepStrictEqual(taskManager.taskData, {}); assert.deepStrictEqual(taskManager.buffer, []); assert.strictEqual(taskManager.lastPushedToBuffer, 0); assert.strictEqual(taskManager.queue, undefined); @@ -33,7 +33,7 @@ describe('TaskManager', () => { const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }]; taskManager.handleNewData([1, exampleDataObject]); // This [key, data] pair comes from the worker's work method - assert.deepStrictEqual(taskManager.map, {1: exampleDataObject}); + assert.deepStrictEqual(taskManager.taskData, {1: exampleDataObject}); }); it('handleNewData() fills the buffer accordingly', () => { diff --git a/test/xrp/worker.test.js b/test/xrp/worker.test.js index 4d5d057a..4aeca47b 100644 --- a/test/xrp/worker.test.js +++ b/test/xrp/worker.test.js @@ -44,7 +44,7 @@ describe('workLoopSimpleTest', function () { worker.fetchLedgerTransactions = mockFetchLedgerTransactions; // Set a huge last confirmed Node block, so that we do not ask the node and mock more easily. - worker.lastBufferedBlock = 10; + worker.lastExportedBlock = 10; worker.lastConfirmedBlock = 20; await worker.work(); @@ -65,7 +65,7 @@ describe('workLoopSimpleTest', function () { worker.fetchLedgerTransactions = mockFetchLedgerTransactions; // Set a huge last confirmed Node block, so that we do not ask the node and mock more easily. - worker.lastBufferedBlock = 10; + worker.lastExportedBlock = 10; worker.lastConfirmedBlock = 20; const result = await worker.work();