Skip to content

Commit

Permalink
Programming session idea
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Mar 18, 2024
1 parent 124766e commit 09cfb87
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 108 deletions.
34 changes: 13 additions & 21 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@ const Web3Wrapper = require('./lib/web3_wrapper');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { filterErrors } = require('./lib/filter_errors');
const { stableSort } = require('./blockchains/erc20/lib/util');
const { stableSort } = require('../erc20/lib/util');
const { constructRPCClient } = require('../../lib/http_client');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { transactionOrder } = require('./blockchains/eth/lib/util');
const { transactionOrder } = require('./lib/util');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const {
analyzeWorkerContext,
setWorkerSleepTime,
NO_WORK_SLEEP,
nextIntervalCalculatorV2 } = require('./lib/next_interval_calculator');


class ETHWorker extends BaseWorker {
Expand All @@ -30,6 +25,10 @@ class ETHWorker extends BaseWorker {
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3Wrapper);
}

getLastPrimaryKey() {
return this.lastPrimaryKey;
}

parseEthInternalTrx(result) {
const traces = filterErrors(result);

Expand Down Expand Up @@ -152,28 +151,21 @@ class ETHWorker extends BaseWorker {
}

decorateWithPrimaryKeys(events) {
if (this.settings.BLOCKCHAIN === 'eth') {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}
this.lastPrimaryKey += events.length;
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}
this.lastPrimaryKey += events.length;
}

async work() {
const workerContext = await analyzeWorkerContext(this);//TODO: move into index.js loopv2 (?)
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const { fromBlock, toBlock } = nextIntervalCalculatorV2(this);
this.lastQueuedBlock = toBlock;
async work(interval) {
const { fromBlock, toBlock } = interval;

logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

return [{ fromBlock, toBlock }, events];
return events;
}

async init() {
Expand Down
86 changes: 52 additions & 34 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const { BLOCKCHAIN, EXPORT_TIMEOUT_MLS, MAX_CONCURRENT_REQUESTS } = require('./l
const worker = require(`./blockchains/${BLOCKCHAIN}/${BLOCKCHAIN}_worker`);
const constants = require(`./blockchains/${BLOCKCHAIN}/lib/constants`);
const constantsBase = require('./lib/constants');
const {
analyzeWorkerContext,
setWorkerSleepTime,
NO_WORK_SLEEP,
nextIntervalCalculator } = require('./blockchains/eth/lib/next_interval_calculator');

var SegfaultHandler = require('segfault-handler');
SegfaultHandler.registerHandler(`${EXPORTER_NAME}_crash.log`);
Expand All @@ -30,22 +35,9 @@ class Main {
.catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
}

initLastProcessedPosition(lastRecoveredPosition) {
if (lastRecoveredPosition) {
this.lastProcessedPosition = lastRecoveredPosition;
logger.info(`Resuming export from position ${JSON.stringify(lastRecoveredPosition)}`);
} else {
this.lastProcessedPosition = {
blockNumber: constants.START_BLOCK,
primaryKey: constants.START_PRIMARY_KEY
};
logger.info(`Initialized exporter with initial position ${JSON.stringify(lastRecoveredPosition)}`);
}
}

async handleInitPosition() {
const lastRecoveredPosition = await this.exporter.getLastPosition();
this.initLastProcessedPosition(lastRecoveredPosition);
this.lastProcessedPosition = await this.exporter.getLastPosition();
this.worker.initPosition(this.lastProcessedPosition);
await this.exporter.savePosition(this.lastProcessedPosition);
}

Expand All @@ -57,20 +49,18 @@ class Main {
this.#isWorkerSet();
const mergedConstants = { ...constantsBase, ...constants };
this.worker = new worker.worker(mergedConstants);
this.worker.initFromLastPosition(this.lastProcessedPosition);

await this.worker.init(this.exporter);
}

async initTaskManager() {
this.taskManager = await TaskManager.create(MAX_CONCURRENT_REQUESTS);
this.taskManager.initFromLastPosition(this.lastProcessedPosition);
}

async init() {
await this.initExporter(EXPORTER_NAME, true);
await this.handleInitPosition();
await this.initWorker();
await this.handleInitPosition();
if (BLOCKCHAIN === 'eth') await this.initTaskManager();

metrics.startCollection();
Expand All @@ -93,24 +83,22 @@ class Main {
metrics.currentBlock.set(this.worker.lastConfirmedBlock);
metrics.requestsCounter.inc(this.worker.getNewRequestsCount());
metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime);
if (BLOCKCHAIN === 'eth') {
metrics.lastExportedBlock.set(this.lastProcessedPosition.blockNumber);
} else {
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
}
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
}

async waitOnStoreEvents() {
const buffer = this.taskManager.retrieveCompleted();
async waitOnStoreEvents(buffer) {
if (buffer.length > 0) {
this.worker.decorateWithPrimaryKeys(buffer);
await this.exporter.storeEvents(buffer);
}
}

async updatePosition() {
if (this.taskManager.isChangedLastExportedBlock(this.lastProcessedPosition.blockNumber)) {
this.lastProcessedPosition = this.taskManager.getLastProcessedPosition();
async updatePosition(lastPrimaryKey, lastExportedBlock) {
if (lastExportedBlock > this.lastProcessedPosition.blockNumber) {
this.lastProcessedPosition = {
primaryKey: lastPrimaryKey,
blockNumber: lastExportedBlock
};
await this.exporter.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);
}
Expand Down Expand Up @@ -138,18 +126,48 @@ class Main {
}
}

generateIntervals() {
const intervals = [];
for (let i = 0; i < MAX_CONCURRENT_REQUESTS; i++) {
const interval = nextIntervalCalculator(
this.worker.lastQueuedBlock,
this.worker.lastConfirmedBlock,
constantsBase.BLOCK_INTERVAL);
this.worker.lastQueuedBlock = interval.toBlock;
intervals.push(interval);
if (interval.toBlock - interval.fromBlock + 1 < constantsBase.BLOCK_INTERVAL) break; //We've caught up with the head, no new intervals needed
}
return intervals;
}

pushTasks(intervals) {
for (const interval of intervals) {
const taskMetadata = {
interval: interval,
lambda: (interval) => this.worker.work(interval)
};
this.taskManager.pushToQueue(taskMetadata);
}
}

async workLoopV2() {
while (this.shouldWork) {
await this.taskManager.queue.onSizeLessThan(constantsBase.PQUEUE_MAX_SIZE);
this.taskManager.pushToQueue(() => this.worker.work().catch(err => {
logger.error(err.toString());
this.shouldWork = false;
}));

const workerContext = await analyzeWorkerContext(this.worker);
setWorkerSleepTime(this.worker, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];//TODO:

const intervals = this.generateIntervals();
this.pushTasks(intervals);

this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();

await this.waitOnStoreEvents();
await this.updatePosition();
const [lastExportedBlock, buffer] = this.taskManager.retrieveCompleted();
await this.waitOnStoreEvents(buffer);
const lastPrimaryKey = this.worker.getLastPrimaryKey();
await this.updatePosition(lastPrimaryKey, lastExportedBlock);
this.updateMetrics();

if (this.shouldWork) {
Expand Down
73 changes: 22 additions & 51 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ class TaskManager {
constructor() {
this.queue;
this.taskData = {};
this.lastPrimaryKey;
this.lastExportedBlock;
this.lastTaskIndex = 0;
this.lastExportedIndex = 0;
}

/**
Expand All @@ -26,8 +26,8 @@ class TaskManager {
const PQueue = (await import('p-queue')).default;
this.queue = new PQueue({ concurrency: maxConcurrentRequests });

this.queue.on('completed', ([interval, data]) => {
this.#handleNewData(interval, data);
this.queue.on('completed', ([index, data]) => {
this.#handleNewData(index, data);
});
}

Expand All @@ -39,11 +39,11 @@ class TaskManager {
* @param {object} interval
* @param {Array} newTransformedData
*/
#handleNewData(interval, newTransformedData) {
this.taskData[interval.fromBlock] = { toBlock: interval.toBlock, data: newTransformedData };
#handleNewData(index, newTransformedData) {
this.taskData[index].data = newTransformedData;
delete this.taskData[index].lambda;
}


/**
* Method for pushing the sequential intervals that are ready.
* While the loop hits sequential intervals in the taskData property,
Expand All @@ -52,61 +52,32 @@ class TaskManager {
* @returns Array of the events' data
*/
retrieveCompleted() {
let lastExportedBlock;
const buffer = [];
while (this.taskData[this.lastExportedBlock + 1]) {
const events = this.taskData[this.lastExportedBlock + 1].data;
while (this.taskData[this.lastExportedIndex + 1].data) {
const events = this.taskData[this.lastExportedIndex + 1].data;
for (const event of events) buffer.push(event);

const newLastExportedBlock = this.taskData[this.lastExportedBlock + 1].toBlock;
delete this.taskData[this.lastExportedBlock + 1];
this.lastExportedBlock = newLastExportedBlock;
lastExportedBlock = this.taskData[this.lastExportedIndex + 1].interval.toBlock;
delete this.taskData[this.lastExportedIndex + 1];
this.lastExportedIndex += 1;
}
return buffer;
return [lastExportedBlock, buffer];
}

/**
* Takes a `() => worker.work()` function and pushes it
* into the TaskManager's p-queue.
* @param {Function} workTask
*/
pushToQueue(interval, workTask) {
// this.queue.add()
this.queue.add(workTask);
}

/**
* Same as in the `worker_base.js` module, this is used for the definition of the
* `lastExportedBlock` and `lastPrimaryKey` properties, according to whether we have a
* `lastProcessedPosition` from ZooKeeper and if not, use the ones set from the config file
* or the defaults.
* @param {object} lastProcessedPosition
*/
initFromLastPosition(lastProcessedPosition) {
this.lastExportedBlock = lastProcessedPosition.blockNumber;
this.lastPrimaryKey = lastProcessedPosition.primaryKey;
}

/**
* Gets the object, made by taking the lastExportedBlock and lastPrimaryKey variables.
* Used when updating the position of the exporter in ZooKeeper.
* @returns An object of the last exported block and last used primary key
*/
getLastProcessedPosition() {
return {
blockNumber: this.lastExportedBlock,
primaryKey: this.lastPrimaryKey
};
}

/**
* @param {number} bufferLength The length of the buffer of updated events
*/
updateLastPrimaryKey(bufferLength) {
this.lastPrimaryKey += bufferLength;
}

isChangedLastExportedBlock(lastProcessedBlock) {
return lastProcessedBlock < this.lastExportedBlock;
pushToQueue(taskMetadata) {
this.lastTaskIndex++;
this.taskData[this.lastTaskIndex] = taskMetadata;
const taskIndex = this.lastTaskIndex;
this.queue.add(async () => {
const result = await taskMetadata.lambda(taskMetadata.interval);
return [taskIndex, result];
});
}
}

Expand Down
16 changes: 14 additions & 2 deletions lib/worker_base.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
'use strict';
const { logger } = require('./logger');


class WorkerBase {
constructor(constants) {
// To prevent healthcheck failing during initialization and processing first
Expand Down Expand Up @@ -54,10 +57,19 @@ class WorkerBase {
* @param {JSON} lastProcessedPosition
* @return {Object} The received or modified object describing the position to start from.
*/
initFromLastPosition(lastProcessedPosition) {
initPosition(lastProcessedPosition) {
if (lastProcessedPosition) {
logger.info(`Resuming export from position ${JSON.stringify(lastProcessedPosition)}`);
} else {
lastProcessedPosition = {
blockNumber: this.settings.START_BLOCK,
primaryKey: this.settings.START_PRIMARY_KEY
};
logger.info(`Initialized exporter with initial position ${JSON.stringify(lastProcessedPosition)}`);
}
this.lastExportedBlock = lastProcessedPosition.blockNumber;
this.lastPrimaryKey = lastProcessedPosition.primaryKey;
if (this.settings.BLOCKCHAIN === 'eth') this.lastQueuedBlock = this.lastExportedBlock;
this.lastQueuedBlock = this.lastExportedBlock;

return lastProcessedPosition;
}
Expand Down

0 comments on commit 09cfb87

Please sign in to comment.