Skip to content

Commit

Permalink
Different approach to exporter workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Dec 22, 2023
1 parent 2a122e5 commit 0b96bdb
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 202 deletions.
2 changes: 1 addition & 1 deletion bin/export_env_vars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export KAFKA_TOPIC="erc20_exporter_test_topic"
# 'kubectl -n cardano port-forward cardano-graphql-pod-id --address 172.17.0.1 3100:3100'
# replacing with the actual pod id. The IP on which the container can access the host is
# usually 172.17.0.1
export CARDANO_GRAPHQL_URL=http://172.17.0.1:3100/graphql
export CARDANO_GRAPHQL_URL=https://cardano.santiment.net
export BNB_CHAIN_START_MSEC=1595549200002
export ZOOKEEPER_SESSION_TIMEOUT=20000
export CONTRACT_MAPPING_FILE_PATH="./test/erc20/contract_mapping/contract_mapping.json"
118 changes: 57 additions & 61 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
const Web3 = require('web3');
const jayson = require('jayson/promise');
const { filterErrors } = require('./lib/filter_errors');
const constants = require('./lib/constants');
const { logger } = require('../../lib/logger');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { transactionOrder, stableSort } = require('./lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const BaseWorker = require('../../lib/worker_base');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { filterErrors } = require('./lib/filter_errors');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { getGenesisTransfers } = require('./lib/genesis_transfers');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } = require('./lib/dao_hack');


class ETHWorker extends BaseWorker {
constructor() {
Expand All @@ -27,6 +27,7 @@ class ETHWorker extends BaseWorker {
}
this.feesDecoder = new FeesDecoder(this.web3, this.web3Wrapper);
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3, this.web3Wrapper);
this.buffer = [];
}

parseEthInternalTrx(result) {
Expand All @@ -41,6 +42,7 @@ class ETHWorker extends BaseWorker {
}

fetchEthInternalTrx(fromBlock, toBlock) {
logger.info(`Fetching traces info ${fromBlock}:${toBlock}`);
return this.ethClient.request('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
Expand All @@ -67,55 +69,59 @@ class ETHWorker extends BaseWorker {
}

async fetchReceipts(blockNumbers) {
const responses = [];

const batch = [];
for (const blockNumber of blockNumbers) {
const req = this.ethClient.request(constants.RECEIPTS_API_METHOD, [this.web3Wrapper.parseNumberToHex(blockNumber)], undefined, false);
responses.push(this.ethClient.request([req]));
batch.push(
this.ethClient.request(
constants.RECEIPTS_API_METHOD,
[this.web3Wrapper.parseNumberToHex(blockNumber)],
undefined,
false
)
);
}

const finishedRequests = await Promise.all(responses);
const finishedRequests = await this.ethClient.request(batch);
const result = {};

finishedRequests.forEach((blockResponses) => {
if (!blockResponses) return;

blockResponses.forEach((blockResponse) => {
if (blockResponse.result) {
blockResponse.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(blockResponse));
}
});
finishedRequests.forEach((response) => {
if (response.result) {
response.result.forEach((receipt) => {
result[receipt.transactionHash] = receipt;
});
}
else {
throw new Error(JSON.stringify(response));
}
});

return result;
}

async fetchTracesBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching traces for blocks ${fromBlock}:${toBlock}`);
const [traces, blocks] = await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocks(fromBlock, toBlock)
]);
async fetchBlocksAndReceipts(fromBlock, toBlock) {
logger.info(`Fetching blocks info ${fromBlock}:${toBlock}`);
const blocks = await this.fetchBlocks(fromBlock, toBlock);
logger.info(`Fetching receipts of ${fromBlock}:${toBlock}`);
const receipts = await this.fetchReceipts(blocks.keys());

return [traces, blocks, receipts];
return [blocks, receipts];
}

async getPastEvents(fromBlock, toBlock, traces, blocks, receipts) {
async fetchData(fromBlock, toBlock) {
return await Promise.all([
this.fetchEthInternalTrx(fromBlock, toBlock),
this.fetchBlocksAndReceipts(fromBlock, toBlock)]);
}

transformEvents(fromBlock, toBlock, data) {
const [traces, [blocks, receipts]] = data;
let events = [];
if (fromBlock === 0) {
logger.info('Adding the GENESIS transfers');
events.push(...getGenesisTransfers(this.web3));
}

events.push(... await this.getPastTransferEvents(traces, blocks));
events.push(... await this.getPastTransactionEvents(blocks.values(), receipts));
events.push(...this.transformTransferEvents(traces, blocks));
events.push(...this.transformTransactionEvents(blocks.values(), receipts));
if (fromBlock <= DAO_HACK_FORK_BLOCK && DAO_HACK_FORK_BLOCK <= toBlock) {
logger.info('Adding the DAO hack transfers');
events = injectDAOHackTransfers(events);
Expand All @@ -124,54 +130,44 @@ class ETHWorker extends BaseWorker {
return events;
}

async getPastTransferEvents(traces, blocksMap) {
transformTransferEvents(traces, blocksMap) {
const result = [];

for (let i = 0; i < traces.length; i++) {
const block_timestamp = this.web3Wrapper.decodeTimestampFromBlock(blocksMap.get(traces[i]['blockNumber']));
result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));
result.push(decodeTransferTrace(traces[i], block_timestamp, this.web3Wrapper));//TODO: Maybe push {blocknumbers: data}
}

return result;
}

async getPastTransactionEvents(blocks, receipts) {
transformTransactionEvents(blocks, receipts) {
const result = [];

for (const block of blocks) {
const decoded_transactions = this.feesDecoder.getFeesFromTransactionsInBlock(block, receipts);
const blockNumber = this.web3Wrapper.parseHexToNumber(block.number);
if (constants.IS_ETH && blockNumber >= constants.SHANGHAI_FORK_BLOCK) {
decoded_transactions.push(... await this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber));
decoded_transactions.push(...this.withdrawalsDecoder.getBeaconChainWithdrawals(block, blockNumber));
}
result.push(...decoded_transactions);
//TODO: Maybe push {blocknumbers: data}
}

return result;
}
//TODO:s - If you have a [{#:data}] then you can check whether the # just doesn't have data or it's missing
async makeQueueTask(interval) {
const data = await this.fetchData(interval.fromBlock, interval.toBlock);
const transformedData = this.transformEvents(interval.fromBlock, interval.toBlock, data);
transformedData.forEach((data) => this.buffer.push(data));
}

async work() {
const result = await nextIntervalCalculator(this);
if (!result.success) {
return [];
}
const intervals = await nextIntervalCalculator(this);
if (intervals.length === 0) return [];

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchTracesBlocksAndReceipts(result.fromBlock, result.toBlock);
const events = await this.getPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
for (let i = 0; i < events.length; i++) {
events[i].primaryKey = this.lastPrimaryKey + i + 1;
}
for (const interval of intervals) this.queue.add(() => this.makeQueueTask(interval));

this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = result.toBlock;

return events;
this.lastExportedBlock = Math.max(intervals[intervals.length - 1].toBlock, this.lastExportTime);
}

async init() {
Expand Down
4 changes: 3 additions & 1 deletion blockchains/eth/lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ const ETH_WITHDRAWAL = 'withdrawal';
const LONDON_FORK_BLOCK = 12965000;
const SHANGHAI_FORK_BLOCK = 17034871;
const IS_ETH = parseInt(process.env.IS_ETH || '1');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '100');
const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.MAX_CONCURRENT_REQUESTS || 1);
const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/';
const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');

module.exports = {
BLOCK_INTERVAL,
CONFIRMATIONS,
NODE_URL,
MAX_CONCURRENT_REQUESTS,
LOOP_INTERVAL_CURRENT_MODE_SEC,
BURN_ADDRESS,
ETH_WITHDRAWAL,
Expand Down
2 changes: 1 addition & 1 deletion blockchains/eth/lib/genesis_transfers.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const GENESIS_TRANSFERS = fs.readFileSync(path.resolve(__dirname) + '/ethereum_g

const GENESIS_TIMESTAMP = 1438269973;

exports.getGenesisTransfers = function(web3) {
exports.getGenesisTransfers = function (web3) {
const result = [];
GENESIS_TRANSFERS.forEach((transfer) => {
const [id, from, to, amount] = transfer;
Expand Down
65 changes: 25 additions & 40 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
@@ -1,56 +1,41 @@
const constants = require('./constants');


function isNewBlockAvailable(worker) {
return worker.lastExportedBlock < worker.lastConfirmedBlock;
}
/**
* Return the next interval to be fetched.
* NOTE: this method modifies the member variables of its argument
*
* @param {*} worker A worker object, member variables would be modified
* @returns An object like so:
* {
* success: Boolean,
* fromBlock: Integer,
* toBlock: Integer
* }
* A function that returns the appropriate array of intervals,
* depending on the progress that the worker's made.
* If the exporter's caught up, we check for a new block. We then check whether the Node
* returns a valid block (sometimes the Node returns an early block, like 3 for example).
* We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration
* the function will return the appropriate array of intervals.
* @param {BaseWorker} worker A worker instance, inheriting the BaseWorker class.
* @returns {Array} An array of intervals.
*/
async function nextIntervalCalculator(worker) {
// Check if we need to ask the Node for new Head block. This is an optimization to skip this call when the exporter
// is behind the last seen Head anyways.
const firstNewBlockCheck = isNewBlockAvailable(worker);
if (!firstNewBlockCheck) {
// On the previous cycle we closed the gap to the head of the blockchain.
// Check if there are new blocks now.
if (worker.lastExportedBlock >= worker.lastConfirmedBlock) {
const newConfirmedBlock = await worker.web3.eth.getBlockNumber() - constants.CONFIRMATIONS;
if (newConfirmedBlock > worker.lastConfirmedBlock) {
// The Node has progressed
if (worker.lastConfirmedBlock < newConfirmedBlock) {
worker.lastConfirmedBlock = newConfirmedBlock;
}
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return [];
}

if (firstNewBlockCheck || isNewBlockAvailable(worker)) {
// If data was available without asking with Node, we are catching up and should come back straight away
if (firstNewBlockCheck) {
worker.sleepTimeMsec = 0;
}
else {
// If data became available only after asking the Node, we are close to the Head, come back later
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
}
worker.sleepTimeMsec = 0;
const progressDifference = worker.lastConfirmedBlock - worker.lastExportedBlock;
const maxInterval = constants.MAX_CONCURRENT_REQUESTS * constants.BLOCK_INTERVAL;
let intervalArrayLength;
if (progressDifference < maxInterval) {
intervalArrayLength = Math.ceil(progressDifference / constants.BLOCK_INTERVAL);
} else {
intervalArrayLength = constants.MAX_CONCURRENT_REQUESTS;
}

return Array.from({ length: intervalArrayLength }, (_, i) => {
return {
success: true,
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: worker.lastExportedBlock + constants.BLOCK_INTERVAL * i + 1,
toBlock: Math.min(worker.lastExportedBlock + constants.BLOCK_INTERVAL * (i + 1), worker.lastConfirmedBlock)
};
}
else {
// The Node has not progressed
worker.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return { success: false };
}
});
}

module.exports = {
Expand Down
Loading

0 comments on commit 0b96bdb

Please sign in to comment.