diff --git a/index.js b/index.js index 6042d941..4de40e10 100644 --- a/index.js +++ b/index.js @@ -81,14 +81,21 @@ class Main { } async waitOnStoreEvents() { - const bufferCopy = this.taskManager.retrieveCompleted(); - await this.exporter.storeEvents(bufferCopy); - this.lastProcessedPosition = { - primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey, - blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber - }; - await this.exporter.savePosition(this.lastProcessedPosition); - logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); + if (this.taskManager.buffer.length > 0) { + const bufferCopy = this.taskManager.retrieveCompleted(); + await this.exporter.storeEvents(bufferCopy); + this.lastProcessedPosition = { + primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey, + blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber + }; + await this.exporter.savePosition(this.lastProcessedPosition); + logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`); + } + + if (this.taskManager.queue.isPaused) { + this.taskManager.queue.start(); + logger.info('Resuming the queue...'); + } } async workLoop() { @@ -102,7 +109,7 @@ class Main { this.worker.lastExportTime = Date.now(); this.lastProcessedPosition = this.worker.getLastProcessedPosition(); - if (this.taskManager.buffer.length > 0) await this.waitOnStoreEvents(); + await this.waitOnStoreEvents(); this.updateMetrics(); if (this.shouldWork) { diff --git a/lib/constants.js b/lib/constants.js index 22a862cb..afefef08 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -3,6 +3,7 @@ const CONFIG_PATH = process.env.CONFIG_PATH; const START_BLOCK = parseInt(process.env.START_BLOCK || '0') - 1; const EXPORT_BLOCKS_LIST = process.env.EXPORT_BLOCKS_LIST || false; const PQUEUE_MAX_SIZE = parseInt(process.env.PQUEUE_MAX_SIZE || '100'); +const MAX_TASK_DATA_KEYS = parseInt(process.env.PQUEUE_MAX_SIZE || '10'); const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || '1'); const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50'); const START_PRIMARY_KEY = parseInt(process.env.START_PRIMARY_KEY || '-1'); @@ -17,6 +18,7 @@ module.exports = { PQUEUE_MAX_SIZE, START_PRIMARY_KEY, EXPORT_BLOCKS_LIST, + MAX_TASK_DATA_KEYS, EXPORT_TIMEOUT_MLS, MAX_CONCURRENT_REQUESTS, EXPORT_BLOCKS_LIST_MAX_INTERVAL diff --git a/lib/task_manager.js b/lib/task_manager.js index cc1fc9c1..142bccc8 100644 --- a/lib/task_manager.js +++ b/lib/task_manager.js @@ -1,3 +1,6 @@ +const { logger } = require('./logger'); +const { MAX_TASK_DATA_KEYS } = require('./constants'); + class TaskManager { constructor() { this.queue; @@ -65,6 +68,11 @@ class TaskManager { handleNewData(interval, newTransformedData) { this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData }; this.#pushAllEligable(); + + if (Object.keys(this.taskData).length >= MAX_TASK_DATA_KEYS) { + this.queue.pause(); + logger.info('Pausing the queue...'); + } } /**