Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate receipts extraction into ETHWorker #201

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions bin/export_env_vars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ export KAFKA_URL=kafka-hz.stage.san:30911
export ZOOKEEPER_URL=zookeeper-hz.stage.san:30921
export NODE_URL=https://ethereum.santiment.net
export START_BLOCK="15676731"
export BLOCK_INTERVAL="50"
export BLOCK_INTERVAL="5"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a change required for this PR?

export EXPORT_TIMEOUT_MLS=300000
export CONTRACT_MODE="extract_exact_overwrite"
export BLOCKCHAIN="eth"
export KAFKA_TOPIC="erc20_exporter_test_topic"
#export KAFKA_TOPIC="erc20_exporter_test_topic"
export KAFKA_TOPIC='native_token_transfers:erc20_exporter_test_topic'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides a single topic, we now support multiple ones. The format is comma separate mode:topic_name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is kind of ugly. I believe we can use several arguments instead of packing multiple values inside a single one.
Can we so something like this?

KAFKA_TOPIC_ERC20 = ...
KAFKA_TOPIC_TRANSFER = ...

export CARDANO_GRAPHQL_URL=https://cardano.santiment.net
export ZOOKEEPER_SESSION_TIMEOUT=20000
export IS_ETH=false
Expand Down
3 changes: 0 additions & 3 deletions src/blockchains/construct_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { ETHWorker } from './eth/eth_worker';
import { ETHBlocksWorker } from './eth_blocks/eth_blocks_worker';
import { ETHContractsWorker } from './eth_contracts/eth_contracts_worker';
import { MaticWorker } from './matic/matic_worker';
import { ReceiptsWorker } from './receipts/receipts_worker';
import { UTXOWorker } from './utxo/utxo_worker';
import { XRPWorker } from './xrp/xrp_worker';

Expand All @@ -22,8 +21,6 @@ export function constructWorker(blockchain: string, settings: any): BaseWorker {
return new ETHBlocksWorker(settings);
case 'matic':
return new MaticWorker(settings);
case 'receipts':
return new ReceiptsWorker(settings);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate worker is no longer needed. We would use the ETHWorker to extract receipts, we can do it like this:

KAFKA_TOPIC='native_token_transfers:eth_transfers_test_topic,receipts:receipts_test_topic'

If we want to extract transfers and receipts at once. If for some reason we want to have only receipts deploy, we can do:

KAFKA_TOPIC='receipts:receipts_test_topic'

case 'utxo':
return new UTXOWorker(settings);
case 'xrp':
Expand Down
14 changes: 7 additions & 7 deletions src/blockchains/erc20/erc20_worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict';
import { logger } from '../../lib/logger';
import { Exporter } from '../../lib/kafka_storage';
import { KafkaStorage } from '../../lib/kafka_storage';
import { constructRPCClient } from '../../lib/http_client';
import { extendEventsWithPrimaryKey } from './lib/extend_events_key';
import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite';
import { stableSort, readJsonFile } from './lib/util';
import { BaseWorker } from '../../lib/worker_base';
import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
import { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator';
import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper';
import { TimestampsCache } from './lib/timestamps_cache';
Expand Down Expand Up @@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker {
this.allOldContracts = [];
}

async init(exporter?: Exporter) {
async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workers receive information for which output Kafka topics need to be filled and produce the needed data.

this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;

if (this.settings.EXPORT_BLOCKS_LIST) {
Expand All @@ -84,10 +84,10 @@ export class ERC20Worker extends BaseWorker {
}

if (this.settings.EVENTS_IN_SAME_PARTITION) {
if (exporter === undefined) {
throw Error('Exporter reference need to be provided for events in same partition')
if (!(storage instanceof KafkaStorage)) {
throw Error('Single Kafka storage needs to be provided for events in same partition')
}
await exporter.initPartitioner((event: any) => simpleHash(event.contract));
await storage.initPartitioner((event: any) => simpleHash(event.contract));
}
}

Expand All @@ -112,7 +112,7 @@ export class ERC20Worker extends BaseWorker {
};
}

async work() {
async work(): Promise<WorkResult | WorkResultMultiMode> {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];
Expand Down
91 changes: 66 additions & 25 deletions src/blockchains/eth/eth_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@ import { constructRPCClient } from '../../lib/http_client';
import { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } from './lib/dao_hack';
import { getGenesisTransfers } from './lib/genesis_transfers';
import { transactionOrder, stableSort } from './lib/util';
import { BaseWorker } from '../../lib/worker_base';
import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
import { Web3Interface, constructWeb3Wrapper, safeCastToNumber } from './lib/web3_wrapper';
import { decodeTransferTrace } from './lib/decode_transfers';
import { FeesDecoder } from './lib/fees_decoder';
import { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } from './lib/next_interval_calculator';
import { WithdrawalsDecoder } from './lib/withdrawals_decoder';
import { fetchEthInternalTrx, fetchBlocks, fetchReceipts } from './lib/fetch_data';
import { HTTPClientInterface } from '../../types';
import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from './eth_types';
import { Trace, ETHBlock, ETHTransfer, ETHReceipt } from './eth_types';
import { EOB, collectEndOfBlocks } from './lib/end_of_block';

import { assertIsDefined, parseKafkaTopicToObject } from '../../lib/utils';
import { decodeReceipt } from './lib/helper_receipts'

export class ETHWorker extends BaseWorker {
private web3Wrapper: Web3Interface;
private ethClient: HTTPClientInterface;
private feesDecoder: FeesDecoder;
private withdrawalsDecoder: WithdrawalsDecoder;
private modes: string[];

constructor(settings: any) {
super(settings);
Expand All @@ -31,19 +33,18 @@ export class ETHWorker extends BaseWorker {

this.feesDecoder = new FeesDecoder(this.web3Wrapper);
this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3Wrapper);
this.modes = [];
}

async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceiptsMap]> {
return await Promise.all([
fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock),
fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true),
fetchReceipts(this.ethClient, this.web3Wrapper,
this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock),
]);
async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceipt[]]> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We fetch raw data depending on what outputs are needed. Some basic raw data like block times is needed for any output.

const traces: Promise<Trace[]> = this.isTracesNeeded() ? fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock) : Promise.resolve([]);
const blocks: Promise<Map<number, ETHBlock>> = fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true);
const receipts: Promise<ETHReceipt[]> = this.isReceiptsNeeded() ? fetchReceipts(this.ethClient, this.web3Wrapper,
this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock) : Promise.resolve([]);
return await Promise.all([traces, blocks, receipts]);
}

transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[],
blocks: any, receipts: ETHReceiptsMap): ETHTransfer[] {
transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[], blocks: any, receipts: ETHReceipt[]): ETHTransfer[] {
let events: ETHTransfer[] = [];
if (fromBlock === 0) {
logger.info('Adding the GENESIS transfers');
Expand Down Expand Up @@ -78,7 +79,7 @@ export class ETHWorker extends BaseWorker {
return result;
}

transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceiptsMap): ETHTransfer[] {
transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceipt[]): ETHTransfer[] {
const result: ETHTransfer[] = [];

for (const block of blocks) {
Expand All @@ -95,31 +96,71 @@ export class ETHWorker extends BaseWorker {
return result;
}

async work(): Promise<(ETHTransfer | EOB)[]> {
isReceiptsNeeded(): boolean {
return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) || this.modes.includes(this.settings.RECEIPTS_MODE)
}

isTracesNeeded(): boolean {
return this.modes.includes(this.settings.NATIVE_TOKEN_MODE)
}

async work(): Promise<WorkResultMultiMode> {
const result: WorkResultMultiMode = {};
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];
if (workerContext === NO_WORK_SLEEP) return result;

const { fromBlock, toBlock } = nextIntervalCalculator(this);
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
let events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper))
if (events.length > 0) {
stableSort(events, transactionOrder);
extendEventsWithPrimaryKey(events, this.lastPrimaryKey);
logger.info(`Fetching events for interval ${fromBlock}:${toBlock}`);

const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);

this.lastPrimaryKey += events.length;
}

this.lastExportedBlock = toBlock;

return events;
assertIsDefined(blocks, "Blocks are needed for extraction");
if (this.modes.includes(this.settings.NATIVE_TOKEN_MODE)) {
assertIsDefined(traces, "Traces are needed for native token transfers");
assertIsDefined(receipts, "Receipts are needed for native token transfers");

const events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper))
if (events.length > 0) {
stableSort(events, transactionOrder);
extendEventsWithPrimaryKey(events, this.lastPrimaryKey);

this.lastPrimaryKey += events.length;
}

result[this.settings.NATIVE_TOKEN_MODE] = events;
}
if (this.modes.includes(this.settings.RECEIPTS_MODE)) {
assertIsDefined(receipts, "Receipts are needed for receipts extraction");
assertIsDefined(blocks, "Blocks are needed for extraction");
const decodedReceipts = receipts.map((receipt: any) => decodeReceipt(receipt, this.web3Wrapper));
decodedReceipts.forEach(receipt => {
const block = blocks.get(receipt.blockNumber)
assertIsDefined(block, `Block ${receipt.blockNumber} is missing`)
receipt['timestamp'] = block.timestamp;
});
result[this.settings.RECEIPTS_MODE] = decodedReceipts;
}

return result;
}

async init(): Promise<void> {
this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;

if (!this.settings.KAFKA_TOPIC.includes(":")) {
throw new Error("ETH worker, expects KAFKA_TOPIC in mode:name format")
}
else {
const mapping = parseKafkaTopicToObject(this.settings.KAFKA_TOPIC)
this.modes = Object.keys(mapping);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/blockchains/eth/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ export const BLOCK_INTERVAL = getIntEnvVariable('BLOCK_INTERVAL', 100);
export const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
export const NODE_URL = process.env.NODE_URL || 'http://localhost:8545/';
export const LOOP_INTERVAL_CURRENT_MODE_SEC = getIntEnvVariable('LOOP_INTERVAL_CURRENT_MODE_SEC', 30);
export const NATIVE_TOKEN_MODE = 'native_token_transfers';
export const RECEIPTS_MODE = 'receipts';


14 changes: 9 additions & 5 deletions src/blockchains/eth/lib/fees_decoder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from 'assert'
import { Web3Interface, safeCastToNumber } from './web3_wrapper';
import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceiptsMap } from '../eth_types';
import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceipt, ETHReceiptsMap } from '../eth_types';
import { BURN_ADDRESS, LONDON_FORK_BLOCK } from './constants';


Expand All @@ -15,7 +15,7 @@ export class FeesDecoder {
this.web3Wrapper = web3Wrapper;
}

getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: any): ETHTransfer[] {
getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: ETHReceiptsMap): ETHTransfer[] {
const gasExpense = BigInt(this.web3Wrapper.parseHexToNumber(transaction.gasPrice)) *
BigInt(this.web3Wrapper.parseHexToNumber(receipts[transaction.hash].gasUsed));
return [{
Expand Down Expand Up @@ -94,15 +94,19 @@ export class FeesDecoder {
return result;
}

getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceiptsMap, isETH: boolean): ETHTransfer[] {
getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceipt[], isETH: boolean): ETHTransfer[] {
const result: ETHTransfer[] = [];
const receiptsMap: ETHReceiptsMap = {};
receipts.forEach((receipt: ETHReceipt) => {
receiptsMap[receipt.transactionHash] = receipt;
});
block.transactions.forEach((transaction: ETHTransaction | string) => {
assert(isETHTransaction(transaction), "To get fees, ETH transaction should be expanded and not just the hash.");

const feeTransfers: ETHTransfer[] =
isETH && blockNumber >= LONDON_FORK_BLOCK ?
this.getPostLondonForkFees(transaction, block, receipts) :
this.getPreLondonForkFees(transaction, block, receipts);
this.getPostLondonForkFees(transaction, block, receiptsMap) :
this.getPreLondonForkFees(transaction, block, receiptsMap);

result.push(...feeTransfers);
});
Expand Down
13 changes: 7 additions & 6 deletions src/blockchains/eth/lib/fetch_data.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { filterErrors } from './filter_errors';
import { Web3Interface } from './web3_wrapper';
import { Trace, ETHBlock, ETHReceiptsMap, ETHReceipt } from '../eth_types';
import { Trace, ETHBlock, ETHReceipt } from '../eth_types';
import { HTTPClientInterface } from '../../../types'


Expand All @@ -15,13 +15,14 @@ export function parseEthInternalTrx(result: Trace[]): Trace[] {
);
}

export function fetchEthInternalTrx(ethClient: HTTPClientInterface,
export async function fetchEthInternalTrx(ethClient: HTTPClientInterface,
web3Wrapper: Web3Interface, fromBlock: number, toBlock: number): Promise<Trace[]> {
const filterParams = {
fromBlock: web3Wrapper.parseNumberToHex(fromBlock),
toBlock: web3Wrapper.parseNumberToHex(toBlock)
};
return ethClient.request('trace_filter', [filterParams]).then((data: any) => parseEthInternalTrx(data['result']));
const data: any = await ethClient.request('trace_filter', [filterParams]);
return parseEthInternalTrx(data['result']);
}

export async function fetchBlocks(ethClient: HTTPClientInterface,
Expand All @@ -43,7 +44,7 @@ export async function fetchBlocks(ethClient: HTTPClientInterface,
}

export async function fetchReceipts(ethClient: HTTPClientInterface,
web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise<ETHReceiptsMap> {
web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise<ETHReceipt[]> {
const batch: any[] = [];
for (let currBlock = fromBlock; currBlock <= toBlock; currBlock++) {
batch.push(
Expand All @@ -54,12 +55,12 @@ export async function fetchReceipts(ethClient: HTTPClientInterface,
);
}
const finishedRequests = await ethClient.requestBulk(batch);
const result: ETHReceiptsMap = {};
const result: ETHReceipt[] = [];

finishedRequests.forEach((response: any) => {
if (response.result) {
response.result.forEach((receipt: ETHReceipt) => {
result[receipt.transactionHash] = receipt;
result.push(receipt);
});
}
else {
Expand Down
Loading