Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract the parallelism idea into a single entity #178

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15,062 changes: 0 additions & 15,062 deletions integration_test/testdata/binance_native_token_block_20000000_to_20000999.json

This file was deleted.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "index.js",
"scripts": {
"test": "TEST_ENV=1 BLOCKCHAIN=eth CONTRACT_MAPPING_FILE_PATH=test/erc20/contract_mapping/contract_mapping.json LOG_LEVEL=error CONTRACT_MODE=extract_exact_overwrite mocha --require source-map-support/register --recursive --reporter spec ./built/test/**/*.js",
"integration_test": "mocha --recursive --reporter spec integration_test",
"integration_test": "mocha --recursive --reporter spec ./built/integration_test",
"lint": "eslint . --ext .js,.jsx,.ts,.tsx",
"start": "npm run build && node ./built/index.js",
"build": "rm -r -f ./built && tsc && ./copy_assets.sh",
Expand Down
7 changes: 5 additions & 2 deletions src/blockchains/erc20/erc20_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ class ERC20Worker extends BaseWorker {

const interval = this.settings.EXPORT_BLOCKS_LIST ?
this.getBlocksListInterval() :
nextIntervalCalculator(this);
nextIntervalCalculator(
this.lastExportedBlock,
this.lastConfirmedBlock,
this.settings.BLOCK_INTERVAL);

logger.info(`Fetching transfer events for interval ${interval.fromBlock}:${interval.toBlock}`);

Expand Down Expand Up @@ -136,7 +139,7 @@ class ERC20Worker extends BaseWorker {
}

if (events.length > 0) {
extendEventsWithPrimaryKey(events, overwritten_events);
extendEventsWithPrimaryKey(events, this.settings.PRIMARY_KEY_MULTIPLIER, overwritten_events);
logger.info(`Setting primary keys ${events.length} messages for blocks ${interval.fromBlock}:${interval.toBlock}`);
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}
Expand Down
9 changes: 4 additions & 5 deletions src/blockchains/erc20/lib/extend_events_key.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const { stableSort } = require('./util');
const constants = require('./constants');
const { logger } = require('../../../lib/logger');

function transactionOrder(a, b) {
Expand All @@ -14,26 +13,26 @@ function transactionOrder(a, b) {
}
}

function extendEventsWithPrimaryKey(events, overwritten_events = []) {
function extendEventsWithPrimaryKey(events, primaryKeyMultiplier, overwritten_events = []) {
stableSort(events, transactionOrder);
const lastEvent = events[events.length - 1];
if (lastEvent.logIndex + overwritten_events.length >= constants.PRIMARY_KEY_MULTIPLIER) {
if (lastEvent.logIndex + overwritten_events.length >= primaryKeyMultiplier) {
logger.error(`An event with log index ${lastEvent.logIndex} is breaking the primaryKey generation logic at block `
+ `${lastEvent.blockNumber}. There are ${overwritten_events.length} overwritten events.`);
}

// Store the last log index of the original events per block
let lastLogIndexPerBlock = {};
events.forEach(function (event) {
event.primaryKey = event.blockNumber * constants.PRIMARY_KEY_MULTIPLIER + event.logIndex;
event.primaryKey = event.blockNumber * primaryKeyMultiplier + event.logIndex;
// We depend on the events being sorted by log index above
lastLogIndexPerBlock[event.blockNumber] = event.logIndex;
});
// As the overwritten events are copies of the main events, they would have the same logIndex. To generate unique primary keys,
// the primary keys of overwritten events start after the biggest primary key of the main events and increase by 1.
overwritten_events.forEach(function (event) {
lastLogIndexPerBlock[event.blockNumber] += 1;
event.primaryKey = event.blockNumber * constants.PRIMARY_KEY_MULTIPLIER + lastLogIndexPerBlock[event.blockNumber];
event.primaryKey = event.blockNumber * primaryKeyMultiplier + lastLogIndexPerBlock[event.blockNumber];
});
}

Expand Down
67 changes: 39 additions & 28 deletions src/blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
const { Web3 } = require('web3');
const { filterErrors } = require('./lib/filter_errors');
const { logger } = require('../../lib/logger');
const { constructRPCClient } = require('../../lib/http_client');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } = require('./lib/next_interval_calculator');
const { filterErrors } = require('./lib/filter_errors');
const { stableSort } = require('../erc20/lib/util');
const { constructRPCClient } = require('../../lib/http_client');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { transactionOrder } = require('./lib/util');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');


class ETHWorker extends BaseWorker {
constructor(settings) {
Expand All @@ -20,11 +21,22 @@ class ETHWorker extends BaseWorker {
logger.info(`Applying the following settings: ${JSON.stringify(settings)}`);
this.web3Wrapper = new Web3Wrapper(new Web3(new Web3.providers.HttpProvider(settings.NODE_URL)));
this.ethClient = constructRPCClient(settings.NODE_URL);

this.feesDecoder = new FeesDecoder(this.web3Wrapper);
this.feesDecoder = new FeesDecoder(
this.web3Wrapper,
this.settings.BURN_ADDRESS,
this.settings.IS_ETH,
this.settings.LONDON_FORK_BLOCK);
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3Wrapper);
}

getLastPrimaryKey() {
return this.lastPrimaryKey;
}

setLastExportedBlock(block) {
this.lastExportedBlock = block;
}

parseEthInternalTrx(result) {
const traces = filterErrors(result);

Expand All @@ -40,7 +52,8 @@ class ETHWorker extends BaseWorker {
return this.ethClient.request('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}]).then((data) => this.parseEthInternalTrx(data['result']));
}])
.then((data) => this.parseEthInternalTrx(data['result']));
}

async fetchBlocks(fromBlock, toBlock) {
Expand Down Expand Up @@ -137,35 +150,33 @@ class ETHWorker extends BaseWorker {
const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, blockNumber, receipts);
if (this.settings.IS_ETH && blockNumber >= this.settings.SHANGHAI_FORK_BLOCK) {
const blockTimestamp = this.web3Wrapper.parseHexToNumber(block.timestamp);
decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(block.withdrawals, blockNumber, blockTimestamp));
decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(
block.withdrawals,
blockNumber,
blockTimestamp,
this.settings.ETH_WITHDRAWAL));
}
result.push(...decoded_transactions);
}

return result;
}

async work() {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];
decorateWithPrimaryKeys(events) {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}
this.lastPrimaryKey += events.length;
}

async work(interval) {
const { fromBlock, toBlock } = interval;

const { fromBlock, toBlock } = nextIntervalCalculator(this);
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}

this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = toBlock;

return events;
}

Expand Down
11 changes: 6 additions & 5 deletions src/blockchains/eth/lib/fees_decoder.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const constants = require('./constants');

class FeesDecoder {
constructor(web3Wrapper) {
constructor(web3Wrapper, burnAddress, isEth, londonForkBlock) {
this.web3Wrapper = web3Wrapper;
this.BURN_ADDRESS = burnAddress;
this.IS_ETH = isEth;
this.LONDON_FORK_BLOCK = londonForkBlock;
}

getPreLondonForkFees(transaction, block, receipts) {
Expand All @@ -23,7 +24,7 @@ class FeesDecoder {
const gasExpense = BigInt(this.web3Wrapper.parseHexToNumber(block.baseFeePerGas)) * BigInt(this.web3Wrapper.parseHexToNumber(receipts[transaction.hash].gasUsed));
result.push({
from: transaction.from,
to: constants.BURN_ADDRESS,
to: this.BURN_ADDRESS,
value: Number(gasExpense),
valueExactBase36: gasExpense.toString(36),
blockNumber: this.web3Wrapper.parseHexToNumber(transaction.blockNumber),
Expand Down Expand Up @@ -72,7 +73,7 @@ class FeesDecoder {
const result = [];
block.transactions.forEach((transaction) => {
const feeTransfers =
constants.IS_ETH && blockNumber >= constants.LONDON_FORK_BLOCK ?
this.IS_ETH && blockNumber >= this.LONDON_FORK_BLOCK ?
this.getPostLondonForkFees(transaction, block, receipts) :
this.getPreLondonForkFees(transaction, block, receipts);

Expand Down
6 changes: 3 additions & 3 deletions src/blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ function setWorkerSleepTime(worker, context) {
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {object} The interval, derived from the progress of the worker
*/
function nextIntervalCalculator(worker) {
function nextIntervalCalculator(lastExportedBlock, lastConfirmedBlock, blockInterval) {
return {
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: lastExportedBlock + 1,
toBlock: Math.min(lastExportedBlock + blockInterval, lastConfirmedBlock)
};
}

Expand Down
6 changes: 2 additions & 4 deletions src/blockchains/eth/lib/withdrawals_decoder.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
const constants = require('./constants');

class WithdrawalsDecoder {
constructor(web3Wrapper) {
this.web3Wrapper = web3Wrapper;
}

getBeaconChainWithdrawals(withdrawals, blockNumber, blockTimestamp) {
getBeaconChainWithdrawals(withdrawals, blockNumber, blockTimestamp, ethWithdrawal) {
return withdrawals.map((withdrawal) => {
const gweiAmount = BigInt(this.web3Wrapper.gweiToWei(withdrawal.amount));
return {
from: constants.ETH_WITHDRAWAL,
from: ethWithdrawal,
to: withdrawal.address,
value: Number(gweiAmount),
valueExactBase36: gweiAmount.toString(36),
Expand Down
7 changes: 5 additions & 2 deletions src/blockchains/matic/matic_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ class MaticWorker extends BaseWorker {
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const result = nextIntervalCalculator(this);
const result = nextIntervalCalculator(
this.lastExportedBlock,
this.lastConfirmedBlock,
this.settings.BLOCK_INTERVAL);

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);

const events = await getPastEvents(this.ethClient, this.web3Wrapper, result.fromBlock, result.toBlock);

if (events.length > 0) {
extendEventsWithPrimaryKey(events);
extendEventsWithPrimaryKey(events, this.settings.PRIMARY_KEY_MULTIPLIER);
logger.info(`Setting primary keys ${events.length} messages for blocks ${result.fromBlock}:${result.toBlock}`);
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}
Expand Down
5 changes: 4 additions & 1 deletion src/blockchains/receipts/receipts_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ class ReceiptsWorker extends BaseWorker {
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];

const { fromBlock, toBlock } = nextIntervalCalculator(this);
const { fromBlock, toBlock } = nextIntervalCalculator(
this.lastExportedBlock,
this.lastConfirmedBlock,
this.settings.BLOCK_INTERVAL);
logger.info(`Fetching receipts for interval ${fromBlock}:${toBlock}`);
const receipts = await this.getReceiptsForBlocks(fromBlock, toBlock);

Expand Down
2 changes: 1 addition & 1 deletion src/blockchains/utxo/utxo_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class UtxoWorker extends BaseWorker {
return await this.sendRequestWithRetry('getblock', [blockHash, 2]);
}

async work() {
async work(): Promise<any> {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;

Expand Down
Loading