Skip to content

Commit

Permalink
Reworking task order monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Feb 18, 2024
1 parent 85c9bbd commit 0c72c36
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 45 deletions.
6 changes: 3 additions & 3 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ETHWorker extends BaseWorker {
return await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock),
// this.fetchReceipts(fromBlock, toBlock),
this.fetchReceipts(fromBlock, toBlock),
]);
}

Expand Down Expand Up @@ -149,7 +149,7 @@ class ETHWorker extends BaseWorker {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const { fromBlock, toBlock } = nextIntervalCalculator(this);
this.lastQueuedBlock = toBlock;

Expand All @@ -168,7 +168,7 @@ class ETHWorker extends BaseWorker {

this.lastExportedBlock = toBlock;

return events;
return [{ fromBlock, toBlock }, events];
}

async init() {
Expand Down
10 changes: 7 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ class Main {
await this.exporter.savePosition(this.lastProcessedPosition);
}

async #initTaskManager() {
async #initTaskManager(lastBlock) {
this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS);
this.taskManager.currentFromBlock = lastBlock + 1;
}

#isWorkerSet() {
Expand Down Expand Up @@ -93,12 +94,15 @@ class Main {
async workLoop() {
while (this.shouldWork) {
await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE);
this.taskManager.pushToQueue(this.worker);
this.taskManager.pushToQueue(() => this.worker.work().catch(err => {
logger.error(err.toString());
this.shouldWork = false;
}));
this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();

this.lastProcessedPosition = this.worker.getLastProcessedPosition();
if (this.taskManager.buffer.length > 0) this.waitOnStoreEvents();
if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents();
this.updateMetrics();

if (this.shouldWork) {
Expand Down
64 changes: 45 additions & 19 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,79 @@
const { cloneDeep } = require('lodash');


class TaskManager {
constructor() {
this.taskData = {};
this.queue;
this.buffer = [];
this.taskData = {};
this.taskIndex = 0;
this.lastPushedToBuffer = 0;
this.currentFromBlock;
}

/**
* Method for initialization of the queue. It's done in such a way,
* because the p-queue package itself does not support CommonJS type importing.
* @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time
*/
async initQueue(maxConcurrentRequests) {
const PQueue = (await import('p-queue')).default;
this.queue = new PQueue({ concurrency: maxConcurrentRequests });
this.queue.on('completed', (data) => this.handleNewData(data));
this.queue.on('completed', ([interval, data]) => this.handleNewData(interval, data));
}

/**
* Method for creating a TaskManager instance.
* @param {number} maxConcurrentRequests Number of maximum concurrent tasks that should work at the same time
* @returns A TaskManager instance
*/
static async create(maxConcurrentRequests) {
const tm = new TaskManager();
await tm.initQueue(maxConcurrentRequests);
return tm;
}

/**
* @returns A deep copy of the current TaskManager buffer
*/
retrieveCompleted() {
const bufferCopy = [];
while (this.buffer.length > 0) bufferCopy.push(this.buffer.shift());
return bufferCopy;
}

/**
* Private method for pushing the sequential intervals that are ready.
* While the loop hits sequential intervals in the taskData property,
* the data should be pushed to the buffer property. When the while loop hits
* an undefined (yet) key of taskData, the function should stop.
*/
#pushAllEligable() {
while (this.taskData[this.lastPushedToBuffer]) {
for (const data of this.taskData[this.lastPushedToBuffer]) this.buffer.push(data);
delete this.taskData[this.lastPushedToBuffer];
this.lastPushedToBuffer++;
while (this.taskData[this.currentFromBlock]) {
for (const event of this.taskData[this.currentFromBlock].data) this.buffer.push(event);
const interval = this.taskData[this.currentFromBlock].toBlock - this.currentFromBlock + 1;
delete this.taskData[this.currentFromBlock];
this.currentFromBlock += interval;
}
}

handleNewData([key, newTransformedData]) {
this.taskData[key] = newTransformedData;
/**
* On the completion of a task in the p-queue, the task
* should return an array in the form of [interval, events].
* These would be set up in the taskData object accordingly in the
* correct format, after which we use the #pushAllEligable private method
* to push ready-to-go sequential data into the buffer.
* @param {object} interval
* @param {Array} newTransformedData
*/
handleNewData(interval, newTransformedData) {
this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData };
this.#pushAllEligable();
}

pushToQueue(worker) {
this.queue.add(async () => {
const result = await worker.work();
const currIndex = cloneDeep(this.taskIndex);
return [currIndex, result];
});
this.taskIndex++;
/**
* Takes a `() => worker.work()` function and pushes it
* into the TaskManager's p-queue.
* @param {Function} workTask
*/
pushToQueue(workTask) {
this.queue.add(workTask);
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/eth/worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('Test worker', function () {
it('test primary key assignment', async function () {
// Overwrite variables and methods that the 'work' method would use internally.
worker.lastConfirmedBlock = 1;
worker.lastExportedBlock = 0;
worker.lastQueuedBlock = 0;
worker.fetchData = async function () {
return [];
};
Expand All @@ -56,7 +56,7 @@ describe('Test worker', function () {

const result = await worker.work();

assert.deepStrictEqual(result, [feeResultWithPrimaryKey, callResultWithPrimaryKey]);
assert.deepStrictEqual(result, [{fromBlock: 1, toBlock: 1}, [feeResultWithPrimaryKey, callResultWithPrimaryKey]]);
});
});

Expand Down
2 changes: 1 addition & 1 deletion test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ describe('Main', () => {

await mainInstance.initWorker();
assert(mainInstance.handleInitPosition.calledOnce);
assert.strictEqual(mainInstance.taskManager.lastPushedToBuffer, 0);
assert.strictEqual(mainInstance.taskManager.currentFromBlock, 11);
});

it('workLoop throws error when worker can\'t be initialised', async () => {
Expand Down
36 changes: 19 additions & 17 deletions test/lib/task_manager.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ describe('TaskManager', () => {

assert.deepStrictEqual(taskManager.taskData, {});
assert.deepStrictEqual(taskManager.buffer, []);
assert.strictEqual(taskManager.lastPushedToBuffer, 0);
assert.strictEqual(taskManager.lastPushedToBuffer, undefined);
assert.strictEqual(taskManager.queue, undefined);
});

Expand All @@ -28,31 +28,33 @@ describe('TaskManager', () => {
assert.deepStrictEqual(taskManager.buffer, []);
});

it('handleNewData() produces a map key->data pair accordingly', () => {
it('handleNewData() fills the taskData object in the correct format', () => {
const taskManager = new TaskManager();
const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }];
const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]];

taskManager.handleNewData([1, exampleDataObject]); // This [key, data] pair comes from the worker's work method
assert.deepStrictEqual(taskManager.taskData, {1: exampleDataObject});
taskManager.handleNewData(...exampleDataObject); // This [ interval, data ] pair comes from the worker's work method
assert.deepStrictEqual(taskManager.taskData, { 1: { toBlock: 10, data: [1, 2, 3] } });
});

it('handleNewData() fills the buffer accordingly', () => {
it('handleNewData() fills the buffer when sequential intervals present', () => {
const taskManager = new TaskManager();
const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }];
const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40, data: [4, 5, 6] }];
taskManager.currentFromBlock = 1;
const exampleDataObject = [{ fromBlock: 1, toBlock: 10 }, [1, 2, 3]];
const exampleDataObject2 = [{ fromBlock: 11, toBlock: 30 }, [4, 5, 6]];

taskManager.handleNewData([0, exampleDataObject]);
taskManager.handleNewData([1, exampleDataObject2]);
assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject, ...exampleDataObject2]);
taskManager.handleNewData(...exampleDataObject);
taskManager.handleNewData(...exampleDataObject2);
assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1], ...exampleDataObject2[1]]);
});

it('handleNewData() fills the buffer accordingly 2', () => {
it('handleNewData() should not skip interval', () => {
const taskManager = new TaskManager();
const exampleDataObject = [{ fromBlock: 1, toBlock: 10, data: [1, 2, 3] }];
const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40, data: [4, 5, 6] }];
taskManager.currentFromBlock = 1;
const exampleDataObject = [{ fromBlock: 1, toBlock: 10}, [1, 2, 3]];
const exampleDataObject2 = [{ fromBlock: 31, toBlock: 40}, [4, 5, 6] ];

taskManager.handleNewData([0, exampleDataObject]);
taskManager.handleNewData([3, exampleDataObject2]);
assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject]);
taskManager.handleNewData(...exampleDataObject);
taskManager.handleNewData(...exampleDataObject2);
assert.deepStrictEqual(taskManager.buffer, [...exampleDataObject[1]]);
});
});

0 comments on commit 0c72c36

Please sign in to comment.