Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Feb 19, 2024
1 parent 0c72c36 commit 3a2c05c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
25 changes: 16 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,21 @@ class Main {
}

async waitOnStoreEvents() {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
if (this.taskManager.buffer.length > 0) {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
}

if (this.taskManager.queue.isPaused) {
this.taskManager.queue.start();
logger.info('Resuming the queue...');
}
}

async workLoop() {
Expand All @@ -102,7 +109,7 @@ class Main {
this.worker.lastExportTime = Date.now();

this.lastProcessedPosition = this.worker.getLastProcessedPosition();
if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents();
await this.waitOnStoreEvents();
this.updateMetrics();

if (this.shouldWork) {
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const CONFIG_PATH = process.env.CONFIG_PATH;
const START_BLOCK = parseInt(process.env.START_BLOCK || '0') - 1;
const EXPORT_BLOCKS_LIST = process.env.EXPORT_BLOCKS_LIST || false;
const PQUEUE_MAX_SIZE = parseInt(process.env.PQUEUE_MAX_SIZE || '100');
const MAX_TASK_DATA_KEYS = parseInt(process.env.PQUEUE_MAX_SIZE || '10');
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || '1');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50');
const START_PRIMARY_KEY = parseInt(process.env.START_PRIMARY_KEY || '-1');
Expand All @@ -17,6 +18,7 @@ module.exports = {
PQUEUE_MAX_SIZE,
START_PRIMARY_KEY,
EXPORT_BLOCKS_LIST,
MAX_TASK_DATA_KEYS,
EXPORT_TIMEOUT_MLS,
MAX_CONCURRENT_REQUESTS,
EXPORT_BLOCKS_LIST_MAX_INTERVAL
Expand Down
8 changes: 8 additions & 0 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const { logger } = require('./logger');
const { MAX_TASK_DATA_KEYS } = require('./constants');

class TaskManager {
constructor() {
this.queue;
Expand Down Expand Up @@ -65,6 +68,11 @@ class TaskManager {
handleNewData(interval, newTransformedData) {
this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData };
this.#pushAllEligable();

if (Object.keys(this.taskData).length >= MAX_TASK_DATA_KEYS) {
this.queue.pause();
logger.info('Pausing the queue...');
}
}

/**
Expand Down

0 comments on commit 3a2c05c

Please sign in to comment.