
Adding parallelism to the ETH, ERC20, MATIC pipelines #162

Open · wants to merge 11 commits into master
49 changes: 35 additions & 14 deletions blockchains/erc20/erc20_worker.js
@@ -91,45 +91,66 @@ class ERC20Worker extends BaseWorker {
}

async work() {
const result = constants.EXPORT_BLOCKS_LIST ?
this.getBlocksListInterval() :
await nextIntervalCalculator(this);
const requestIntervals = await nextIntervalCalculator(this);
Contributor: Are you removing the 'export block list' feature?

Contributor Author: I'd completely missed that. But for the time being, I'd remove this part of the code and rework it later on. It needs to be reworked to fit in with this functionality. WDYT?

Contributor: I agree, let's integrate it again in a later PR so as not to slow down this one. It would be very similar to what you do in nextIntervalCalculator currently: generate a sequence of intervals with the parallelism in mind.

if (requestIntervals.length === 0) return [];

if (!result.success) {
return [];
}

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
logger.info(
`Fetching transfer events for interval ${requestIntervals[0].fromBlock}:` +
`${requestIntervals[requestIntervals.length - 1].toBlock}`);

let events = [];
let overwritten_events = [];
const timestampsCache = new TimestampsCache();
if ('extract_exact_overwrite' === constants.CONTRACT_MODE) {
if (this.allOldContracts.length > 0) {
events = await getPastEvents(this.web3, result.fromBlock, result.toBlock, this.allOldContracts, timestampsCache);
events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
this.allOldContracts,
timestampsCache);
})));
changeContractAddresses(events, this.contractsOverwriteArray);
}

if (this.contractsUnmodified.length > 0) {
const rawEvents = await getPastEvents(this.web3, result.fromBlock, result.toBlock, this.contractsUnmodified,
timestampsCache);
const rawEvents = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
this.contractsUnmodified,
timestampsCache);
})));
events.push(...rawEvents);
}
}
else {
events = await getPastEvents(this.web3, result.fromBlock, result.toBlock, null, timestampsCache);
events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
return await getPastEvents(
this.web3,
requestInterval.fromBlock,
requestInterval.toBlock,
null,
timestampsCache);
})));
if ('extract_all_append' === constants.CONTRACT_MODE) {
overwritten_events = extractChangedContractAddresses(events, this.contractsOverwriteArray);
}
}

if (events.length > 0) {
extendEventsWithPrimaryKey(events, overwritten_events);
logger.info(`Setting primary keys ${events.length} messages for blocks ${result.fromBlock}:${result.toBlock}`);
logger.info(`Setting primary keys ${events.length} messages for blocks ${requestIntervals[0].fromBlock}:`+
`${requestIntervals[requestIntervals.length - 1].toBlock}`);
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}

this.lastExportedBlock = result.toBlock;
this.lastExportedBlock = requestIntervals[requestIntervals.length - 1].toBlock;
const resultEvents = events.concat(overwritten_events);

// If overwritten events have been generated, they need to be merged into the original events
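The reworked work() above fans one getPastEvents call per interval out with Promise.all and flattens the per-interval arrays. A minimal standalone sketch of that pattern (fetchAll and the stubbed fetcher are illustrative names, not part of the PR):

```javascript
// Fan-out/flatten sketch: fetch each interval concurrently, then flatten
// the per-interval result arrays into a single list, as work() does.
async function fetchAll(intervals, fetchInterval) {
  const perInterval = await Promise.all(
    intervals.map((interval) => fetchInterval(interval.fromBlock, interval.toBlock))
  );
  // [].concat(...arrays) flattens exactly one level, same as the PR's code
  return [].concat(...perInterval);
}

// Usage with a stubbed fetcher that just echoes its bounds:
fetchAll(
  [{ fromBlock: 1, toBlock: 100 }, { fromBlock: 101, toBlock: 200 }],
  async (from, to) => [from, to]
).then(console.log); // [ 1, 100, 101, 200 ]
```

Promise.all preserves input order, so the flattened events stay sorted by interval even though the requests complete out of order.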
103 changes: 66 additions & 37 deletions blockchains/eth/eth_worker.js
@@ -1,17 +1,18 @@
const Web3 = require('web3');
const jayson = require('jayson/promise');
const { filterErrors } = require('./lib/filter_errors');
const constants = require('./lib/constants');
const { logger } = require('../../lib/logger');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { filterErrors } = require('./lib/filter_errors');
const { transactionOrder, stableSort } = require('./lib/util');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');


class ETHWorker extends BaseWorker {
constructor() {
@@ -29,6 +30,32 @@ class ETHWorker extends BaseWorker {
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3, this.web3Wrapper);
}

async ethClientRequestWithRetry(...params) {
let retries = 0;
let retryIntervalMs = 0;
while (retries < constants.MAX_RETRIES) {
try {
const response = await this.ethClient.request(...params);
Contributor: Should (...params) be spread again? I thought that request would work with an array.

Contributor Author: I think that it would work with an array if it works with a batch of requests. Technically, [] is a batch, but I'd have to rework how the parameters are given. Currently we have

('trace_filter', [{
  fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
  toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}])

if (response.error || response.result === null) {
retries++;
Contributor: Should we even wait if the retries number is reached at this point? Think MAX_RETRIES=0.

Contributor (@WonderBeat, Nov 28, 2023): We should definitely wait. I suggest using a library for the retrier; it's error-prone to write it yourself.

Contributor Author: What do you mean?

Contributor: I would expect MAX_RETRIES to specify the number of retries, that is, attempts to re-send after the initial send. Instead, if it is 0, we would not send the request at all. If it is 1, we would send once, wait on error, but then not retry. Seems to me like the logic is a bit off.
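A tiny sketch of the reviewer's point about the `while (retries < constants.MAX_RETRIES)` guard (sendAttempts is a hypothetical helper with the failure path simulated, not code from the PR):

```javascript
// With this guard, MAX_RETRIES bounds the TOTAL number of sends,
// not the number of re-sends after the first attempt.
function sendAttempts(maxRetries) {
  let attempts = 0;
  let retries = 0;
  while (retries < maxRetries) {
    attempts++; // one request sent per loop iteration
    retries++;  // simulate: the request failed
  }
  return attempts;
}

console.log(sendAttempts(0)); // 0: the request is never sent at all
console.log(sendAttempts(1)); // 1: sent once, never actually retried
```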

retryIntervalMs += (constants.BACKOFF_RETRY_STEP * retries);
logger.error(`${params[0]} failed. Reason: ${response.error}. Retrying for ${retries} time`);
YordanPavlov marked this conversation as resolved.
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
} else {
return response;
}
} catch(err) {
retries++;
retryIntervalMs += (constants.BACKOFF_RETRY_STEP * retries);
logger.error(
`Try block in ${params[0]} failed. Reason: ${err.toString()}. Waiting ${retryIntervalMs} and retrying for ${retries} time`
);
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs));
}
}
return Promise.reject(`${params[0]} failed after ${retries} retries`);
}
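Note that because `retryIntervalMs` accumulates `BACKOFF_RETRY_STEP * retries` on every failure, the waits grow triangularly rather than linearly. A sketch of the resulting schedule (backoffSchedule is an illustrative helper; 2000 is the BACKOFF_RETRY_STEP default from constants.js):

```javascript
// Reproduces the accumulation in ethClientRequestWithRetry:
//   retryIntervalMs += BACKOFF_RETRY_STEP * retries
function backoffSchedule(maxRetries, stepMs) {
  const waitsMs = [];
  let retryIntervalMs = 0;
  for (let retries = 1; retries <= maxRetries; retries++) {
    retryIntervalMs += stepMs * retries;
    waitsMs.push(retryIntervalMs);
  }
  return waitsMs;
}

console.log(backoffSchedule(3, 2000)); // [ 2000, 6000, 12000 ]
```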

parseEthInternalTrx(result) {
const traces = filterErrors(result);

@@ -41,13 +68,15 @@ }
}

fetchEthInternalTrx(fromBlock, toBlock) {
return this.ethClient.request('trace_filter', [{
logger.info(`Fetching internal transactions for blocks ${fromBlock}:${toBlock}`);
return this.ethClientRequestWithRetry('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}]).then((data) => this.parseEthInternalTrx(data['result']));
}

async fetchBlocks(fromBlock, toBlock) {
logger.info(`Fetching block info for blocks ${fromBlock}:${toBlock}`);
const blockRequests = [];
for (let i = fromBlock; i <= toBlock; i++) {
blockRequests.push(
@@ -67,36 +96,35 @@ }
}

async fetchReceipts(blockNumbers) {
const responses = [];

const batch = [];
for (const blockNumber of blockNumbers) {
const req = this.ethClient.request(constants.RECEIPTS_API_METHOD, [this.web3Wrapper.parseNumberToHex(blockNumber)], undefined, false);
responses.push(this.ethClient.request([req]));
batch.push(
this.ethClient.request(
constants.RECEIPTS_API_METHOD,
[this.web3Wrapper.parseNumberToHex(blockNumber)],
undefined,
false
)
);
}

const finishedRequests = await Promise.all(responses);
const finishedRequests = await this.ethClientRequestWithRetry(batch);
const result = {};

finishedRequests.forEach((blockResponses) => {
if (!blockResponses) return;

blockResponses.forEach((blockResponse) => {
if (blockResponse.result) {
blockResponse.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(blockResponse));
}
});
finishedRequests.forEach((response) => {
if (response.result) {
response.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(response));
}
});

return result;
}

async fetchTracesBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching traces for blocks ${fromBlock}:${toBlock}`);
const [traces, blocks] = await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock)
@@ -108,6 +136,7 @@ }
}

async getPastEvents(fromBlock, toBlock, traces, blocks, receipts) {
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
let events = [];
if (fromBlock === 0) {
logger.info('Adding the GENESIS transfers');
@@ -151,14 +180,15 @@ }
}

async work() {
const result = await nextIntervalCalculator(this);
if (!result.success) {
return [];
}
const requestIntervals = await nextIntervalCalculator(this);
if (requestIntervals.length === 0) return [];

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(result.fromBlock, result.toBlock);
const events = await this.getPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);
const events = [].concat(...await Promise.all(
requestIntervals.map(async (requestInterval) => {
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(requestInterval.fromBlock, requestInterval.toBlock);
return await this.getPastEvents(requestInterval.fromBlock, requestInterval.toBlock, traces, blocks, receipts);
})
));

if (events.length > 0) {
stableSort(events, transactionOrder);
@@ -169,8 +199,7 @@ }
this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = result.toBlock;

this.lastExportedBlock = requestIntervals[requestIntervals.length - 1].toBlock;
return events;
}

26 changes: 16 additions & 10 deletions blockchains/eth/lib/constants.js
@@ -3,21 +3,27 @@ const ETH_WITHDRAWAL = 'withdrawal';
const LONDON_FORK_BLOCK = 12965000;
const SHANGHAI_FORK_BLOCK = 17034871;
const IS_ETH = parseInt(process.env.IS_ETH || '1');
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES) || 3;
const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '100');
const BACKOFF_RETRY_STEP = parseInt(process.env.BACKOFF_RETRY_STEP || '2000');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || '1');
const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/';
const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');

module.exports = {
BLOCK_INTERVAL,
CONFIRMATIONS,
NODE_URL,
LOOP_INTERVAL_CURRENT_MODE_SEC,
BURN_ADDRESS,
ETH_WITHDRAWAL,
IS_ETH,
LONDON_FORK_BLOCK,
SHANGHAI_FORK_BLOCK,
RECEIPTS_API_METHOD
IS_ETH,
NODE_URL,
MAX_RETRIES,
BURN_ADDRESS,
CONFIRMATIONS,
BLOCK_INTERVAL,
ETH_WITHDRAWAL,
LONDON_FORK_BLOCK,
BACKOFF_RETRY_STEP,
SHANGHAI_FORK_BLOCK,
RECEIPTS_API_METHOD,
MAX_CONCURRENT_REQUESTS,
LOOP_INTERVAL_CURRENT_MODE_SEC
};
65 changes: 25 additions & 40 deletions blockchains/eth/lib/next_interval_calculator.js
@@ -1,56 +1,41 @@
const constants = require('./constants');


function isNewBlockAvailable(worker) {
return worker.lastExportedBlock < worker.lastConfirmedBlock;
}
/**
* Return the next interval to be fetched.
* NOTE: this method modifies the member variables of its argument
*
* @param {*} worker A worker object, member variables would be modified
* @returns An object like so:
* {
* success: Boolean,
* fromBlock: Integer,
* toBlock: Integer
* }
* A function that returns the appropriate array of intervals,
* depending on the progress that the worker's made.
* If the exporter's caught up, we check for a new block. We then check whether the Node
* returns a valid block (sometimes the Node returns an early block, like 3 for example).
* We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration
* the function will return the appropriate array of intervals.
* @param {BaseWorker} worker A worker instance, inheriting the BaseWorker class.
* @returns {Array} An array of intervals.
*/
async function nextIntervalCalculator(worker) {
// Check if we need to ask the Node for new Head block. This is an optimization to skip this call when the exporter
// is behind the last seen Head anyways.
const firstNewBlockCheck = isNewBlockAvailable(worker);
if (!firstNewBlockCheck) {
// On the previous cycle we closed the gap to the head of the blockchain.
// Check if there are new blocks now.
if (worker.lastExportedBlock >= worker.lastConfirmedBlock) {
const newConfirmedBlock = await worker.web3.eth.getBlockNumber() - constants.CONFIRMATIONS;
if (newConfirmedBlock > worker.lastConfirmedBlock) {
// The Node has progressed
if (worker.lastConfirmedBlock < newConfirmedBlock) {
worker.lastConfirmedBlock = newConfirmedBlock;
}
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return [];
}

if (firstNewBlockCheck || isNewBlockAvailable(worker)) {
// If data was available without asking with Node, we are catching up and should come back straight away
if (firstNewBlockCheck) {
worker.sleepTimeMsec = 0;
}
else {
// If data became available only after asking the Node, we are close to the Head, come back later
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
}
worker.sleepTimeMsec = 0;
YordanPavlov marked this conversation as resolved.
const progressDifference = worker.lastConfirmedBlock - worker.lastExportedBlock;
const maxInterval = constants.MAX_CONCURRENT_REQUESTS * constants.BLOCK_INTERVAL;
let intervalArrayLength;
Contributor: Can be made const with the ternary operator.

Contributor Author (@spiderjako, Nov 28, 2023): Could be, but IMO it looks cleaner this way; with a ternary it would be a bit too bloated. I like to flex with ternaries, but I realised a few weeks back that I don't always have to. I don't think it's a big deal that it isn't a const, as it's basically given as an argument on the next line and the function ends.

Contributor: It's not about flexing; const variables make it easier to reason about the code. Minor ofc, your call.

if (progressDifference < maxInterval) {
intervalArrayLength = Math.ceil(progressDifference / constants.BLOCK_INTERVAL);
} else {
intervalArrayLength = constants.MAX_CONCURRENT_REQUESTS;
}

return Array.from({ length: intervalArrayLength }, (_, i) => {
return {
success: true,
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: worker.lastExportedBlock + constants.BLOCK_INTERVAL * i + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL * (i + 1), worker.lastConfirmedBlock)
};
}
else {
// The Node has not progressed
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return { success: false };
}
});
}

module.exports = {
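For reference, the interval fan-out in the new nextIntervalCalculator can be exercised standalone. This sketch inlines the constants and drops the worker/sleep bookkeeping (computeIntervals and its parameter names are illustrative, not part of the PR):

```javascript
// Mirrors the splitting logic: at most maxConcurrentRequests intervals of
// blockInterval blocks each, clamped to the last confirmed block.
function computeIntervals(lastExportedBlock, lastConfirmedBlock,
                          blockInterval, maxConcurrentRequests) {
  const progressDifference = lastConfirmedBlock - lastExportedBlock;
  const maxInterval = maxConcurrentRequests * blockInterval;
  const length = progressDifference < maxInterval
    ? Math.ceil(progressDifference / blockInterval)
    : maxConcurrentRequests;
  return Array.from({ length }, (_, i) => ({
    fromBlock: lastExportedBlock + blockInterval * i + 1,
    toBlock: Math.min(lastExportedBlock + blockInterval * (i + 1), lastConfirmedBlock)
  }));
}

// Far behind the head: three full intervals are produced.
console.log(computeIntervals(100, 1000, 100, 3));
// [ { fromBlock: 101, toBlock: 200 },
//   { fromBlock: 201, toBlock: 300 },
//   { fromBlock: 301, toBlock: 400 } ]

// Close to the head: a single interval, clamped to the confirmed block.
console.log(computeIntervals(995, 1000, 100, 3));
// [ { fromBlock: 996, toBlock: 1000 } ]
```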