From 0c72c364ce632f29a496f69c63a0335b6cd13393 Mon Sep 17 00:00:00 2001 From: Lyudmil Danailov Date: Fri, 16 Feb 2024 14:42:12 +0200 Subject: [PATCH] Reworking task order monitoring --- blockchains/eth/eth_worker.js | 6 ++-- index.js | 10 ++++-- lib/task_manager.js | 64 ++++++++++++++++++++++++----------- test/eth/worker.spec.js | 4 +-- test/index.test.js | 2 +- test/lib/task_manager.spec.js | 36 ++++++++++---------- 6 files changed, 77 insertions(+), 45 deletions(-) diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index a879f10e..af037bfe 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -95,7 +95,7 @@ class ETHWorker extends BaseWorker { return await Promise.all([ this.fetchEthInternalTrx(fromBlock, toBlock), this.fetchBlocks(fromBlock, toBlock), - // this.fetchReceipts(fromBlock, toBlock), + this.fetchReceipts(fromBlock, toBlock), ]); } @@ -149,7 +149,7 @@ class ETHWorker extends BaseWorker { const workerContext = await analyzeWorkerContext(this); setWorkerSleepTime(this, workerContext); if (workerContext === NO_WORK_SLEEP) return []; - + const { fromBlock, toBlock } = nextIntervalCalculator(this); this.lastQueuedBlock = toBlock; @@ -168,7 +168,7 @@ class ETHWorker extends BaseWorker { this.lastExportedBlock = toBlock; - return events; + return [{ fromBlock, toBlock }, events]; } async init() { diff --git a/index.js b/index.js index ee7b5c7f..6042d941 100644 --- a/index.js +++ b/index.js @@ -36,8 +36,9 @@ class Main { await this.exporter.savePosition(this.lastProcessedPosition); } - async #initTaskManager() { + async #initTaskManager(lastBlock) { this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS); + this.taskManager.currentFromBlock = lastBlock + 1; } #isWorkerSet() { @@ -93,12 +94,15 @@ class Main { async workLoop() { while (this.shouldWork) { await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE); - this.taskManager.pushToQueue(this.worker); + this.taskManager.pushToQueue(() => this.worker.work().catch(err => { + logger.error(err.toString()); + this.shouldWork = false; + })); this.worker.lastRequestStartTime = new Date(); this.worker.lastExportTime = Date.now(); this.lastProcessedPosition = this.worker.getLastProcessedPosition(); - if (this.taskManager.buffer.length > 0) this.waitOnStoreEvents(); + if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents(); this.updateMetrics(); if (this.shouldWork) { diff --git a/lib/task_manager.js b/lib/task_manager.js index e86e3d3e..cc1fc9c1 100644 --- a/lib/task_manager.js +++ b/lib/task_manager.js @@ -1,53 +1,79 @@ -const { cloneDeep } = require('lodash'); - - class TaskManager { constructor() { - this.taskData = {}; this.queue; this.buffer = []; + this.taskData = {}; this.taskIndex = 0; - this.lastPushedToBuffer = 0; + this.currentFromBlock; } + /** + * Method for initialization of the queue. It's done in such a way, + * because the p-queue package itself does not support CommonJS type importing. + * @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time + */ async initQueue(maxConcurrentRequests) { const PQueue = (await import('p-queue')).default; this.queue = new PQueue({ concurrency: maxConcurrentRequests }); - this.queue.on('completed', (data) => this.handleNewData(data)); + this.queue.on('completed', ([interval, data]) => this.handleNewData(interval, data)); } + /** + * Method for creating a TaskManager instance. + * @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time + * @returns A TaskManager instance + */ static async create(maxConcurrentRequests) { const tm = new TaskManager(); await tm.initQueue(maxConcurrentRequests); return tm; } + /** + * @returns A deep copy of the current TaskManager buffer + */ retrieveCompleted() { const bufferCopy = []; while (this.buffer.length > 0) bufferCopy.push(this.buffer.shift()); return bufferCopy; } + /** + * Private method for pushing the sequential intervals that are ready. + * While the loop hits sequential intervals in the taskData property, + * the data should be pushed to the buffer property. When the while loop hits + * an undefined (yet) key of taskData, the function should stop. + */ #pushAllEligable() { - while (this.taskData[this.lastPushedToBuffer]) { - for (const data of this.taskData[this.lastPushedToBuffer]) this.buffer.push(data); - delete this.taskData[this.lastPushedToBuffer]; - this.lastPushedToBuffer++; + while (this.taskData[this.currentFromBlock]) { + for (const event of this.taskData[this.currentFromBlock].data) this.buffer.push(event); + const interval = this.taskData[this.currentFromBlock].toBlock - this.currentFromBlock + 1; + delete this.taskData[this.currentFromBlock]; + this.currentFromBlock += interval; } } - handleNewData([key, newTransformedData]) { - this.taskData[key] = newTransformedData; + /** + * On the completion of a task in the p-queue, the task + * should return an array in the form of [interval, events]. + * These would be set up in the taskData object accordingly in the + * correct format, after which we use the #pushAllEligable private method + * to push ready-to-go sequential data into the buffer. + * @param {object} interval + * @param {Array} newTransformedData + */ + handleNewData(interval, newTransformedData) { + this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData }; this.#pushAllEligable(); } - pushToQueue(worker) { - this.queue.add(async () => { - const result = await worker.work(); - const currIndex = cloneDeep(this.taskIndex); - return [currIndex, result]; - }); - this.taskIndex++; + /** + * Takes a `() => worker.work()` function and pushes it + * into the TaskManager's p-queue. + * @param {Function} workTask + */ + pushToQueue(workTask) { + this.queue.add(workTask); } } diff --git a/test/eth/worker.spec.js b/test/eth/worker.spec.js index 9501ac75..01fd2f6e 100644 --- a/test/eth/worker.spec.js +++ b/test/eth/worker.spec.js @@ -46,7 +46,7 @@ describe('Test worker', function () { it('test primary key assignment', async function () { // Overwrite variables and methods that the 'work' method would use internally. worker.lastConfirmedBlock = 1; - worker.lastExportedBlock = 0; + worker.lastQueuedBlock = 0; worker.fetchData = async function () { return []; }; @@ -56,7 +56,7 @@ describe('Test worker', function () { const result = await worker.work(); - assert.deepStrictEqual(result, [feeResultWithPrimaryKey, callResultWithPrimaryKey]); + assert.deepStrictEqual(result, [{fromBlock: 1, toBlock: 1}, [feeResultWithPrimaryKey, callResultWithPrimaryKey]]); }); }); diff --git a/test/index.test.js b/test/index.test.js index 62e97070..ef7f7cb1 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -165,7 +165,7 @@ describe('Main', () => { await mainInstance.initWorker(); assert(mainInstance.handleInitPosition.calledOnce); - assert.strictEqual(mainInstance.taskManager.lastPushedToBuffer, 0); + assert.strictEqual(mainInstance.taskManager.currentFromBlock, 11); }); it('workLoop throws error when worker can\'t be initialised', async () => { diff --git a/test/lib/task_manager.spec.js b/test/lib/task_manager.spec.js index 85d62678..d5aba3e5 100644 --- a/test/lib/task_manager.spec.js +++ b/test/lib/task_manager.spec.js @@ -7,7 +7,7 @@ describe('TaskManager', () => { assert.deepStrictEqual(taskManager.taskData, {}); assert.deepStrictEqual(taskManager.buffer, []); - assert.strictEqual(taskManager.lastPushedToBuffer, 0); + assert.strictEqual(taskManager.lastPushedToBuffer, undefined); assert.strictEqual(taskManager.queue, undefined); }); @@ -28,31 +28,33 @@ describe('TaskManager', () => { assert.deepStrictEqual(taskManager.buffer, []); }); - it('handleNewData() produces a map key->data pair accordingly', () => { + it('handleNewData() fills the taskData object in the correct format', () => { const taskManager = new TaskManager(); - const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }]; + const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]]; - taskManager.handleNewData([1, exampleDataObject]); // This [key, data] pair comes from the worker's work method - assert.deepStrictEqual(taskManager.taskData, {1: exampleDataObject}); + taskManager.handleNewData(...exampleDataObject); // This [ interval, data ] pair comes from the worker's work method + assert.deepStrictEqual(taskManager.taskData, { 1: { toBlock: 10, data: [1, 2, 3] } }); }); - it('handleNewData() fills the buffer accordingly', () => { + it('handleNewData() fills the buffer when sequential intervals present', () => { const taskManager = new TaskManager(); - const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }]; - const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40, data: [4, 5, 6] }]; + taskManager.currentFromBlock = 1; + const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]]; + const exampleDataObject2 = [{ fromBlock: 11, toBlock: 30 }, [4, 5, 6]]; - taskManager.handleNewData([0, exampleDataObject]); - taskManager.handleNewData([1, exampleDataObject2]); - assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject, ...exampleDataObject2]); + taskManager.handleNewData(...exampleDataObject); + taskManager.handleNewData(...exampleDataObject2); + assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1], ...exampleDataObject2[1]]); }); - it('handleNewData() fills the buffer accordingly 2', () => { + it('handleNewData() should not skip interval', () => { const taskManager = new TaskManager(); - const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }]; - const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40, data: [4, 5, 6] }]; + taskManager.currentFromBlock = 1; + const exampleDataObject = [{ fromBlock: 1, toBlock: 10}, [1, 2, 3]]; + const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40}, [4, 5, 6] ]; - taskManager.handleNewData([0, exampleDataObject]); - taskManager.handleNewData([3, exampleDataObject2]); - assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject]); + taskManager.handleNewData(...exampleDataObject); + taskManager.handleNewData(...exampleDataObject2); + assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1]]); }); });