Skip to content

Commit

Permalink
Refactoring queue and TM initialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Feb 13, 2024
1 parent 4522b63 commit 85c9bbd
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 192 deletions.
162 changes: 0 additions & 162 deletions blockchains/bnb/bnb_worker.js

This file was deleted.

7 changes: 4 additions & 3 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ETHWorker extends BaseWorker {
return await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock),
this.fetchReceipts(fromBlock, toBlock),
// this.fetchReceipts(fromBlock, toBlock),
]);
}

Expand Down Expand Up @@ -149,8 +149,10 @@ class ETHWorker extends BaseWorker {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const { fromBlock, toBlock } = nextIntervalCalculator(this);
this.lastQueuedBlock = toBlock;

logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);
Expand All @@ -163,7 +165,6 @@ class ETHWorker extends BaseWorker {

this.lastPrimaryKey += events.length;
}
this.worker.lastBufferedBlock = toBlock;

this.lastExportedBlock = toBlock;

Expand Down
4 changes: 2 additions & 2 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ function setWorkerSleepTime(worker, context) {
*/
function nextIntervalCalculator(worker) {
return {
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: worker.lastQueuedBlock + 1,
toBlock: Math.min(worker.lastQueuedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}

Expand Down
22 changes: 8 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ class Main {
await this.exporter.savePosition(this.lastProcessedPosition);
}

async #initTaskManager(blockNumber) {
this.taskManager = new TaskManager(blockNumber, constants.BLOCK_INTERVAL);
await this.taskManager.initQueue(MAX_CONCURRENT_REQUESTS);
async #initTaskManager() {
this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS);
}

#isWorkerSet() {
Expand Down Expand Up @@ -87,21 +86,20 @@ class Main {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
}

async workLoop() {
while (this.shouldWork) {
if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) this.taskManager.pushToQueue(this.worker.work);
await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE);
this.taskManager.pushToQueue(this.worker);
this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();

this.lastProcessedPosition = this.worker.getLastProcessedPosition();
if (this.taskManager.buffer && this.taskManager.buffer.length > 0) {
await this.waitOnStoreEvents();
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
this.updateMetrics();
}
if (this.taskManager.buffer.length > 0) this.waitOnStoreEvents();
this.updateMetrics();

if (this.shouldWork) {
await new Promise((resolve) => setTimeout(resolve, this.worker.sleepTimeMsec));
Expand All @@ -112,10 +110,6 @@ class Main {
async disconnect() {
// This call should be refactored to work with async/await
this.exporter.disconnect();

logger.info('Clearing queue...');
this.taskManager.clearQueue();

await this.microServer.close();
}

Expand Down
18 changes: 11 additions & 7 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const { cloneDeep } = require('lodash');


class TaskManager {
constructor() {
this.taskData = {};
Expand All @@ -13,9 +16,10 @@ class TaskManager {
this.queue.on('completed', (data) => this.handleNewData(data));
}

clearQueue() {
this.queue.pause();
this.queue.clear();
static async create(maxConcurrentRequests) {
const tm = new TaskManager();
await tm.initQueue(maxConcurrentRequests);
return tm;
}

retrieveCompleted() {
Expand All @@ -37,10 +41,10 @@ class TaskManager {
this.#pushAllEligable();
}

pushToQueue(workTask) {
this.queue.add(() => {
const result = workTask();
const currIndex = this.taskIndex;
pushToQueue(worker) {
this.queue.add(async () => {
const result = await worker.work();
const currIndex = cloneDeep(this.taskIndex);
return [currIndex, result];
});
this.taskIndex++;
Expand Down
2 changes: 2 additions & 0 deletions lib/worker_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class WorkerBase {
}
this.lastExportedBlock = lastProcessedPosition.blockNumber;
this.lastPrimaryKey = lastProcessedPosition.primaryKey;

this.lastQueuedBlock = this.lastExportedBlock;

return lastProcessedPosition;
}
Expand Down
4 changes: 2 additions & 2 deletions test/eth/next_interval_calculator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ describe('setWorkerSleepTime', () => {
describe('nextIntervalCalculator', () => {
it('would not exceed BLOCK_INTERVAL', () => {
const worker = new eth_worker.worker(constants);
worker.lastExportedBlock = 0;
worker.lastQueuedBlock = 0;
worker.lastConfirmedBlock = 150;

const { fromBlock, toBlock } = nextIntervalCalculator(worker);
Expand All @@ -90,7 +90,7 @@ describe('nextIntervalCalculator', () => {

it('would not return full BLOCK_INTERVAL', () => {
const worker = new eth_worker.worker(constants);
worker.lastExportedBlock = 0;
worker.lastQueuedBlock = 0;
worker.lastConfirmedBlock = 37;

const { fromBlock, toBlock } = nextIntervalCalculator(worker);
Expand Down
4 changes: 2 additions & 2 deletions test/eth/worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ describe('Test worker', function () {
return [feeResult, callResult];
};

const result = await worker.work(0);
const result = await worker.work();

assert.deepStrictEqual(result, [0, [feeResultWithPrimaryKey, callResultWithPrimaryKey]]);
assert.deepStrictEqual(result, [feeResultWithPrimaryKey, callResultWithPrimaryKey]);
});
});

Expand Down

0 comments on commit 85c9bbd

Please sign in to comment.