diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index af037bfe..4b678dac 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -4,13 +4,17 @@ const { logger } = require('../../lib/logger'); const { constructRPCClient } = require('../../lib/http_client'); const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack'); const { getGenesisTransfers } = require('./lib/genesis_transfers'); -const { transactionOrder, stableSort } = require('./lib/util'); const BaseWorker = require('../../lib/worker_base'); const Web3Wrapper = require('./lib/web3_wrapper'); const { decodeTransferTrace } = require('./lib/decode_transfers'); const { FeesDecoder } = require('./lib/fees_decoder'); -const { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } = require('./lib/next_interval_calculator'); const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder'); +const { + analyzeWorkerContext, + setWorkerSleepTime, + NO_WORK_SLEEP, + nextIntervalCalculatorV2 } = require('./lib/next_interval_calculator'); + class ETHWorker extends BaseWorker { constructor(settings) { @@ -150,24 +154,13 @@ class ETHWorker extends BaseWorker { setWorkerSleepTime(this, workerContext); if (workerContext === NO_WORK_SLEEP) return []; - const { fromBlock, toBlock } = nextIntervalCalculator(this); + const { fromBlock, toBlock } = nextIntervalCalculatorV2(this); this.lastQueuedBlock = toBlock; logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`); const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock); const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts); - if (events.length > 0) { - stableSort(events, transactionOrder); - for (let i = 0; i < events.length; i++) { - events[i].primaryKey = this.lastPrimaryKey + i + 1; - } - - this.lastPrimaryKey += events.length; - } - - this.lastExportedBlock = toBlock; - return [{ fromBlock, toBlock }, events]; } diff --git a/blockchains/eth/lib/next_interval_calculator.js b/blockchains/eth/lib/next_interval_calculator.js index 87d2348e..e6af421e 100644 --- a/blockchains/eth/lib/next_interval_calculator.js +++ b/blockchains/eth/lib/next_interval_calculator.js @@ -16,7 +16,7 @@ const NO_WORK_SLEEP = 2; * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. * @returns {number} A number, which points to one of the above-given scenarios */ -async function analyzeWorkerContext(worker) { +async function analyzeWorkerContext(worker) {//TODO: Check how it will handle with HEAD block if (worker.lastExportedBlock < worker.lastConfirmedBlock) return WORK_NO_SLEEP; const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS; @@ -40,9 +40,21 @@ function setWorkerSleepTime(worker, context) { /** * Function for calculating the next interval to be used in the worker's methods for querying the node. * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. - * @returns {object} The interval, derived from the progress of the worker + * @returns */ function nextIntervalCalculator(worker) { + return { + fromBlock: worker.lastExportedBlock + 1, + toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock) + }; +} + +/** + * Function for calculating the next interval to be used in the worker's methods for querying the node. + * @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class. + * @returns {object} The interval, derived from the progress of the worker + */ +function nextIntervalCalculatorV2(worker) { return { fromBlock: worker.lastQueuedBlock + 1, toBlock: Math.min(worker.lastQueuedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock) @@ -55,5 +67,6 @@ module.exports = { WORK_NO_SLEEP, setWorkerSleepTime, analyzeWorkerContext, - nextIntervalCalculator + nextIntervalCalculator, + nextIntervalCalculatorV2 }; diff --git a/index.js b/index.js index e3f67376..12d6ebf6 100644 --- a/index.js +++ b/index.js @@ -32,13 +32,17 @@ class Main { async handleInitPosition() { const lastRecoveredPosition = await this.exporter.getLastPosition(); - this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); + if (BLOCKCHAIN === 'eth') { + this.lastProcessedPosition = this.taskManager.initPosition(lastRecoveredPosition); + this.worker.lastQueuedBlock = this.lastProcessedPosition.blockNumber; + } else { + this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); + } await this.exporter.savePosition(this.lastProcessedPosition); } - async #initTaskManager(lastBlock) { + async initTaskManager() { this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS); - this.taskManager.currentFromBlock = lastBlock + 1; } #isWorkerSet() { @@ -49,14 +53,15 @@ class Main { this.#isWorkerSet(); const mergedConstants = { ...constantsBase, ...constants }; this.worker = new worker.worker(mergedConstants); - await this.worker.init(this.exporter, metrics); - await this.handleInitPosition(); - await this.#initTaskManager(this.lastProcessedPosition.blockNumber); + await this.worker.init(this.exporter); } async init() { await this.initExporter(EXPORTER_NAME, true); await this.initWorker(); + if (BLOCKCHAIN === 'eth') await this.initTaskManager(this.lastProcessedPosition); + await this.handleInitPosition(); + metrics.startCollection(); this.microServer = micro(microHandler); @@ -77,29 +82,53 @@ class Main { metrics.currentBlock.set(this.worker.lastConfirmedBlock); metrics.requestsCounter.inc(this.worker.getNewRequestsCount()); metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime); - metrics.lastExportedBlock.set(this.worker.lastExportedBlock); + if (BLOCKCHAIN === 'eth') { + metrics.lastExportedBlock.set(this.taskManager.lastExportedBlock); + } else { + metrics.lastExportedBlock.set(this.worker.lastExportedBlock); + } } async waitOnStoreEvents() { - if (this.taskManager.buffer.length > 0) { - const bufferCopy = this.taskManager.retrieveCompleted(); - await this.exporter.storeEvents(bufferCopy); - this.lastProcessedPosition = { - primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey, - blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber - }; + const buffer = this.taskManager.retrieveCompleted(); + if (buffer.length > 0) { + await this.exporter.storeEvents(buffer); + } + + this.taskManager.restartQueueIfNeeded(); + } + + async updatePosition() { + if (this.taskManager.isChangedLastExportedBlock(this.lastProcessedPosition.blockNumber)) { + this.lastProcessedPosition = this.taskManager.getLastProcessedPosition(); await this.exporter.savePosition(this.lastProcessedPosition); logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); } + } + + async workLoop() { + while (this.shouldWork) { + this.worker.lastRequestStartTime = new Date(); + const events = await this.worker.work(); - if (this.taskManager.queue.isPaused) { - this.taskManager.queue.start(); - this.taskManager.consequentTaskIndex = 0; - logger.info('Resuming the queue...'); + this.worker.lastExportTime = Date.now(); + + this.updateMetrics(); + this.lastProcessedPosition = this.worker.getLastProcessedPosition(); + + if (events && events.length > 0) { + await this.exporter.storeEvents(events); + } + await this.exporter.savePosition(this.lastProcessedPosition); + logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); + + if (this.shouldWork) { + await new Promise((resolve) => setTimeout(resolve, this.worker.sleepTimeMsec)); + } } } - async workLoop() { + async workLoopV2() { while (this.shouldWork) { await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE); this.taskManager.pushToQueue(() => this.worker.work().catch(err => { @@ -109,8 +138,8 @@ class Main { this.worker.lastRequestStartTime = new Date(); this.worker.lastExportTime = Date.now(); - this.lastProcessedPosition = this.worker.getLastProcessedPosition(); await this.waitOnStoreEvents(); + await this.updatePosition(); this.updateMetrics(); if (this.shouldWork) { @@ -199,7 +228,11 @@ async function main() { throw new Error(`Error initializing exporter: ${err.message}`); } try { - await mainInstance.workLoop(); + if (BLOCKCHAIN === 'eth') { + await mainInstance.workLoopV2(); + } else { + await mainInstance.workLoop(); + } await mainInstance.disconnect(); logger.info('Bye!'); } catch (err) { diff --git a/lib/task_manager.js b/lib/task_manager.js index da18e78f..caf68c0d 100644 --- a/lib/task_manager.js +++ b/lib/task_manager.js @@ -1,13 +1,26 @@ const { logger } = require('./logger'); -const { MAX_TASK_DATA_KEYS } = require('./constants'); +const { MAX_TASK_DATA_KEYS, BLOCKCHAIN, START_BLOCK, START_PRIMARY_KEY } = require('./constants'); +const { stableSort } = require('../blockchains/erc20/lib/util'); +const { transactionOrder } = require('../blockchains/eth/lib/util'); class TaskManager { constructor() { this.queue; - this.buffer = []; this.taskData = {}; + this.lastPrimaryKey; + this.lastExportedBlock; this.consequentTaskIndex = 0; - this.currentFromBlock; + } + + /** + * Method for creating a TaskManager instance. + * @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time + * @returns A TaskManager instance + */ + static async create(maxConcurrentRequests) { + const tm = new TaskManager(); + await tm.initQueue(maxConcurrentRequests); + return tm; } /** @@ -18,65 +31,101 @@ class TaskManager { async initQueue(maxConcurrentRequests) { const PQueue = (await import('p-queue')).default; this.queue = new PQueue({ concurrency: maxConcurrentRequests }); + this.queue.on('completed', ([interval, data]) => { this.consequentTaskIndex++; - this.handleNewData(interval, data); - if (this.consequentTaskIndex >= MAX_TASK_DATA_KEYS) { - if (!this.queue.isPaused) { - this.queue.pause(); - logger.info('Pausing the queue...'); - } - } + this.#handleNewData(interval, data); + this.#maybePauseQueue(); }); } /** - * Method for creating a TaskManager instance. - * @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time - * @returns A TaskManager instance + * On the completion of a task in the p-queue, the task + * should return an array in the form of [interval, events]. + * These would be set up in the taskData object accordingly in the + * correct format + * @param {object} interval + * @param {Array} newTransformedData */ - static async create(maxConcurrentRequests) { - const tm = new TaskManager(); - await tm.initQueue(maxConcurrentRequests); - return tm; + #handleNewData(interval, newTransformedData) { + this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData }; } /** - * @returns A deep copy of the current TaskManager buffer + * If the Kafka producer slows down and at the same time we manage + * to fetch a lot of data, we'll hit an OOM error at some point, because + * of the `taskData` property getting full. + * We want to have a border (MAX_TASK_DATA_KEYS), which, if we cross, we'd have the queue + * pause the task generation for a bit. */ - retrieveCompleted() { - const bufferCopy = []; - while (this.buffer.length > 0) bufferCopy.push(this.buffer.shift()); - return bufferCopy; + #maybePauseQueue() { + if (this.consequentTaskIndex >= MAX_TASK_DATA_KEYS) { + if (!this.queue.isPaused) { + this.queue.pause(); + logger.info('Pausing the queue...'); + } + } } /** - * Private method for pushing the sequential intervals that are ready. - * While the loop hits sequential intervals in the taskData property, - * the data should be pushed to the buffer property. When the while loop hits - * an undefined (yet) key of taskData, the function should stop. + * When the Kafka Producer finishes with the storage, we'd want to check + * whether the queue's task generation has been paused, and if positive, + * start the queue anew. */ - #pushAllEligable() { - while (this.taskData[this.currentFromBlock]) { - for (const event of this.taskData[this.currentFromBlock].data) this.buffer.push(event); - const interval = this.taskData[this.currentFromBlock].toBlock - this.currentFromBlock + 1; - delete this.taskData[this.currentFromBlock]; - this.currentFromBlock += interval; + restartQueueIfNeeded() { + if (this.queue.isPaused) { + this.queue.start(); + this.consequentTaskIndex = 0; + logger.info('Resuming the queue...'); } } /** - * On the completion of a task in the p-queue, the task - * should return an array in the form of [interval, events]. - * These would be set up in the taskData object accordingly in the - * correct format, after which we use the #pushAllEligable private method - * to push ready-to-go sequential data into the buffer. - * @param {object} interval - * @param {Array} newTransformedData + * Helper method for when we push the `taskData` events' data into the buffer + * @param {Array} events Events data that we get from the current key in the `taskData` property + * @param {Array} buffer The array that at the end of the primary function would result in the + * combined data, accordingly updated with the primary keys */ - handleNewData(interval, newTransformedData) { - this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData }; - this.#pushAllEligable(); + #updatePrimaryKeysPushToBuffer(events, buffer) { + for (let i = 0; i < events.length; i++) { + events[i].primaryKey = this.lastPrimaryKey + i + 1; + buffer.push(events[i]); + } + this.lastPrimaryKey += events.length; + } + + /** + * Helper method for checking the blockchain in order to use the correct primary key functionality + * and push the data into the buffer array. + * @param {*} events Events data that we get from the current key in the `taskData` property + * @param {*} buffer The array that at the end of the primary function would result in the + * combined data, accordingly updated with the primary keys + */ + #pushToBuffer(events, buffer) { + if (BLOCKCHAIN === 'eth') { + stableSort(events, transactionOrder); + this.#updatePrimaryKeysPushToBuffer(events, buffer); + } + } + + /** + * Method for pushing the sequential intervals that are ready. + * While the loop hits sequential intervals in the taskData property, + * the data should be pushed to the buffer array. When the while loop hits + * an undefined (yet) key of taskData, the function should stop. + * @returns Array of the events' data + */ + retrieveCompleted() { + const buffer = []; + while (this.taskData[this.lastExportedBlock + 1]) { + const events = this.taskData[this.lastExportedBlock + 1].data; + this.#pushToBuffer(events, buffer); + + const newLastExportedBlock = this.taskData[this.lastExportedBlock + 1].toBlock; + delete this.taskData[this.lastExportedBlock + 1]; + this.lastExportedBlock = newLastExportedBlock; + } + return buffer; } /** @@ -87,6 +136,47 @@ class TaskManager { pushToQueue(workTask) { this.queue.add(workTask); } + + /** + * Same as in the `worker_base.js` module, this is used for the definition of the + * `lastExportedBlock` and `lastPrimaryKey` properties, according to whether we have a + * `lastProcessedPosition` from ZooKeeper and if not, use the ones set from the config file + * or the defaults. + * @param {*} lastProcessedPosition + * @returns An object, either the same from Zookeeper or one generated from the config given + * to the exporter + */ + initPosition(lastProcessedPosition) { + if (lastProcessedPosition) { + logger.info(`Resuming export from position ${JSON.stringify(lastProcessedPosition)}`); + } else { + lastProcessedPosition = { + blockNumber: START_BLOCK, + primaryKey: START_PRIMARY_KEY + }; + logger.info(`Initialized exporter with initial position ${JSON.stringify(lastProcessedPosition)}`); + } + this.lastExportedBlock = lastProcessedPosition.blockNumber; + this.lastPrimaryKey = lastProcessedPosition.primaryKey; + + return lastProcessedPosition; + } + + /** + * Gets the object, made by taking the lastExportedBlock and lastPrimaryKey variables. + * Used when updating the position of the exporter in ZooKeeper. + * @returns An object of the last exported block and last used primary key + */ + getLastProcessedPosition() { + return { + blockNumber: this.lastExportedBlock, + primaryKey: this.lastPrimaryKey + }; + } + + isChangedLastExportedBlock(lastProcessedBlock) { + return lastProcessedBlock < this.lastExportedBlock; + } } module.exports = TaskManager; diff --git a/test/eth/next_interval_calculator.spec.js b/test/eth/next_interval_calculator.spec.js index 2df81a19..0b5b9813 100644 --- a/test/eth/next_interval_calculator.spec.js +++ b/test/eth/next_interval_calculator.spec.js @@ -80,7 +80,7 @@ describe('setWorkerSleepTime', () => { describe('nextIntervalCalculator', () => { it('would not exceed BLOCK_INTERVAL', () => { const worker = new eth_worker.worker(constants); - worker.lastQueuedBlock = 0; + worker.lastExportedBlock = 0; worker.lastConfirmedBlock = 150; const { fromBlock, toBlock } = nextIntervalCalculator(worker); @@ -90,7 +90,7 @@ describe('nextIntervalCalculator', () => { it('would not return full BLOCK_INTERVAL', () => { const worker = new eth_worker.worker(constants); - worker.lastQueuedBlock = 0; + worker.lastExportedBlock = 0; worker.lastConfirmedBlock = 37; const { fromBlock, toBlock } = nextIntervalCalculator(worker); diff --git a/test/eth/worker.spec.js b/test/eth/worker.spec.js index 01fd2f6e..2efe9c0d 100644 --- a/test/eth/worker.spec.js +++ b/test/eth/worker.spec.js @@ -42,22 +42,6 @@ describe('Test worker', function () { callResultWithPrimaryKey = v8.deserialize(v8.serialize(callResult)); callResultWithPrimaryKey.primaryKey = 2; }); - - it('test primary key assignment', async function () { - // Overwrite variables and methods that the 'work' method would use internally. - worker.lastConfirmedBlock = 1; - worker.lastQueuedBlock = 0; - worker.fetchData = async function () { - return []; - }; - worker.transformPastEvents = function () { - return [feeResult, callResult]; - }; - - const result = await worker.work(); - - assert.deepStrictEqual(result, [{fromBlock: 1, toBlock: 1}, [feeResultWithPrimaryKey, callResultWithPrimaryKey]]); - }); }); describe('Test that when action is null parsing would not break', function () { diff --git a/test/index.test.js b/test/index.test.js index ef7f7cb1..a49054e8 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -55,6 +55,7 @@ describe('Main', () => { exporterStub.getLastPosition.returns(JSON.parse('{"blockNumber":123456,"primaryKey":0}')); const mainInstance = new Main(); + mainInstance.taskManager = await TaskManager.create(1); mainInstance.exporter = exporterStub; mainInstance.worker = new BaseWorker(constants); @@ -71,6 +72,7 @@ describe('Main', () => { exporterStub.getLastPosition.returns(null); const mainInstance = new Main(); + mainInstance.taskManager = await TaskManager.create(1); mainInstance.exporter = exporterStub; mainInstance.worker = new BaseWorker(constants); @@ -87,6 +89,7 @@ describe('Main', () => { exporterStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); const mainInstance = new Main(); + mainInstance.taskManager = await TaskManager.create(1); mainInstance.exporter = exporterStub; mainInstance.worker = new BaseWorker(constants); @@ -104,6 +107,7 @@ describe('Main', () => { exporterStub.savePosition.throws(new Error('Exporter savePosition failed')); const mainInstance = new Main(); + mainInstance.taskManager = await TaskManager.create(1); mainInstance.exporter = exporterStub; mainInstance.worker = new BaseWorker(constants); @@ -140,44 +144,29 @@ describe('Main', () => { } }); - it('initWorker throws an error when handleInitPosition() fails', async () => { - const mainInstance = new Main(); - mainInstance.exporter = new Exporter('test-exporter'); - sinon.stub(worker.prototype, 'init').resolves(); - - sinon.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position')); - - try { - await mainInstance.initWorker(); - expect.fail('initWorker should have thrown an error'); - } catch (err) { - assert.strictEqual(err.message, 'Error when initializing position'); - } - }); - it('initWorker success', async () => { const mainInstance = new Main(); + mainInstance.taskManager = await TaskManager.create(1); mainInstance.exporter = new Exporter('test-exporter'); sinon.stub(worker.prototype, 'init').resolves(); - sinon.stub(mainInstance, 'handleInitPosition').resolves(); - sinon.stub(TaskManager.prototype, 'initQueue').resolves(); mainInstance.lastProcessedPosition = { blockNumber: 10, primaryKey: 1 }; await mainInstance.initWorker(); - assert(mainInstance.handleInitPosition.calledOnce); - assert.strictEqual(mainInstance.taskManager.currentFromBlock, 11); + expect(mainInstance.worker).to.be.instanceof(worker); }); it('workLoop throws error when worker can\'t be initialised', async () => { sinon.stub(BaseWorker.prototype, 'work').rejects(new Error('Error in worker "work" method')); const mainInstance = new Main(); mainInstance.worker = new BaseWorker(constants); - mainInstance.taskManager = new TaskManager(0, 50); - await mainInstance.taskManager.initQueue(1); + mainInstance.taskManager = await TaskManager.create(1); sinon.spy(mainInstance, 'workLoop'); - await mainInstance.workLoop(); - assert(mainInstance.workLoop.calledOnce); - assert.strictEqual(mainInstance.shouldWork, false); + try { + await mainInstance.workLoop(); + } catch(err) { + assert(mainInstance.workLoop.calledOnce); + assert.strictEqual(err.message, 'Error in worker "work" method'); + } }); it('workLoop throws error when storeEvents() fails', async () => { @@ -238,7 +227,7 @@ describe('main function', () => { it('main function throws error when workLoop fails', async () => { sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').rejects(new Error('Main workLoop failed')); + sinon.stub(Main.prototype, 'workLoopV2').rejects(new Error('Main workLoop failed')); try { await main(); @@ -250,7 +239,7 @@ describe('main function', () => { it('main function throws error when disconnecting fails', async () => { sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').resolves(); + sinon.stub(Main.prototype, 'workLoopV2').resolves(); sinon.stub(Main.prototype, 'disconnect').rejects(new Error('Main disconnect failed')); try { @@ -263,11 +252,11 @@ describe('main function', () => { it('main function works', async () => { sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').resolves(); + sinon.stub(Main.prototype, 'workLoopV2').resolves(); sinon.stub(Main.prototype, 'disconnect').resolves(); await main(); assert(Main.prototype.init.calledOnce); - assert(Main.prototype.workLoop.calledOnce); + assert(Main.prototype.workLoopV2.calledOnce); }); }); diff --git a/test/lib/task_manager.spec.js b/test/lib/task_manager.spec.js index d5aba3e5..ce7f7e07 100644 --- a/test/lib/task_manager.spec.js +++ b/test/lib/task_manager.spec.js @@ -1,14 +1,18 @@ -const { assert } = require('chai'); +const { assert, expect } = require('chai'); const TaskManager = require('../../lib/task_manager'); +const { MAX_TASK_DATA_KEYS, START_BLOCK, START_PRIMARY_KEY } = require('../../lib/constants'); describe('TaskManager', () => { it('constructor initializes corresponding variables', () => { const taskManager = new TaskManager(); assert.deepStrictEqual(taskManager.taskData, {}); - assert.deepStrictEqual(taskManager.buffer, []); - assert.strictEqual(taskManager.lastPushedToBuffer, undefined); + assert.strictEqual(taskManager.consequentTaskIndex, 0); + assert.strictEqual(taskManager.queue, undefined); + assert.strictEqual(taskManager.lastPushedToBuffer, undefined); + assert.strictEqual(taskManager.lastPrimaryKey, undefined); + assert.strictEqual(taskManager.lastExportedBlock, undefined); }); it('queue is initialized accordingly', async () => { @@ -19,42 +23,157 @@ describe('TaskManager', () => { assert.strictEqual(taskManager.queue.concurrency, 5); }); - it('retrieveCompleted() makes a copy list of the buffer', () => { - const taskManager = new TaskManager(); - taskManager.buffer = [1, 2, 3, 4, 5, 6, 7]; + it('create static method initializes the TaskManager with its queue', async () => { + const CONCURRENCY = 2; + const taskManager = await TaskManager.create(CONCURRENCY); + const PQueue = (await import('p-queue')).default; + + expect(taskManager.queue).to.be.an.instanceof(PQueue); + expect(taskManager.queue.concurrency).to.eq(CONCURRENCY); - const copy = taskManager.retrieveCompleted(); - assert.deepStrictEqual(copy, [1, 2, 3, 4, 5, 6, 7]); - assert.deepStrictEqual(taskManager.buffer, []); + expect(taskManager).to.be.an.instanceof(TaskManager); }); - it('handleNewData() fills the taskData object in the correct format', () => { - const taskManager = new TaskManager(); - const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]]; + it('queue handles tasks accordingly when they finish', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + + const exampleEventsData = [ 1, 2, 3, 4, 5 ]; + const emmitedTaskData = [{ fromBlock: 1, toBlock: 10 }, exampleEventsData]; + taskManager.queue.emit('completed', emmitedTaskData); - taskManager.handleNewData(...exampleDataObject); // This [ interval, data ] pair comes from the worker's work method - assert.deepStrictEqual(taskManager.taskData, { 1: { toBlock: 10, data: [1, 2, 3] } }); + expect(taskManager.taskData).to.deep.eq({ 1: { toBlock: 10, data: exampleEventsData } }); + expect(taskManager.consequentTaskIndex).to.eq(1); + expect(taskManager.queue.isPaused).to.eq(false); }); - it('handleNewData() fills the buffer when sequential intervals present', () => { - const taskManager = new TaskManager(); - taskManager.currentFromBlock = 1; - const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]]; - const exampleDataObject2 = [{ fromBlock: 11, toBlock: 30 }, [4, 5, 6]]; + it('queue is paused when `consequentTaskIndex` crosses the borderline', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.consequentTaskIndex = MAX_TASK_DATA_KEYS; - taskManager.handleNewData(...exampleDataObject); - taskManager.handleNewData(...exampleDataObject2); - assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1], ...exampleDataObject2[1]]); + const exampleEventsData = [ 1, 2, 3, 4, 5 ]; + const emmitedTaskData = [{ fromBlock: 1, toBlock: 10 }, exampleEventsData]; + taskManager.queue.emit('completed', emmitedTaskData); + + expect(taskManager.queue.isPaused).to.eq(true); }); - it('handleNewData() should not skip interval', () => { - const taskManager = new TaskManager(); - taskManager.currentFromBlock = 1; - const exampleDataObject = [{ fromBlock: 1, toBlock: 10}, [1, 2, 3]]; - const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40}, [4, 5, 6] ]; + it('queue gets started if it\'s been paused', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.consequentTaskIndex = MAX_TASK_DATA_KEYS; + taskManager.queue.pause(); + + taskManager.restartQueueIfNeeded(); + + expect(taskManager.queue.isPaused).to.eq(false); + expect(taskManager.consequentTaskIndex).to.eq(0); + }); + + it('retrieveCompleted returns empty array if `taskData` is an empty object', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + + const testResult = taskManager.retrieveCompleted(); + expect(testResult).to.be.empty; + }); + + it('retrieveCompleted returns an array for the corresponding sequence in `taskData`', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.initPosition({ blockNumber: 0, primaryKey: 0 }); + + const exampleEventsData = [ { prop: 1 }, { prop: 2 }, { prop: 3 } ]; + const exampleEventsData2 = [ { prop: 4 }, { prop: 5 }, { prop: 6 } ]; + const exampleEventsData3 = [ { prop: 7 }, { prop: 8 }, { prop: 9 } ]; + const exampleTaskData = { + 1: { toBlock: 10, data: exampleEventsData }, + 11: { toBlock: 20, data: exampleEventsData2 }, + 21: { toBlock: 30, data: exampleEventsData3 }, + }; + taskManager.taskData = exampleTaskData; + + const exampleResult = [ + { prop: 1, primaryKey: 1 }, { prop: 2, primaryKey: 2}, { prop: 3, primaryKey: 3 }, + { prop: 4, primaryKey: 4 }, { prop: 5, primaryKey: 5 }, { prop: 6, primaryKey: 6 }, + { prop: 7, primaryKey: 7 }, { prop: 8, primaryKey: 8 }, { prop: 9, primaryKey: 9 } + ]; + + const testResult = taskManager.retrieveCompleted(); + expect(testResult).to.deep.eq(exampleResult); + }); + + it('retrieveCompleted returns only the first part of the keys data when holes are present in `taskData`', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.initPosition({ blockNumber: 0, primaryKey: 0 }); + + const exampleEventsData = [ { prop: 1 }, { prop: 2 }, { prop: 3 } ]; + const exampleEventsData3 = [ { prop: 7 }, { prop: 8 }, { prop: 9 } ]; + const exampleTaskData = { + 1: { toBlock: 10, data: exampleEventsData }, + 21: { toBlock: 30, data: exampleEventsData3 }, + }; + taskManager.taskData = exampleTaskData; + + const exampleResult = [ + { prop: 1, primaryKey: 1 }, { prop: 2, primaryKey: 2}, { prop: 3, primaryKey: 3 }, + ]; + + const testResult = taskManager.retrieveCompleted(); + expect(testResult).to.deep.eq(exampleResult); + }); + + it('initPosition sets the instance properties accordingly, returns corresponding object (no ZK value present)', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + + const testResult = taskManager.initPosition(); + expect(taskManager.lastExportedBlock).to.eq(START_BLOCK); + expect(taskManager.lastPrimaryKey).to.eq(START_PRIMARY_KEY); + expect(testResult).to.deep.eq({ blockNumber: START_BLOCK, primaryKey: START_PRIMARY_KEY }); + }); + + it('initPosition sets the instance properties accordingly, returns corresponding object (ZK value present)', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + + const exampleZookeeperValue = { blockNumber: 10, primaryKey: 300 }; + const testResult = taskManager.initPosition(exampleZookeeperValue); + expect(taskManager.lastExportedBlock).to.eq(exampleZookeeperValue.blockNumber); + expect(taskManager.lastPrimaryKey).to.eq(exampleZookeeperValue.primaryKey); + expect(testResult).to.deep.eq(exampleZookeeperValue); + }); + + it('getLastProcessedPosition returns correct object', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.lastExportedBlock = 10; + taskManager.lastPrimaryKey = 300; + + const testResult = taskManager.getLastProcessedPosition(); + expect(testResult).to.deep.eq({ + blockNumber: taskManager.lastExportedBlock, + primaryKey: taskManager.lastPrimaryKey + }); + }); + + it('isChangedLastExportedBlock returns false when lastProcessedPosition.blockNumber = lastExportedBlock', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.lastExportedBlock = 10; + + const testResult = taskManager.isChangedLastExportedBlock(10); + expect(testResult).to.eq(false); + }); + + it('isChangedLastExportedBlock returns true when lastProcessedPosition.blockNumber != lastExportedBlock', async () => { + const CONCURRENCY = 1; + const taskManager = await TaskManager.create(CONCURRENCY); + taskManager.lastExportedBlock = 10; - taskManager.handleNewData(...exampleDataObject); - taskManager.handleNewData(...exampleDataObject2); - assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1]]); + const testResult = taskManager.isChangedLastExportedBlock(9); + expect(testResult).to.eq(true); }); });