Skip to content

Commit

Permalink
Rehauling lastExportedBlock logic, adding versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Feb 26, 2024
1 parent 787a289 commit 29145ae
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 156 deletions.
21 changes: 7 additions & 14 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ const { logger } = require('../../lib/logger');
const { constructRPCClient } = require('../../lib/http_client');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } = require('./lib/next_interval_calculator');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const {
analyzeWorkerContext,
setWorkerSleepTime,
NO_WORK_SLEEP,
nextIntervalCalculatorV2 } = require('./lib/next_interval_calculator');


class ETHWorker extends BaseWorker {
constructor(settings) {
Expand Down Expand Up @@ -150,24 +154,13 @@ class ETHWorker extends BaseWorker {
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const { fromBlock, toBlock } = nextIntervalCalculator(this);
const { fromBlock, toBlock } = nextIntervalCalculatorV2(this);
this.lastQueuedBlock = toBlock;

logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}

this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = toBlock;

return [{ fromBlock, toBlock }, events];
}

Expand Down
19 changes: 16 additions & 3 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const NO_WORK_SLEEP = 2;
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {number} A number, which points to one of the above-given scenarios
*/
async function analyzeWorkerContext(worker) {
async function analyzeWorkerContext(worker) {//TODO: Check how it will handle with HEAD block
if (worker.lastExportedBlock < worker.lastConfirmedBlock) return WORK_NO_SLEEP;

const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS;
Expand All @@ -40,9 +40,21 @@ function setWorkerSleepTime(worker, context) {
/**
* Function for calculating the next interval to be used in the worker's methods for querying the node.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {object} The interval, derived from the progress of the worker
* @returns
*/
function nextIntervalCalculator(worker) {
return {
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}

/**
* Function for calculating the next interval to be used in the worker's methods for querying the node.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {object} The interval, derived from the progress of the worker
*/
function nextIntervalCalculatorV2(worker) {
return {
fromBlock: worker.lastQueuedBlock + 1,
toBlock: Math.min(worker.lastQueuedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
Expand All @@ -55,5 +67,6 @@ module.exports = {
WORK_NO_SLEEP,
setWorkerSleepTime,
analyzeWorkerContext,
nextIntervalCalculator
nextIntervalCalculator,
nextIntervalCalculatorV2
};
75 changes: 54 additions & 21 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ class Main {

async handleInitPosition() {
const lastRecoveredPosition = await this.exporter.getLastPosition();
this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition);
if (BLOCKCHAIN === 'eth') {
this.lastProcessedPosition = this.taskManager.initPosition(lastRecoveredPosition);
this.worker.lastQueuedBlock = this.lastProcessedPosition.blockNumber;
} else {
this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition);
}
await this.exporter.savePosition(this.lastProcessedPosition);
}

async #initTaskManager(lastBlock) {
async initTaskManager() {
this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS);
this.taskManager.currentFromBlock = lastBlock + 1;
}

#isWorkerSet() {
Expand All @@ -49,14 +53,15 @@ class Main {
this.#isWorkerSet();
const mergedConstants = { ...constantsBase, ...constants };
this.worker = new worker.worker(mergedConstants);
await this.worker.init(this.exporter, metrics);
await this.handleInitPosition();
await this.#initTaskManager(this.lastProcessedPosition.blockNumber);
await this.worker.init(this.exporter);
}

async init() {
await this.initExporter(EXPORTER_NAME, true);
await this.initWorker();
if (BLOCKCHAIN === 'eth') await this.initTaskManager(this.lastProcessedPosition);
await this.handleInitPosition();

metrics.startCollection();

this.microServer = micro(microHandler);
Expand All @@ -77,29 +82,53 @@ class Main {
metrics.currentBlock.set(this.worker.lastConfirmedBlock);
metrics.requestsCounter.inc(this.worker.getNewRequestsCount());
metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime);
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
if (BLOCKCHAIN === 'eth') {
metrics.lastExportedBlock.set(this.taskManager.lastExportedBlock);
} else {
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
}
}

async waitOnStoreEvents() {
if (this.taskManager.buffer.length > 0) {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
const buffer = this.taskManager.retrieveCompleted();
if (buffer.length > 0) {
await this.exporter.storeEvents(buffer);
}

this.taskManager.restartQueueIfNeeded();
}

async updatePosition() {
if (this.taskManager.isChangedLastExportedBlock(this.lastProcessedPosition.blockNumber)) {
this.lastProcessedPosition = this.taskManager.getLastProcessedPosition();
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
}
}

async workLoop() {
while (this.shouldWork) {
this.worker.lastRequestStartTime = new Date();
const events = await this.worker.work();

if (this.taskManager.queue.isPaused) {
this.taskManager.queue.start();
this.taskManager.consequentTaskIndex = 0;
logger.info('Resuming the queue...');
this.worker.lastExportTime = Date.now();

this.updateMetrics();
this.lastProcessedPosition = this.worker.getLastProcessedPosition();

if (events && events.length > 0) {
await this.exporter.storeEvents(events);
}
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);

if (this.shouldWork) {
await new Promise((resolve) => setTimeout(resolve, this.worker.sleepTimeMsec));
}
}
}

async workLoop() {
async workLoopV2() {
while (this.shouldWork) {
await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE);
this.taskManager.pushToQueue(() => this.worker.work().catch(err => {
Expand All @@ -109,8 +138,8 @@ class Main {
this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();

this.lastProcessedPosition = this.worker.getLastProcessedPosition();
await this.waitOnStoreEvents();
await this.updatePosition();
this.updateMetrics();

if (this.shouldWork) {
Expand Down Expand Up @@ -199,7 +228,11 @@ async function main() {
throw new Error(`Error initializing exporter: ${err.message}`);
}
try {
await mainInstance.workLoop();
if (BLOCKCHAIN === 'eth') {
await mainInstance.workLoopV2();
} else {
await mainInstance.workLoop();
}
await mainInstance.disconnect();
logger.info('Bye!');
} catch (err) {
Expand Down
Loading

0 comments on commit 29145ae

Please sign in to comment.