diff --git a/bin/export_env_vars.sh b/bin/export_env_vars.sh
index 1624a4f3..c6aea074 100755
--- a/bin/export_env_vars.sh
+++ b/bin/export_env_vars.sh
@@ -2,11 +2,12 @@
 export KAFKA_URL=kafka-hz.stage.san:30911
 export ZOOKEEPER_URL=zookeeper-hz.stage.san:30921
 export NODE_URL=https://ethereum.santiment.net
 export START_BLOCK="15676731"
-export BLOCK_INTERVAL="50"
+export BLOCK_INTERVAL="5"
 export EXPORT_TIMEOUT_MLS=300000
 export CONTRACT_MODE="extract_exact_overwrite"
 export BLOCKCHAIN="eth"
-export KAFKA_TOPIC="erc20_exporter_test_topic"
+#export KAFKA_TOPIC="erc20_exporter_test_topic"
+export KAFKA_TOPIC='native_token_transfers:erc20_exporter_test_topic'
 export CARDANO_GRAPHQL_URL=https://cardano.santiment.net
 export ZOOKEEPER_SESSION_TIMEOUT=20000
 export IS_ETH=false
diff --git a/src/blockchains/construct_worker.ts b/src/blockchains/construct_worker.ts
index 3c266cf5..ad1062fe 100644
--- a/src/blockchains/construct_worker.ts
+++ b/src/blockchains/construct_worker.ts
@@ -6,7 +6,6 @@ import { ETHWorker } from './eth/eth_worker';
 import { ETHBlocksWorker } from './eth_blocks/eth_blocks_worker';
 import { ETHContractsWorker } from './eth_contracts/eth_contracts_worker';
 import { MaticWorker } from './matic/matic_worker';
-import { ReceiptsWorker } from './receipts/receipts_worker';
 import { UTXOWorker } from './utxo/utxo_worker';
 import { XRPWorker } from './xrp/xrp_worker';

@@ -22,8 +21,6 @@ export function constructWorker(blockchain: string, settings: any): BaseWorker {
       return new ETHBlocksWorker(settings);
     case 'matic':
       return new MaticWorker(settings);
-    case 'receipts':
-      return new ReceiptsWorker(settings);
     case 'utxo':
       return new UTXOWorker(settings);
     case 'xrp':
diff --git a/src/blockchains/erc20/erc20_worker.ts b/src/blockchains/erc20/erc20_worker.ts
index a3f1e789..71ab5644 100644
--- a/src/blockchains/erc20/erc20_worker.ts
+++ b/src/blockchains/erc20/erc20_worker.ts
@@ -1,11 +1,11 @@
 'use strict';
 import { logger } from '../../lib/logger';
-import { Exporter } from '../../lib/kafka_storage';
+import { KafkaStorage } from '../../lib/kafka_storage';
 import { constructRPCClient } from '../../lib/http_client';
 import { extendEventsWithPrimaryKey } from './lib/extend_events_key';
 import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite';
 import { stableSort, readJsonFile } from './lib/util';
-import { BaseWorker } from '../../lib/worker_base';
+import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
 import { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator';
 import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper';
 import { TimestampsCache } from './lib/timestamps_cache';
@@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker {
     this.allOldContracts = [];
   }

-  async init(exporter?: Exporter) {
+  async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
     this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;

     if (this.settings.EXPORT_BLOCKS_LIST) {
@@ -84,10 +84,10 @@
     }

     if (this.settings.EVENTS_IN_SAME_PARTITION) {
-      if (exporter === undefined) {
-        throw Error('Exporter reference need to be provided for events in same partition')
+      if (!(storage instanceof KafkaStorage)) {
+        throw Error('Single Kafka storage needs to be provided for events in same partition')
       }
-      await exporter.initPartitioner((event: any) => simpleHash(event.contract));
+      await storage.initPartitioner((event: any) => simpleHash(event.contract));
     }
   }

@@ -112,7 +112,7 @@
     };
   }

-  async work() {
+  async work(): Promise<WorkResult> {
     const workerContext = await analyzeWorkerContext(this);
     setWorkerSleepTime(this, workerContext);
     if (workerContext === NO_WORK_SLEEP) return [];
diff --git a/src/blockchains/eth/eth_types.ts b/src/blockchains/eth/eth_types.ts
index a505999d..531fdac6 100644
--- a/src/blockchains/eth/eth_types.ts
+++ b/src/blockchains/eth/eth_types.ts
@@ -91,6 +91,18 @@ export type ETHReceipt = {
   transactionIndex: string
 }

+export type ETHReceiptDecoded = {
+  blockNumber: number,
+  blockHash: string,
+  gasUsed: string,
+  transactionHash: string,
+  cumulativeGasUsed: string,
+  logs: any[],
+  transactionIndex: number,
+  status: number,
+  timestamp: string
+}
+
 export type ETHReceiptsMap = {
   [transactionHash: string]: ETHReceipt;
 }
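The new `ETHReceiptDecoded` type above describes a receipt after hex decoding: numeric fields become numbers, gas counters become decimal strings, and a block timestamp is attached. A minimal sketch of that field mapping, using a local hex helper as a stand-in for the repo's web3 wrapper (field set trimmed for brevity):

```typescript
// Standalone illustration of the ETHReceipt -> ETHReceiptDecoded decoding.
// `hexToNumber` is a stand-in for web3Wrapper.parseHexToNumber.
const hexToNumber = (hex: string): number => parseInt(hex, 16);

const rawReceipt = { blockNumber: '0x572559', status: '0x1', transactionIndex: '0x0', gasUsed: '0x5208' };

const decoded = {
  blockNumber: hexToNumber(rawReceipt.blockNumber),           // 5711193
  status: hexToNumber(rawReceipt.status),                     // 1
  transactionIndex: hexToNumber(rawReceipt.transactionIndex), // 0
  gasUsed: BigInt(rawReceipt.gasUsed).toString()              // '21000', kept as a decimal string
};
console.log(decoded);
```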
diff --git a/src/blockchains/eth/eth_worker.ts b/src/blockchains/eth/eth_worker.ts
index 8fdda996..6f1fafa1 100644
--- a/src/blockchains/eth/eth_worker.ts
+++ b/src/blockchains/eth/eth_worker.ts
@@ -3,7 +3,7 @@ import { constructRPCClient } from '../../lib/http_client';
 import { injectDAOHackTransfers, DAO_HACK_FORK_BLOCK } from './lib/dao_hack';
 import { getGenesisTransfers } from './lib/genesis_transfers';
 import { transactionOrder, stableSort } from './lib/util';
-import { BaseWorker } from '../../lib/worker_base';
+import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
 import { Web3Interface, constructWeb3Wrapper, safeCastToNumber } from './lib/web3_wrapper';
 import { decodeTransferTrace } from './lib/decode_transfers';
 import { FeesDecoder } from './lib/fees_decoder';
@@ -11,15 +11,17 @@ import { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WO
 import { WithdrawalsDecoder } from './lib/withdrawals_decoder';
 import { fetchEthInternalTrx, fetchBlocks, fetchReceipts } from './lib/fetch_data';
 import { HTTPClientInterface } from '../../types';
-import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from './eth_types';
+import { Trace, ETHBlock, ETHTransfer, ETHReceipt } from './eth_types';
 import { EOB, collectEndOfBlocks } from './lib/end_of_block';
-
+import { assertIsDefined, parseKafkaTopicToObject } from '../../lib/utils';
+import { decodeReceipt } from './lib/helper_receipts'

 export class ETHWorker extends BaseWorker {
   private web3Wrapper: Web3Interface;
   private ethClient: HTTPClientInterface;
   private feesDecoder: FeesDecoder;
   private withdrawalsDecoder: WithdrawalsDecoder;
+  private modes: string[];

   constructor(settings: any) {
     super(settings);
@@ -31,19 +33,18 @@
     this.feesDecoder = new FeesDecoder(this.web3Wrapper);
     this.withdrawalsDecoder = new WithdrawalsDecoder(this.web3Wrapper);
+    this.modes = [];
   }

-  async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceiptsMap]> {
-    return await Promise.all([
-      fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock),
-      fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true),
-      fetchReceipts(this.ethClient, this.web3Wrapper,
-        this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock),
-    ]);
+  async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceipt[]]> {
+    const traces: Promise<Trace[]> = this.isTracesNeeded() ?
+      fetchEthInternalTrx(this.ethClient, this.web3Wrapper, fromBlock, toBlock) : Promise.resolve([]);
+    const blocks: Promise<Map<number, ETHBlock>> = fetchBlocks(this.ethClient, this.web3Wrapper, fromBlock, toBlock, true);
+    const receipts: Promise<ETHReceipt[]> = this.isReceiptsNeeded() ? fetchReceipts(this.ethClient, this.web3Wrapper,
+      this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock) : Promise.resolve([]);
+    return await Promise.all([traces, blocks, receipts]);
   }

-  transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[],
-    blocks: any, receipts: ETHReceiptsMap): ETHTransfer[] {
+  transformPastEvents(fromBlock: number, toBlock: number, traces: Trace[], blocks: any, receipts: ETHReceipt[]): ETHTransfer[] {
     let events: ETHTransfer[] = [];
     if (fromBlock === 0) {
       logger.info('Adding the GENESIS transfers');
@@ -78,7 +79,7 @@
     return result;
   }

-  transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceiptsMap): ETHTransfer[] {
+  transformPastTransactionEvents(blocks: ETHBlock[], receipts: ETHReceipt[]): ETHTransfer[] {
     const result: ETHTransfer[] = [];

     for (const block of blocks) {
@@ -95,17 +96,23 @@
     return result;
   }

-  async work(): Promise<(ETHTransfer | EOB)[]> {
-    const workerContext = await analyzeWorkerContext(this);
-    setWorkerSleepTime(this, workerContext);
-    if (workerContext === NO_WORK_SLEEP) return [];
+  isReceiptsNeeded(): boolean {
+    return this.modes.includes(this.settings.NATIVE_TOKEN_MODE) || this.modes.includes(this.settings.RECEIPTS_MODE)
+  }

-    const { fromBlock, toBlock } = nextIntervalCalculator(this);
-    logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
-    const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
-    let events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);
+  isTracesNeeded(): boolean {
+    return this.modes.includes(this.settings.NATIVE_TOKEN_MODE)
+  }
+
+  getTransfersOutput(fromBlock: number, toBlock: number, traces: Trace[],
+    blocks: Map<number, ETHBlock>, receipts: ETHReceipt[], endOfBlockEvents: EOB[]): WorkResult {
+    assertIsDefined(traces, "Traces are needed for native token transfers");
+    assertIsDefined(receipts, "Receipts are needed for native token transfers");

-    events.push(...collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper))
+    const events: (ETHTransfer | EOB)[] = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);
+
+
+    events.push(...endOfBlockEvents)
     if (events.length > 0) {
       stableSort(events, transactionOrder);
       extendEventsWithPrimaryKey(events, this.lastPrimaryKey);
@@ -113,13 +120,60 @@
       this.lastPrimaryKey += events.length;
     }

+    return events;
+  }
+
+  getReceiptsOutput(blocks: Map<number, ETHBlock>, receipts: ETHReceipt[]): WorkResult {
+    assertIsDefined(receipts, "Receipts are needed for receipts extraction");
+    assertIsDefined(blocks, "Blocks are needed for extraction");
+    const decodedReceipts = receipts.map((receipt: any) => decodeReceipt(receipt, this.web3Wrapper));
+    decodedReceipts.forEach(receipt => {
+      const block = blocks.get(receipt.blockNumber)
+      assertIsDefined(block, `Block ${receipt.blockNumber} is missing`)
+      receipt.timestamp = block.timestamp;
+    });
+    return decodedReceipts;
+  }
+
+  async work(): Promise<WorkResultMultiMode> {
+    const result: WorkResultMultiMode = {};
+    const workerContext = await analyzeWorkerContext(this);
+    setWorkerSleepTime(this, workerContext);
+    if (workerContext === NO_WORK_SLEEP) return result;
+
+    const { fromBlock, toBlock } = nextIntervalCalculator(this);
+
+    logger.info(`Fetching events for interval ${fromBlock}:${toBlock}`);
+
+    const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
+
+    this.lastExportedBlock = toBlock;

-    return events;
+    assertIsDefined(blocks, "Blocks are needed for extraction");
+    // TODO consider if EOB events should also be present in other output topics
+    const endOfBlockEvents = collectEndOfBlocks(fromBlock, toBlock, blocks, this.web3Wrapper)
+    if (this.modes.includes(this.settings.NATIVE_TOKEN_MODE)) {
+      result[this.settings.NATIVE_TOKEN_MODE] = this.getTransfersOutput(fromBlock, toBlock, traces, blocks,
+        receipts, endOfBlockEvents);
+    }
+    if (this.modes.includes(this.settings.RECEIPTS_MODE)) {
+      result[this.settings.RECEIPTS_MODE] = this.getReceiptsOutput(blocks, receipts);
+    }
+
+    return result;
   }

   async init(): Promise<void> {
     this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;
+
+    if (!this.settings.KAFKA_TOPIC.includes(":")) {
+      throw new Error("ETH worker, expects KAFKA_TOPIC in mode:name format")
+    }
+    else {
+      const mapping = parseKafkaTopicToObject(this.settings.KAFKA_TOPIC)
+      this.modes = Object.keys(mapping);
+    }
   }
 }
diff --git a/src/blockchains/eth/lib/constants.ts b/src/blockchains/eth/lib/constants.ts
index 4f692688..1205bcea 100644
--- a/src/blockchains/eth/lib/constants.ts
+++ b/src/blockchains/eth/lib/constants.ts
@@ -10,5 +10,7 @@ export const BLOCK_INTERVAL = getIntEnvVariable('BLOCK_INTERVAL', 100);
 export const RECEIPTS_API_METHOD = process.env.RECEIPTS_API_METHOD || 'eth_getBlockReceipts';
 export const NODE_URL = process.env.NODE_URL || 'http://localhost:8545/';
 export const LOOP_INTERVAL_CURRENT_MODE_SEC = getIntEnvVariable('LOOP_INTERVAL_CURRENT_MODE_SEC', 30);
+export const NATIVE_TOKEN_MODE = 'native_token_transfers';
+export const RECEIPTS_MODE = 'receipts';
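These two constants double as the keys of the multi-mode output. With this change, KAFKA_TOPIC is expected in `mode:topic` form, optionally comma-separated for several outputs. A sketch of the accepted values (topic names here are invented examples):

```typescript
// Hypothetical KAFKA_TOPIC values under the new mode:name contract.
const NATIVE_TOKEN_MODE = 'native_token_transfers';
const RECEIPTS_MODE = 'receipts';

// One output stream:
const singleMode = `${NATIVE_TOKEN_MODE}:erc20_exporter_test_topic`;
// Two output streams from the same ETH worker run:
const bothModes = `${NATIVE_TOKEN_MODE}:eth_transfers,${RECEIPTS_MODE}:eth_receipts`;

console.log(singleMode, bothModes);
```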
diff --git a/src/blockchains/eth/lib/fees_decoder.ts b/src/blockchains/eth/lib/fees_decoder.ts
index 05dc1885..256d28b0 100644
--- a/src/blockchains/eth/lib/fees_decoder.ts
+++ b/src/blockchains/eth/lib/fees_decoder.ts
@@ -1,6 +1,6 @@
 import assert from 'assert'
 import { Web3Interface, safeCastToNumber } from './web3_wrapper';
-import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceiptsMap } from '../eth_types';
+import { ETHBlock, ETHTransaction, ETHTransfer, ETHReceipt, ETHReceiptsMap } from '../eth_types';

 import { BURN_ADDRESS, LONDON_FORK_BLOCK } from './constants';

@@ -15,7 +15,7 @@ export class FeesDecoder {
     this.web3Wrapper = web3Wrapper;
   }

-  getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: any): ETHTransfer[] {
+  getPreLondonForkFees(transaction: ETHTransaction, block: ETHBlock, receipts: ETHReceiptsMap): ETHTransfer[] {
     const gasExpense = BigInt(this.web3Wrapper.parseHexToNumber(transaction.gasPrice)) *
       BigInt(this.web3Wrapper.parseHexToNumber(receipts[transaction.hash].gasUsed));
     return [{
@@ -94,15 +94,19 @@
     return result;
   }

-  getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceiptsMap, isETH: boolean): ETHTransfer[] {
+  getFeesFromTransactionsInBlock(block: ETHBlock, blockNumber: number, receipts: ETHReceipt[], isETH: boolean): ETHTransfer[] {
     const result: ETHTransfer[] = [];
+    const receiptsMap: ETHReceiptsMap = {};
+    receipts.forEach((receipt: ETHReceipt) => {
+      receiptsMap[receipt.transactionHash] = receipt;
+    });
     block.transactions.forEach((transaction: ETHTransaction | string) => {
       assert(isETHTransaction(transaction), "To get fees, ETH transaction should be expanded and not just the hash.");

       const feeTransfers: ETHTransfer[] = isETH && blockNumber >= LONDON_FORK_BLOCK ?
-        this.getPostLondonForkFees(transaction, block, receipts) :
-        this.getPreLondonForkFees(transaction, block, receipts);
+        this.getPostLondonForkFees(transaction, block, receiptsMap) :
+        this.getPreLondonForkFees(transaction, block, receiptsMap);

       result.push(...feeTransfers);
     });
diff --git a/src/blockchains/eth/lib/fetch_data.ts b/src/blockchains/eth/lib/fetch_data.ts
index d602a17f..5b5ed639 100644
--- a/src/blockchains/eth/lib/fetch_data.ts
+++ b/src/blockchains/eth/lib/fetch_data.ts
@@ -1,6 +1,6 @@
 import { filterErrors } from './filter_errors';
 import { Web3Interface } from './web3_wrapper';
-import { Trace, ETHBlock, ETHReceiptsMap, ETHReceipt } from '../eth_types';
+import { Trace, ETHBlock, ETHReceipt } from '../eth_types';

 import { HTTPClientInterface } from '../../../types'

@@ -15,13 +15,14 @@ export function parseEthInternalTrx(result: Trace[]): Trace[] {
   );
 }

-export function fetchEthInternalTrx(ethClient: HTTPClientInterface,
+export async function fetchEthInternalTrx(ethClient: HTTPClientInterface,
   web3Wrapper: Web3Interface, fromBlock: number, toBlock: number): Promise<Trace[]> {
   const filterParams = {
     fromBlock: web3Wrapper.parseNumberToHex(fromBlock),
     toBlock: web3Wrapper.parseNumberToHex(toBlock)
   };
-  return ethClient.request('trace_filter', [filterParams]).then((data: any) => parseEthInternalTrx(data['result']));
+  const data: any = await ethClient.request('trace_filter', [filterParams]);
+  return parseEthInternalTrx(data['result']);
 }

 export async function fetchBlocks(ethClient: HTTPClientInterface,
@@ -43,7 +44,7 @@ export async function fetchBlocks(ethClient: HTTPClientInterface,
 }

 export async function fetchReceipts(ethClient: HTTPClientInterface,
-  web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise<ETHReceiptsMap> {
+  web3Wrapper: Web3Interface, receiptsAPIMethod: string, fromBlock: number, toBlock: number): Promise<ETHReceipt[]> {
   const batch: any[] = [];
   for (let currBlock = fromBlock; currBlock <= toBlock; currBlock++) {
     batch.push(
@@ -54,12 +55,12 @@
     );
   }
   const finishedRequests = await ethClient.requestBulk(batch);
-  const result: ETHReceiptsMap = {};
+  const result: ETHReceipt[] = [];

   finishedRequests.forEach((response: any) => {
     if (response.result) {
       response.result.forEach((receipt: ETHReceipt) => {
-        result[receipt.transactionHash] = receipt;
+        result.push(receipt);
       });
     }
     else {
diff --git a/src/blockchains/eth/lib/helper_receipts.ts b/src/blockchains/eth/lib/helper_receipts.ts
new file mode 100644
index 00000000..87639f72
--- /dev/null
+++ b/src/blockchains/eth/lib/helper_receipts.ts
@@ -0,0 +1,63 @@
+import { ETHReceipt, ETHReceiptDecoded } from "../eth_types";
+
+const lang = require('lodash/lang');
+const object = require('lodash/object');
+const collection = require('lodash/collection');
+
+import { Web3Interface } from './web3_wrapper';
+
+const decodeLog = (log: any, web3Wrapper: Web3Interface) => {
+  collection.forEach(['blockNumber', 'blockHash', 'transactionHash', 'transactionIndex'],
+    (key: string) => object.unset(log, key));
+
+  collection.forEach(['logIndex', 'transactionLogIndex'],
+    (key: string) => {
+      if (Object.prototype.hasOwnProperty.call(log, key) && log[key] !== undefined) {
+        log[key] = web3Wrapper.parseHexToNumber(log[key]);
+      }
+      else {
+        log[key] = null;
+      }
+    }
+  );
+
+  return log;
+};
+
+const columnizeLogs = (logs: any[], web3Wrapper: Web3Interface) => {
+  if (logs.length === 0) { return []; }
+
+  const decodedLogs = collection.map(logs, (log: any) => decodeLog(log, web3Wrapper));
+  const logKeys = object.keys(decodedLogs[0]);
+  const result: any = {};
+  collection.forEach(logKeys, (key: string) => result[`logs.${key}`] = decodedLogs.map((log: any) => log[key]));
+
+  return result;
+};
+
+export function decodeReceipt(receipt: ETHReceipt, web3Wrapper: Web3Interface): ETHReceiptDecoded {
+  const clonedReceipt = lang.cloneDeep(receipt);
+
+  collection.forEach(['blockNumber', 'status', 'transactionIndex'],
+    (key: string) => {
+      if (Object.prototype.hasOwnProperty.call(clonedReceipt, key) && clonedReceipt[key] !== undefined) {
+        clonedReceipt[key] = web3Wrapper.parseHexToNumber(clonedReceipt[key]);
+      }
+      else {
+        clonedReceipt[key] = null;
+      }
+    }
+  );
+
+  collection.forEach(['cumulativeGasUsed', 'gasUsed'],
+    (key: string) => clonedReceipt[key] = web3Wrapper.parseHexToNumberString(clonedReceipt[key])
+  );
+
+  object.merge(clonedReceipt, columnizeLogs(clonedReceipt['logs'], web3Wrapper));
+  object.unset(clonedReceipt, 'logs');
+
+  return clonedReceipt;
+};
+
+
+
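The `columnizeLogs` step above turns an array of per-log objects into a single object of parallel `logs.<key>` arrays before the `logs` field is dropped. A tiny worked example of that transformation, with made-up log values:

```typescript
// Input: two decoded logs. Output: one object of parallel columns.
const logs = [
  { address: '0xaaa', data: '0x01', logIndex: 0 },
  { address: '0xbbb', data: '0x02', logIndex: 1 }
];

const columnized: { [column: string]: any[] } = {};
for (const key of Object.keys(logs[0])) {
  columnized[`logs.${key}`] = logs.map((log: any) => log[key]);
}
// { 'logs.address': ['0xaaa', '0xbbb'], 'logs.data': ['0x01', '0x02'], 'logs.logIndex': [0, 1] }
console.log(columnized);
```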
diff --git a/src/blockchains/receipts/lib/constants.ts b/src/blockchains/receipts/lib/constants.ts
deleted file mode 100644
index 232c64c9..00000000
--- a/src/blockchains/receipts/lib/constants.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-export const DRY_RUN = parseInt(process.env.DRY_RUN || '1');
-export const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50');
-export const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
-export const START_BLOCK = parseInt(process.env.START_BLOCK || '0');
-export const GET_RECEIPTS_ENDPOINT = process.env.GET_RECEIPTS_ENDPOINT || 'eth_getBlockReceipts';
-export const NODE_URL = process.env.NODE_URL || 'http://localhost:8545/';
-export const TRANSACTION = parseInt(process.env.TRANSACTION || '0');
-export const GET_BLOCK_ENDPOINT = process.env.GET_BLOCK_ENDPOINT || 'eth_getBlockByNumber';
-export const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');
-
diff --git a/src/blockchains/receipts/lib/helper.js b/src/blockchains/receipts/lib/helper.js
deleted file mode 100644
index a0e255e2..00000000
--- a/src/blockchains/receipts/lib/helper.js
+++ /dev/null
@@ -1,100 +0,0 @@
-const lang = require('lodash/lang');
-const array = require('lodash/array');
-const object = require('lodash/object');
-const collection = require('lodash/collection');
-
-const parseReceipts = (responses) => {
-  const receipts = responses.map((response) => response['result']);
-  return array.compact(array.flatten(receipts));
-};
-
-const parseBlocks = (responses) => {
-  return responses.map((response) => response['result']);
-};
-
-const parseTransactionReceipts = (responses) => {
-  const receipts = responses.map((response) => response['result']);
-  return receipts;
-};
-
-const decodeLog = (log, web3Wrapper) => {
-  collection.forEach(['blockNumber', 'blockHash', 'transactionHash', 'transactionIndex'],
-    key => object.unset(log, key));
-
-  collection.forEach(['logIndex', 'transactionLogIndex'],
-    key => {
-      if (Object.prototype.hasOwnProperty.call(log, key) && log[key] !== undefined) {
-        log[key] = web3Wrapper.parseHexToNumber(log[key]);
-      }
-      else {
-        log[key] = null;
-      }
-    }
-  );
-
-  return log;
-};
-
-const columnizeLogs = (logs, web3Wrapper) => {
-  if (logs.length === 0) { return []; }
-
-  const decodedLogs = collection.map(logs, log => decodeLog(log, web3Wrapper));
-  const logKeys = object.keys(decodedLogs[0]);
-  const result = {};
-  collection.forEach(logKeys, key => result[`logs.${key}`] = decodedLogs.map(log => log[key]));
-
-  return result;
-};
-
-const decodeReceipt = (receipt, web3Wrapper) => {
-  const clonedReceipt = lang.cloneDeep(receipt);
-
-  collection.forEach(['blockNumber', 'status', 'transactionIndex'],
-    key => {
-      if (Object.prototype.hasOwnProperty.call(clonedReceipt, key) && clonedReceipt[key] !== undefined) {
-        clonedReceipt[key] = web3Wrapper.parseHexToNumber(clonedReceipt[key]);
-      }
-      else {
-        clonedReceipt[key] = null;
-      }
-    }
-  );
-
-  collection.forEach(['cumulativeGasUsed', 'gasUsed'],
-    key => clonedReceipt[key] = web3Wrapper.parseHexToNumberString(clonedReceipt[key])
-  );
-
-  object.merge(clonedReceipt, columnizeLogs(clonedReceipt['logs'], web3Wrapper));
-  object.unset(clonedReceipt, 'logs');
-
-  return clonedReceipt;
-};
-
-const decodeBlock = (block, web3Wrapper) => {
-  return {
-    timestamp: web3Wrapper.parseHexToNumber(block.timestamp),
-    number: web3Wrapper.parseHexToNumber(block.number)
-  };
-};
-
-const prepareBlockTimestampsObject = (blocks) => {
-  let obj = {};
-  for (const block of blocks) { obj[block.number] = block.timestamp; }
-
-  return obj;
-};
-
-const setReceiptsTimestamp = async (receipts, timestamps) => {
-  return collection.forEach(receipts, receipt => receipt['timestamp'] = timestamps[receipt.blockNumber]);
-};
-
-module.exports = {
-  parseReceipts,
-  parseBlocks,
-  parseTransactionReceipts,
-  decodeLog,
-  decodeReceipt,
-  decodeBlock,
-  prepareBlockTimestampsObject,
-  setReceiptsTimestamp
-};
diff --git a/src/blockchains/receipts/receipts_worker.ts b/src/blockchains/receipts/receipts_worker.ts
deleted file mode 100644
index f21dca8f..00000000
--- a/src/blockchains/receipts/receipts_worker.ts
+++ /dev/null
@@ -1,107 +0,0 @@
-'use strict';
-import helper from './lib/helper';
-import { logger } from '../../lib/logger';
-import { constructRPCClient } from '../../lib/http_client';
-import { BaseWorker } from '../../lib/worker_base';
-import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper';
-import { HTTPClientInterface } from '../../types';
-import { nextIntervalCalculator, analyzeWorkerContext, setWorkerSleepTime, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator';
-
-
-export class ReceiptsWorker extends BaseWorker {
-  private web3Wrapper: Web3Interface;
-  private client: HTTPClientInterface;
-
-  constructor(settings: any) {
-    super(settings);
-
-    logger.info(`Connecting to node ${settings.NODE_URL}`);
-    this.web3Wrapper = constructWeb3Wrapper(settings.NODE_URL, settings.RPC_USERNAME, settings.RPC_PASSWORD);
-    this.client = constructRPCClient(settings.NODE_URL, settings.RPC_USERNAME, settings.RPC_PASSWORD,
-      settings.DEFAULT_TIMEOUT);
-  }
-
-  async init() {
-    this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;
-  }
-
-  async fetchBlockTimestamps(fromBlock: number, toBlock: number) {
-    const batch = [];
-    for (let i = fromBlock; i < toBlock + 1; i++) {
-      batch.push(
-        this.client.generateRequest(
-          this.settings.GET_BLOCK_ENDPOINT,
-          [this.web3Wrapper.parseNumberToHex(i),
-            true]
-        )
-      );
-    }
-
-    return this.client.requestBulk(batch).then((responses) => helper.parseBlocks(responses));
-  }
-
-  async fetchReceiptsFromTransaction(blocks: any[]) {
-    var batch = [];
-    for (let block = 0; block < blocks.length; block++) {
-      var transactions = blocks[block]['transactions'];
-      if (transactions.length === 0) continue;
-      for (let trx = 0; trx < transactions.length; trx++) {
-        var transactionHash = transactions[trx]['hash'];
-        batch.push(
-          this.client.generateRequest(
-            this.settings.GET_RECEIPTS_ENDPOINT,
-            [transactionHash]
-          )
-        );
-      }
-    }
-    return (!batch.length) ? [] : this.client.requestBulk(batch).then((responses) => helper.parseTransactionReceipts(responses));
-  }
-
-  async getReceiptsForBlocks(fromBlock: number, toBlock: number) {
-    logger.info(`Fetching blocks ${fromBlock}:${toBlock}`);
-    const blocks = await this.fetchBlockTimestamps(fromBlock, toBlock);
-    let receipts;
-
-    if (!this.settings.TRANSACTION) {
-      receipts = await this.fetchReceipts(fromBlock, toBlock);
-    }
-    else {
-      receipts = await this.fetchReceiptsFromTransaction(blocks);
-    }
-    const decodedReceipts = receipts.map((receipt: any) => helper.decodeReceipt(receipt, this.web3Wrapper));
-    const decodedBlocks = blocks.map((block: any) => helper.decodeBlock(block, this.web3Wrapper));
-    const timestamps = helper.prepareBlockTimestampsObject(decodedBlocks);
-
-    return helper.setReceiptsTimestamp(decodedReceipts, timestamps);
-  }
-
-  async fetchReceipts(fromBlock: number, toBlock: number) {
-    const batch = [];
-    for (let i = fromBlock; i <= toBlock; i++) {
-      batch.push(
-        this.client.generateRequest(
-          this.settings.GET_RECEIPTS_ENDPOINT,
-          [this.web3Wrapper.parseNumberToHex(i)]
-        )
-      );
-    }
-    return this.client.requestBulk(batch).then((responses) => helper.parseReceipts(responses));
-  }
-
-  async work() {
-    const workerContext = await analyzeWorkerContext(this);
-    setWorkerSleepTime(this, workerContext);
-    if (workerContext === NO_WORK_SLEEP) return [];
-
-    const { fromBlock, toBlock } = nextIntervalCalculator(this);
-    logger.info(`Fetching receipts for interval ${fromBlock}:${toBlock}`);
-    const receipts = await this.getReceiptsForBlocks(fromBlock, toBlock);
-
-    this.lastExportedBlock = toBlock;
-
-    return receipts;
-  }
-}
-
-
diff --git a/src/blockchains/utxo/utxo_worker.ts b/src/blockchains/utxo/utxo_worker.ts
index af1c5039..6d5793c9 100644
--- a/src/blockchains/utxo/utxo_worker.ts
+++ b/src/blockchains/utxo/utxo_worker.ts
@@ -1,8 +1,8 @@
 'use strict';
 import { logger } from '../../lib/logger';
 import { constructRPCClient } from '../../lib/http_client';
-import { BaseWorker } from '../../lib/worker_base';
-import { Exporter } from '../../lib/kafka_storage';
+import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
+import { KafkaStorage } from '../../lib/kafka_storage';
 import { HTTPClientInterface } from '../../types';

@@ -33,10 +33,13 @@
     this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT);
   }

-  async init(exporter: Exporter) {
+  async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
     const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []);
     this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS;
-    await exporter.initPartitioner((event: any) => event['height']);
+    if (!(storage instanceof KafkaStorage)) {
+      throw Error('Single Kafka storage needs to be provided for UTXO exporter')
+    }
+    await storage.initPartitioner((event: any) => event['height']);
   }

   async sendRequest(method: string, params: any) {
@@ -80,7 +83,7 @@
     return await this.sendRequestWithRetry('getblock', [blockHash, 2]);
   }

-  async work() {
+  async work(): Promise<WorkResult> {
     if (this.lastConfirmedBlock === this.lastExportedBlock) {
       this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
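Workers now receive either a single KafkaStorage or a Map of them, and single-output workers such as the UTXO one narrow the union with an `instanceof` guard before touching the partitioner. The pattern in isolation (KafkaStorageLike is a stand-in for the real class, and the guard is inverted to check Map since an interface cannot be used with instanceof):

```typescript
interface KafkaStorageLike {
  initPartitioner(partitionFn: (event: any) => number): Promise<void>;
}

async function initPartitioned(storage: KafkaStorageLike | Map<string, KafkaStorageLike>): Promise<void> {
  if (storage instanceof Map) {
    // Mirrors the `!(storage instanceof KafkaStorage)` check in the diff above.
    throw Error('Single Kafka storage needs to be provided for UTXO exporter');
  }
  await storage.initPartitioner((event: any) => event['height']);
}
```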
diff --git a/src/e2e/producer-transaction.spec.ts b/src/e2e/producer-transaction.spec.ts
index 89b74a37..548583c3 100644
--- a/src/e2e/producer-transaction.spec.ts
+++ b/src/e2e/producer-transaction.spec.ts
@@ -1,4 +1,4 @@
-import { Exporter } from '../lib/kafka_storage';
+import { KafkaStorage } from '../lib/kafka_storage';
 import Kafka from 'node-rdkafka';

 const KAFKA_URL: string = assertStringEnv(process.env.KAFKA_URL);
 const KAFKA_TOPIC: string = assertStringEnv(process.env.KAFKA_TOPIC);
@@ -81,26 +81,26 @@ class TestConsumer {

 describe('Producer transactions', function () {
-  let exporter: Exporter;
+  let kafkaStorage: KafkaStorage;
   let testConsumer: TestConsumer;
   let num_messages_test = 3;

   beforeEach(function (done) {
     this.timeout(20000);
-    exporter = new Exporter('test-exporter', true, KAFKA_TOPIC);
-    exporter.connect().then(() => {
+    kafkaStorage = new KafkaStorage('test-exporter', true, KAFKA_TOPIC);
+    kafkaStorage.connect().then(() => {
       testConsumer = new TestConsumer(KAFKA_TOPIC, num_messages_test);
       done();
     });
   });

-  afterEach(function (done) {
+  afterEach(async function (done) {
     this.timeout(10000);
-    exporter.disconnect(() => {
-      testConsumer.disconnect(function () {
-        done();
-      });
+    await kafkaStorage.disconnect()
+
+    testConsumer.disconnect(function () {
+      done();
     });
   });

@@ -109,21 +109,21 @@
     await testConsumer.waitSubscribed();

-    await exporter.initTransactions();
-    await exporter.beginTransaction();
+    await kafkaStorage.initTransactions();
+    await kafkaStorage.beginTransaction();

     // Do a small delay before starting writing messages, otherwise the consumer is missing them.
     // This should not really be needed, because we have received the 'subscribed' event in the
     // consumer but there is something I am missing.
     setTimeout(async function () {
       for (let i = 0; i < num_messages_test; i++) {
-        exporter.sendDataWithKey({
+        kafkaStorage.sendDataWithKey({
           timestamp: 10000000,
           iso_date: new Date().toISOString(),
           key: 1
         }, 'key', null);
       }
-      await exporter.commitTransaction();
+      await kafkaStorage.commitTransaction();
     }, 2000);

     await testConsumer.waitData();
@@ -132,7 +132,7 @@
   it('using the \'storeEvents\' function should begin and commit a transaction', async function () {
     // We need the huge timeout because starting and closing a transaction takes around 1 sec
     this.timeout(10000);
-    await exporter.initTransactions();
+    await kafkaStorage.initTransactions();

     const testEvent = {
       'contract': '0xdac17f958d2ee523a2206206994597c13d831ec7',
@@ -151,7 +151,7 @@
     setTimeout(async function () {
       for (let i = 0; i < num_messages_test; i++) {
-        await exporter.storeEvents([testEvent], false);
+        await kafkaStorage.storeEvents([testEvent], false);
       }
     }, 1000);
diff --git a/src/index.ts b/src/index.ts
index 6f2d1910..4d218041 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,6 +1,6 @@
 'use strict';
 import { logger } from './lib/logger';
-import { BLOCKCHAIN } from './lib/constants';
+import * as constantsBase from './lib/constants';
 import { getBoolEnvVariable } from './lib/utils';
 import { Main } from './main'

@@ -11,10 +11,7 @@ export async function main() {
   mainInstance = new Main();

   try {
-    if (BLOCKCHAIN === undefined) {
-      throw Error("'BLOCKCHAIN' variable need to be defined")
-    }
-    await mainInstance.init(BLOCKCHAIN);
+    await mainInstance.init(constantsBase);
   } catch (err: any) {
     logger.error(err.stack);
     throw new Error(`Error initializing exporter: ${err.message}`);
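index.ts no longer resolves BLOCKCHAIN itself; it hands the whole base constants module to Main.init, which validates it. Roughly, assuming a hypothetical Main stub:

```typescript
// Sketch of the inverted flow; MainSketch stands in for the real Main class.
const constantsBase = { BLOCKCHAIN: process.env.BLOCKCHAIN, KAFKA_TOPIC: process.env.KAFKA_TOPIC };

class MainSketch {
  async init(constants: { BLOCKCHAIN?: string }): Promise<void> {
    if (constants.BLOCKCHAIN === undefined) {
      throw Error("'BLOCKCHAIN' variable need to be defined");
    }
    // ...merge blockchain-specific constants and construct the worker
  }
}

new MainSketch().init(constantsBase).catch((err) => console.error(err.message));
```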
diff --git a/src/lib/kafka_storage.ts b/src/lib/kafka_storage.ts
index ed6dd44f..5a97d1f1 100644
--- a/src/lib/kafka_storage.ts
+++ b/src/lib/kafka_storage.ts
@@ -1,14 +1,10 @@
 import crypto from 'crypto';
-import Kafka, { LibrdKafkaError, ProducerGlobalConfig } from 'node-rdkafka';
+import Kafka, { ProducerGlobalConfig } from 'node-rdkafka';
 import { BLOCKCHAIN } from './constants';
-import ZookeeperClientAsync from './zookeeper_client_async';
 import { log_according_to_syslog_level, logger, SYSLOG_LOG_LEVEL } from './logger';

 const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181';
-const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0');
-const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000');
-const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000');

 const FORMAT_HEADER: string = 'format=json;';
 const RDKAFKA_DEBUG: string | null = process.env.RDKAFKA_DEBUG || null;
@@ -19,17 +15,6 @@ const BUFFERING_MAX_MESSAGES: number = parseInt(process.env.BUFFERING_MAX_MESSAG
 const TRANSACTIONS_TIMEOUT_MS: number = parseInt(process.env.TRANSACTIONS_TIMEOUT_MS || '60000');
 const KAFKA_MESSAGE_MAX_BYTES: number = parseInt(process.env.KAFKA_MESSAGE_MAX_BYTES || '10485760');

-process.on('unhandledRejection', (reason: unknown, p: Promise<any>): void => {
-  // Otherwise unhandled promises are not possible to trace with the information logged
-  if (reason instanceof Error) {
-    logger.error('Unhandled Rejection at: ', p, 'reason:', reason, 'error stack:', (reason as Error).stack);
-  }
-  else {
-    logger.error('Unhandled Rejection at: ', p, 'reason:', reason);
-  }
-  process.exit(1);
-});
-
 /**
  * A class to pick partition for an event.
 */
@@ -99,11 +84,10 @@ function castCompression(compression: string): 'none' | 'gzip' | 'snappy' | 'lz4
   throw new Error(`Invalid compression value: ${compression}`);
 }

-export class Exporter {
+export class KafkaStorage {
   private readonly exporter_name: string;
   private readonly producer: Kafka.Producer;
   private readonly topicName: string;
-  private readonly zookeeperClient: ZookeeperClientAsync;
   private partitioner: Partitioner | null;

   constructor(exporter_name: string, transactional: boolean, topicName: string) {
@@ -139,14 +123,6 @@
       log_according_to_syslog_level(log.severity, log.fac, log.message);
     });

-    this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL,
-      {
-        sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT,
-        spinDelay: ZOOKEEPER_SPIN_DELAY,
-        retries: ZOOKEEPER_RETRIES
-      }
-    );
-
     this.partitioner = null;
   }

@@ -162,20 +138,13 @@
   /**
   * @returns {Promise} Promise, resolved on connection completed.
   */
-  async connect() {
+  async connect(): Promise<void> {
     logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`);
-    try {
-      await this.zookeeperClient.connectAsync();
-    }
-    catch (ex) {
-      console.error('Error connecting to Zookeeper: ', ex);
-      throw ex;
-    }

     logger.info(`Connecting to kafka host ${KAFKA_URL}`);
-    const promise_result = new Promise((resolve, reject) => {
-      this.producer.on('ready', resolve);
+    const promise_result = new Promise<void>((resolve, reject) => {
+      this.producer.on('ready', () => resolve());
       this.producer.on('event.error', reject);
       // The user can provide a callback for delivery reports with the
       // dedicated method 'subscribeDeliveryReports'.
@@ -190,131 +159,23 @@
   }

   /**
-   * Disconnect from Zookeeper and Kafka.
+   * Disconnect from Kafka.
    * This method is completed once the callback is invoked.
   */
-  disconnect(callback?: () => void) {
-    logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`);
-    this.zookeeperClient.closeAsync().then(() => {
-      if (this.producer.isConnected()) {
-        logger.info(`Disconnecting from kafka host ${KAFKA_URL}`);
-        this.producer.disconnect(callback);
-      }
-      else {
-        logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`);
-      }
-    });
-  }
-
-  async getLastPosition() {
-    if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
-      const previousPosition = await this.zookeeperClient.getDataAsync(
-        this.zookeeperPositionNode
-      );
-
-      try {
-        if (Buffer.isBuffer(previousPosition && previousPosition.data)) {
-          const value = previousPosition.data.toString('utf8');
-
-          if (value.startsWith(FORMAT_HEADER)) {
-            return JSON.parse(value.replace(FORMAT_HEADER, ''));
-          } else {
-            return previousPosition.data;
-          }
-        }
-      } catch (err) {
-        logger.error(err);
-      }
-    }
-
-    return null;
-  }
-
-  async getLastBlockTimestamp() {
-    if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) {
-      const previousPosition = await this.zookeeperClient.getDataAsync(
-        this.zookeeperTimestampNode
-      );
-
-      try {
-        if (Buffer.isBuffer(previousPosition && previousPosition.data)) {
-          const value = previousPosition.data.toString('utf8');
-
-          if (value.startsWith(FORMAT_HEADER)) {
-            return JSON.parse(value.replace(FORMAT_HEADER, ''));
-          } else {
-            return previousPosition.data;
-          }
-        }
-      } catch (err) {
-        logger.error(err);
-      }
+  async disconnect(): Promise<void> {
+    if (!this.producer.isConnected()) {
+      logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`);
+      return;
     }

-    return null;
-  }
-
-  async savePosition(position: object) {
-    if (typeof position !== 'undefined') {
-      const newNodeValue = Buffer.from(
-        FORMAT_HEADER + JSON.stringify(position),
-        'utf-8'
-      );
-
-      if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
-        return this.zookeeperClient.setDataAsync(
-          this.zookeeperPositionNode,
-          newNodeValue
-        );
-      } else {
-        return this.zookeeperClient.mkdirpAsync(
-          this.zookeeperPositionNode,
-          newNodeValue
-        );
-      }
-    }
-  }
+    logger.info(`Disconnecting from kafka host ${KAFKA_URL}`);
+    const promise_result = new Promise<void>((resolve, reject) => {
+      this.producer.disconnect(() => resolve());
+    })

-  async saveLastBlockTimestamp(blockTimestamp: number) {
-    if (typeof blockTimestamp !== 'undefined') {
-      const newNodeValue = Buffer.from(
-        FORMAT_HEADER + JSON.stringify(blockTimestamp),
-        'utf-8'
-      );
-
-      if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) {
-        return this.zookeeperClient.setDataAsync(
-          this.zookeeperTimestampNode,
-          newNodeValue
-        );
-      } else {
-        return this.zookeeperClient.mkdirpAsync(
-          this.zookeeperTimestampNode,
-          newNodeValue
-        );
-      }
-    }
+    await promise_result;
   }

-  async sendData(events: Array<any>) {
-    if (events.constructor !== Array) {
-      events = [events];
-    }
-
-    events = events.map(
-      event => (typeof event === 'object' ? JSON.stringify(event) : event)
-    );
-    events.forEach(event => {
-      this.producer.produce(this.topicName, null, Buffer.from(event));
-    });
-
-    return new Promise<void>((resolve, reject) =>
-      this.producer.flush(KAFKA_FLUSH_TIMEOUT, (err: LibrdKafkaError) => {
-        if (err) return reject(err);
-        resolve();
-      })
-    );
-  }

   async sendDataWithKey(events: object | Array<any>, keyField: string, signalRecordData: object | null) {
     const arrayEvents: Array<any> = (events.constructor !== Array) ? [events] : events
diff --git a/src/lib/utils.ts b/src/lib/utils.ts
index 779762ef..8474fc5a 100644
--- a/src/lib/utils.ts
+++ b/src/lib/utils.ts
@@ -36,3 +36,28 @@ export function assertIsDefined(value: any, errorMsg: string): asserts value
     throw Error(errorMsg);
   }
 }
+
+export type ModeToKafkaTopic = {
+  [topicName: string]: string;
+}
+
+export function parseKafkaTopicToObject(kafkaTopic: string): ModeToKafkaTopic {
+  const keyValuePairs = kafkaTopic.split(',');
+
+  const result: ModeToKafkaTopic = keyValuePairs.reduce((acc, pair) => {
+    const [key, value] = pair.split(':');
+    if (key && value) {
+      acc[key.trim()] = value;
+    }
+    else {
+      throw new Error(`key-value pair format is unexpected in KAFKA_TOPIC`);
+    }
+    return acc;
+  }, {} as ModeToKafkaTopic);
+
+  if (Object.keys(result).length < 1) {
+    throw new Error(`Can not construct multi mode from ${kafkaTopic}`)
+  }
+
+  return result;
+}
diff --git a/src/lib/worker_base.ts b/src/lib/worker_base.ts
index 28f76758..2e684bed 100644
--- a/src/lib/worker_base.ts
+++ b/src/lib/worker_base.ts
@@ -1,8 +1,11 @@
 'use strict';
 import { logger } from './logger';
-import { Exporter } from './kafka_storage';
+import { KafkaStorage } from './kafka_storage';
 import { ExporterPosition } from '../types'

+export type WorkResult = any[]
+export type WorkResultMultiMode = { [key: string]: WorkResult; }
+
 export class BaseWorker {
   public lastExportTime: number;
   public lastConfirmedBlock: number;
@@ -30,11 +33,11 @@
    * Upon returning from the method call the implementation should have updated all the member variables of the
    * base class.
    */
-  work(): Promise<Array<any>> {
+  work(): Promise<WorkResult | WorkResultMultiMode> {
     throw new Error('"work" method need to be overriden');
   }

   // To be implemented on inheritance.
-  async init(_exporter: Exporter) {
+  async init(_storage: KafkaStorage | Map<string, KafkaStorage>) {
     throw new Error('"init" method need to be overriden');
   }
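`WorkResult`/`WorkResultMultiMode` and `parseKafkaTopicToObject` are designed to share mode keys: the parsed KAFKA_TOPIC decides which producers exist, and a multi-mode work() result is routed by the same keys. A self-contained sketch of that composition (topic names invented, parsing rules mirror the helper above):

```typescript
type WorkResult = any[];
type WorkResultMultiMode = { [mode: string]: WorkResult };

// Same parsing rules as parseKafkaTopicToObject above.
function parseTopics(kafkaTopic: string): { [mode: string]: string } {
  const result: { [mode: string]: string } = {};
  for (const pair of kafkaTopic.split(',')) {
    const [mode, topic] = pair.split(':');
    if (!mode || !topic) throw new Error('key-value pair format is unexpected in KAFKA_TOPIC');
    result[mode.trim()] = topic;
  }
  return result;
}

const topics = parseTopics('native_token_transfers:transfers_topic,receipts:receipts_topic');
const workResult: WorkResultMultiMode = { native_token_transfers: [], receipts: [] };
for (const mode of Object.keys(workResult)) {
  console.log(`route ${workResult[mode].length} events of mode ${mode} to topic ${topics[mode]}`);
}
```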
diff --git a/src/lib/zookeeper_state.ts b/src/lib/zookeeper_state.ts
new file mode 100644
index 00000000..485e1957
--- /dev/null
+++ b/src/lib/zookeeper_state.ts
@@ -0,0 +1,110 @@
+import ZookeeperClientAsync from './zookeeper_client_async';
+import { logger } from './logger';
+
+
+const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181';
+const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0');
+const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000');
+const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000');
+
+const FORMAT_HEADER: string = 'format=json;';
+
+
+export class ZookeeperState {
+  private readonly exporter_name: string;
+  private readonly topicName: string;
+  private readonly zookeeperClient: ZookeeperClientAsync;
+
+
+  constructor(exporter_name: string, topicName: string) {
+    this.exporter_name = exporter_name;
+
+    this.topicName = topicName;
+
+    this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL,
+      {
+        sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT,
+        spinDelay: ZOOKEEPER_SPIN_DELAY,
+        retries: ZOOKEEPER_RETRIES
+      }
+    );
+  }
+
+  get zookeeperPositionNode() {
+    // Generally it may be an arbitrary position object, not necessarily block number. We keep this name for backward compatibility
+    return `/${this.exporter_name}/${this.topicName}/block-number`;
+  }
+
+  get zookeeperTimestampNode() {
+    return `/${this.exporter_name}/${this.topicName}/timestamp`;
+  }
+
+  /**
+   * @returns {Promise} Promise, resolved on connection completed.
+   */
+  async connect(): Promise<void> {
+    logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`);
+
+    try {
+      await this.zookeeperClient.connectAsync();
+    }
+    catch (ex) {
+      console.error('Error connecting to Zookeeper: ', ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * Disconnect from Zookeeper.
+   */
+  async disconnect() {
+    logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`);
+    await this.zookeeperClient.closeAsync();
+  }
+
+  async getLastPosition() {
+    if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
+      const previousPosition = await this.zookeeperClient.getDataAsync(
+        this.zookeeperPositionNode
+      );
+
+      try {
+        if (Buffer.isBuffer(previousPosition && previousPosition.data)) {
+          const value = previousPosition.data.toString('utf8');
+
+          if (value.startsWith(FORMAT_HEADER)) {
+            return JSON.parse(value.replace(FORMAT_HEADER, ''));
+          } else {
+            return previousPosition.data;
+          }
+        }
+      } catch (err) {
+        logger.error(err);
+      }
+    }
+
+    return null;
+  }
+
+  async savePosition(position: object) {
+    if (typeof position !== 'undefined') {
+      const newNodeValue = Buffer.from(
+        FORMAT_HEADER + JSON.stringify(position),
+        'utf-8'
+      );
+
+      if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
+        return this.zookeeperClient.setDataAsync(
+          this.zookeeperPositionNode,
+          newNodeValue
+        );
+      } else {
+        return this.zookeeperClient.mkdirpAsync(
+          this.zookeeperPositionNode,
+          newNodeValue
+        );
+      }
+    }
+  }
+}
+
diff --git a/src/main.ts b/src/main.ts
index 76478982..f13f6d7a 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -4,20 +4,23 @@ import { Server, IncomingMessage, ServerResponse } from 'http'
 const { send, serve } = require('micro');
 const metrics = require('./lib/metrics');
 import { logger } from './lib/logger';
-import { Exporter } from './lib/kafka_storage';
+import { KafkaStorage } from './lib/kafka_storage';
+import { ZookeeperState } from './lib/zookeeper_state';
 const EXPORTER_NAME = process.env.EXPORTER_NAME || 'san-chain-exporter';
 import { EXPORT_TIMEOUT_MLS } from './lib/constants';
 import { constructWorker } from './blockchains/construct_worker'
-import * as constantsBase from './lib/constants';
 import { ExporterPosition } from './types'
-import { BaseWorker } from './lib/worker_base';
+import { BaseWorker, WorkResult, WorkResultMultiMode } from './lib/worker_base';
+import { parseKafkaTopicToObject } from './lib/utils';

 export class Main {
   private worker!: BaseWorker;
   private shouldWork: boolean;
-  private exporter!: Exporter;
+  private kafkaStorage!: KafkaStorage | Map<string, KafkaStorage>;
+  private zookeeperState!: ZookeeperState;
   private lastProcessedPosition!: ExporterPosition;
   private microServer: Server;
+  private mergedConstants: any;

   constructor() {
     this.shouldWork = true;
@@ -28,18 +31,35 @@
   }

   async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) {
-    const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: ';
-    this.exporter = new Exporter(exporterName, isTransactions, kafkaTopic);
-    await this.exporter
-      .connect()
-      .then(() => this.exporter.initTransactions())
-      .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
+    if (!kafkaTopic.includes(':')) {
+      logger.info(`Constructing single Kafka producer`)
+      this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic);
+      this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic);
+    }
+    else {
+      const mapping = parseKafkaTopicToObject(kafkaTopic)
+
+      this.kafkaStorage = Object.entries(mapping).reduce((acc, [mode, topicName]) => {
+        acc.set(mode, new KafkaStorage(exporterName, isTransactions, topicName));
+        return acc;
+      }, new Map<string, KafkaStorage>());
+
+      logger.info(`Constructed ${this.kafkaStorage.size} Kafka producers`)
+      this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic);
+    }
+
+    const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage]
+    if (kafkaStoragesArray.length === 0) {
+      throw new Error("At least one KafkaStorage needs to be constructed")
+    }
+    await Promise.all(kafkaStoragesArray.map(storage => storage.connect().then(() => storage.initTransactions())))
+    await this.zookeeperState.connect();
   }

   async handleInitPosition() {
-    const lastRecoveredPosition = await this.exporter.getLastPosition();
+    const lastRecoveredPosition = await this.zookeeperState.getLastPosition();
     this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition);
-    await this.exporter.savePosition(this.lastProcessedPosition);
+    await this.zookeeperState.savePosition(this.lastProcessedPosition);
   }

   #isWorkerSet() {
@@ -59,19 +79,22 @@
     return copy;
   }

-  async initWorker(blockchain: string, mergedConstants: any) {
+  private async initWorker() {
     this.#isWorkerSet();
-    logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(mergedConstants))}`);
-    this.worker = constructWorker(blockchain, mergedConstants);
-    await this.worker.init(this.exporter);
+    logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(this.mergedConstants))}`);
+    this.worker = constructWorker(this.mergedConstants.BLOCKCHAIN, this.mergedConstants);
+    await this.worker.init(this.kafkaStorage);
     await this.handleInitPosition();
   }

-  async init(blockchain: string) {
-    const blockchainSpecificConstants = require(`./blockchains/${blockchain}/lib/constants`);
-    const mergedConstants = { ...constantsBase, ...blockchainSpecificConstants };
-    await this.initExporter(EXPORTER_NAME, true, mergedConstants.KAFKA_TOPIC);
-    await this.initWorker(blockchain, mergedConstants);
+  async init(constantsBase: any) {
+    if (constantsBase.BLOCKCHAIN === undefined) {
+      throw Error("'BLOCKCHAIN' variable need to be defined")
+    }
+    const blockchainSpecificConstants = require(`./blockchains/${constantsBase.BLOCKCHAIN}/lib/constants`);
+    this.mergedConstants = { ...constantsBase, ...blockchainSpecificConstants };
+    await this.initExporter(EXPORTER_NAME, true, this.mergedConstants.KAFKA_TOPIC);
+    await this.initWorker();

     metrics.startCollection();
     this.microServer.on('error', (err) => {
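The reduce above fans the topic mapping out into one producer per mode, and every producer is connected and transaction-initialized in parallel before the worker starts. The core of that fan-out as a standalone sketch, with a FakeStorage stand-in for KafkaStorage:

```typescript
class FakeStorage {
  constructor(readonly exporterName: string, readonly topic: string) { }
  async connect(): Promise<void> { /* connect the underlying producer */ }
  async initTransactions(): Promise<void> { /* enable transactional writes */ }
}

async function buildAndConnect(mapping: { [mode: string]: string }): Promise<Map<string, FakeStorage>> {
  const storages = Object.entries(mapping).reduce((acc, [mode, topicName]) => {
    acc.set(mode, new FakeStorage('san-chain-exporter', topicName));
    return acc;
  }, new Map<string, FakeStorage>());

  // Connect every producer before any worker starts.
  await Promise.all(Array.from(storages.values())
    .map((storage) => storage.connect().then(() => storage.initTransactions())));
  return storages;
}

buildAndConnect({ native_token_transfers: 'topic_a', receipts: 'topic_b' })
  .then((storages) => console.log(`connected ${storages.size} producers`));
```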
@@ -94,20 +117,47 @@
     metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
   }

+  async writeDataToKafka(workResult: WorkResult | WorkResultMultiMode) {
+    if (Array.isArray(workResult)) {
+      if (!(this.kafkaStorage instanceof KafkaStorage)) {
+        throw new Error('Worker returns data for single Kafka storage and multiple are defined')
+      }
+
+      if (workResult.length > 0) {
+        await this.kafkaStorage.storeEvents(workResult, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA);
+      }
+    }
+    else if (typeof workResult === 'object') {
+      if (!(this.kafkaStorage instanceof Map)) {
+        throw new Error('Worker returns data for multiple Kafka storages and single is defined')
+      }
+      for (const [mode, data] of Object.entries(workResult)) {
+        const kafkaStoragePerMode = this.kafkaStorage.get(mode)
+        if (!kafkaStoragePerMode) {
+          throw Error(`Workers returns data for mode ${mode} and no worker is defined for this mode`)
+        }
+
+        await kafkaStoragePerMode.storeEvents(data, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA);
+      }
+    }
+    else {
+      throw new Error('Worker returns unexpected data type')
+    }
+  }
+
   async workLoop() {
     while (this.shouldWork) {
       this.worker.lastRequestStartTime = Date.now();
-      const events = await this.worker.work();
+      const workResult: WorkResult | WorkResultMultiMode = await this.worker.work();

       this.worker.lastExportTime = Date.now();

       this.updateMetrics();
       this.lastProcessedPosition = this.worker.getLastProcessedPosition();

-      if (events && events.length > 0) {
-        await this.exporter.storeEvents(events, constantsBase.WRITE_SIGNAL_RECORDS_KAFKA);
-      }
-      await this.exporter.savePosition(this.lastProcessedPosition);
+      await this.writeDataToKafka(workResult);
+
+      await this.zookeeperState.savePosition(this.lastProcessedPosition);
       logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);

       if (this.shouldWork) {
@@ -117,11 +167,16 @@
   }

   async disconnect() {
-    // This call should be refactored to work with async/await
-    if (this.exporter !== undefined) {
-      this.exporter.disconnect();
+    if (this.kafkaStorage instanceof KafkaStorage) {
+      await this.kafkaStorage.disconnect();
+    }
+    else if (this.kafkaStorage instanceof Map) {
+      await Promise.all(Array.from(this.kafkaStorage.values()).map(storage => storage.disconnect()));
     }
-    if (this.microServer !== undefined) {
+    if (this.zookeeperState) {
+      await this.zookeeperState.disconnect();
+    }
+    if (this.microServer) {
       this.microServer.close();
     }
   }
@@ -137,29 +192,32 @@
     }
   }

-  healthcheckKafka(): Promise<void> {
-    if (this.exporter.isConnected()) {
-      return Promise.resolve();
-    } else {
-      return Promise.reject('Kafka client is not connected to any brokers');
+  healthcheckKafka(): boolean {
+    if (this.kafkaStorage instanceof KafkaStorage) {
+      return this.kafkaStorage.isConnected();
+    }
+    else if (this.kafkaStorage instanceof Map) {
+      return Array.from(this.kafkaStorage.values()).every(storage => storage.isConnected());
+    }
+    else {
+      return false;
     }
   }

-  healthcheckExportTimeout(): Promise<void> {
+  healthcheckExportTimeout(): boolean {
     const timeFromLastExport = Date.now() - this.worker.lastExportTime;
     const isExportTimeoutExceeded = timeFromLastExport > EXPORT_TIMEOUT_MLS;
     if (isExportTimeoutExceeded) {
-      const errorMessage = `Time from the last export ${timeFromLastExport}ms exceeded limit ` +
-        `${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`;
-      return Promise.reject(errorMessage);
+      logger.warn(`Time from the last export ${timeFromLastExport}ms exceeded limit ` +
+        `${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`);
+      return false;
     } else {
-      return Promise.resolve();
+      return true;
     }
   }

-  healthcheck(): Promise<void> {
-    return this.healthcheckKafka()
-      .then(() => this.healthcheckExportTimeout());
+  healthcheck(): boolean {
+    return this.healthcheckKafka() && this.healthcheckExportTimeout();
   }
 }

@@ -178,12 +236,13 @@ const microHandler = async (request: IncomingMessage, response: ServerResponse,

   switch (req.pathname) {
     case '/healthcheck':
-      return mainInstance.healthcheck()
-        .then(() => send(response, 200, 'ok'))
-        .catch((err: any) => {
-          logger.error(`Healthcheck failed: ${err.toString()}`);
-          send(response, 500, err.toString());
-        });
+      if (mainInstance.healthcheck()) {
+        return send(response, 200, 'ok');
+      }
+      else {
+        logger.error('Healthcheck failed');
+        return send(response, 500, "Healthcheck failed");
+      }
     case '/metrics':
       response.setHeader('Content-Type', metrics.register.contentType);
       return send(response, 200, await metrics.register.metrics());
diff --git a/src/test/erc20/worker.spec.ts b/src/test/erc20/worker.spec.ts
index d414e29a..06864fa7 100644
--- a/src/test/erc20/worker.spec.ts
+++ b/src/test/erc20/worker.spec.ts
@@ -7,6 +7,7 @@ import { ContractOverwrite } from '../../blockchains/erc20/lib/contract_overwrite';
 import helpers from './helpers';
 import { ERC20Transfer } from '../../blockchains/erc20/erc20_types';
 import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper';
+import { KafkaStorage } from '../../lib/kafka_storage';


@@ -79,7 +80,7 @@
     sinon.stub(worker, 'ethClient').value(new MockEthClient())
     sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);

-    await worker.init(undefined);
+    await worker.init(sinon.createStubInstance(KafkaStorage));
     worker.lastConfirmedBlock = 1;
     worker.lastExportedBlock = 0;

@@ -98,7 +99,7 @@
     sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1))
     sinon.stub(worker, 'ethClient').value(new MockEthClient())
     sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);
-    await worker.init(undefined);
+    await worker.init(sinon.createStubInstance(KafkaStorage));

     sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
       {
@@ -135,7 +136,7 @@
     sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1))
     sinon.stub(worker, 'ethClient').value(new MockEthClient())
     sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);
-    await worker.init(undefined);
+    await worker.init(sinon.createStubInstance(KafkaStorage));

     sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
       {
@@ -174,7 +175,7 @@
     sinon.stub(worker, 'ethClient').value(new MockEthClient())
     sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent, originalEvent2]);

-    await worker.init(undefined);
+    await worker.init(sinon.createStubInstance(KafkaStorage));

     sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
       {
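Since init() now rejects anything that is not a KafkaStorage instance, the ERC20 tests swap `undefined` for a sinon stubbed instance, which passes the `instanceof` check while keeping initPartitioner inert. The wiring in isolation (the class body is a stand-in for the real KafkaStorage):

```typescript
import sinon from 'sinon';

class KafkaStorage {
  async initPartitioner(_partitionFn: (event: any) => number): Promise<void> { }
}

const storageStub = sinon.createStubInstance(KafkaStorage);
// createStubInstance builds on KafkaStorage.prototype, so the guard passes:
console.log(storageStub instanceof KafkaStorage); // true
// ...and the stubbed method records calls without touching Kafka:
storageStub.initPartitioner.resolves();
```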
test', function () { const web3Wrapper: Web3Interface = constructWeb3WrapperNoCredentials(constants.NODE_URL); const feesDecoder = new FeesDecoder(web3Wrapper); @@ -172,7 +163,7 @@ describe('Fees decoder test', function () { it('test fees post London zero priority', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_zero_priority, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_zero_priority.number)), - turnReceiptsToMap(receipts_json_post_london_no_priority), true); + receipts_json_post_london_no_priority, true); const expected = [{ from: '0xea674fdde714fd979de3edf0f56aa9716b898ec8', @@ -192,7 +183,7 @@ describe('Fees decoder test', function () { it('test fees post London with priority', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_with_priority, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_with_priority.number)), - turnReceiptsToMap(receipts_json_post_london_with_priority), true); + receipts_json_post_london_with_priority, true); const expected = [{ blockNumber: 13447057, @@ -222,7 +213,7 @@ describe('Fees decoder test', function () { it('test old type fees post London', async function () { const postLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_post_london_old_tx_type, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_post_london_old_tx_type.number)), - turnReceiptsToMap(receipts_json_post_london_old_tx_type), true); + receipts_json_post_london_old_tx_type, true); const expected = [{ blockNumber: 13318440, @@ -252,7 +243,7 @@ describe('Fees decoder test', function () { it('test fees pre London', async function () { const preLondonFees = feesDecoder.getFeesFromTransactionsInBlock(block_json_pre_london, safeCastToNumber(web3Wrapper.parseHexToNumber(block_json_pre_london.number)), - turnReceiptsToMap(receipts_json_pre_london), true); + receipts_json_pre_london, true); const expected = [{ from: '0x39fa8c5f2793459d6622857e7d9fbb4bd91766d3', diff --git a/src/test/eth/fetch_events.spec.ts b/src/test/eth/fetch_events.spec.ts index cf78082a..781db6a5 100644 --- a/src/test/eth/fetch_events.spec.ts +++ b/src/test/eth/fetch_events.spec.ts @@ -3,7 +3,7 @@ import { ETHWorker } from '../../blockchains/eth/eth_worker'; import * as constants from '../../blockchains/eth/lib/constants'; import { injectDAOHackTransfers, DAO_HACK_ADDRESSES, DAO_HACK_FORK_BLOCK } from '../../blockchains/eth/lib/dao_hack'; import { Web3Interface, constructWeb3WrapperNoCredentials } from '../../blockchains/eth/lib/web3_wrapper'; -import { ETHBlock, ETHReceiptsMap, ETHTransfer } from '../../blockchains/eth/eth_types'; +import { ETHBlock, ETHReceipt, ETHTransfer } from '../../blockchains/eth/eth_types'; describe('fetch past events', function () { const transaction = { @@ -45,8 +45,7 @@ describe('fetch past events', function () { transactions: [transaction], }); - const receipts: ETHReceiptsMap = { - '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d': + const receipts: ETHReceipt[] = [ { 'blockHash': '0x22854625d4c18b3034461851a6fb181209e77a242adbd923989e7113a60fec56', 'blockNumber': '0x572559', @@ -80,7 +79,7 @@ describe('fetch past events', function () { 'transactionHash': '0x1a06a3a86d2897741f3ddd774df060a63d626b01197c62015f404e1f007fa04d', 'transactionIndex': '0x0' } - }; + ]; const trace = { 'action': { 'callType': 'call', diff --git a/src/test/eth/worker.spec.ts b/src/test/eth/worker.spec.ts index 
e47cb1c2..e9fc07c7 100644 --- a/src/test/eth/worker.spec.ts +++ b/src/test/eth/worker.spec.ts @@ -1,22 +1,29 @@ -import assert from 'assert'; +const sinon = require('sinon');  import v8 from 'v8';  import { extendEventsWithPrimaryKey, ETHWorker } from '../../blockchains/eth/eth_worker';  import { EOB } from '../../blockchains/eth/lib/end_of_block';  import * as constants from '../../blockchains/eth/lib/constants'; -import { Trace, ETHBlock, ETHTransfer, ETHReceiptsMap } from '../../blockchains/eth/eth_types'; +import { ETHBlock, ETHTransfer } from '../../blockchains/eth/eth_types';  import { expect } from 'earl' +import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper'; +  describe('Test worker', function () {   let feeResult: ETHTransfer;   let callResult: ETHTransfer;   let endOfBlock: EOB;   let eobWithPrimaryKey: EOB & { primaryKey: number }; -  let worker = new ETHWorker(constants); +  // This will construct the worker in the 'native token' mode. The spread comes first so the explicit KAFKA_TOPIC below wins over any value in constants. +  const mergedConstants = { +    ...constants, +    KAFKA_TOPIC: `${constants.NATIVE_TOKEN_MODE}:topic_name_not_used` +  }; +  let worker = new ETHWorker(mergedConstants);   let blockInfos = new Map()   let feeResultWithPrimaryKey: ETHTransfer;   let callResultWithPrimaryKey: ETHTransfer;  -  beforeEach(function () { +  beforeEach(async function () {     feeResult = {       from: '0x03b16ab6e23bdbeeab719d8e4c49d63674876253',       to: '0x829bd824b016326a401d083b33d092293333a830', @@ -52,20 +59,23 @@ describe('Test worker', function () {     blockInfos.set(5711191, ethBlockEvent(5711191))     blockInfos.set(5711192, ethBlockEvent(5711192))     blockInfos.set(5711193, ethBlockEvent(5711193)) + +    sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1)) + +    await worker.init();   });  -  it('test primary key assignment', async function () { +  it('test primary key assignment 1', async function () {     let events = [feeResult, callResult]     extendEventsWithPrimaryKey(events, 0)  -    expect(events).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey]);      // Overwrite variables and methods that the 'work' method would use internally. 
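// lastExportedBlock (5711192) sits one block behind lastConfirmedBlock (5711193), so this 'work' call covers a single block and should emit one end-of-block (EOB) record after the fee and call transfers.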
worker.lastConfirmedBlock = 5711193; worker.lastExportedBlock = 5711192; worker.fetchData = async function (from: number, to: number) { - return Promise.resolve([[], blockInfos, {}]); + return Promise.resolve([[], blockInfos, []]); }; worker.transformPastEvents = function () { return [feeResult, callResult]; @@ -73,31 +83,31 @@ describe('Test worker', function () { const result = await worker.work(); - expect(result).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey, eobWithPrimaryKey]); + expect(result[constants.NATIVE_TOKEN_MODE]).toLooseEqual([feeResultWithPrimaryKey, callResultWithPrimaryKey, eobWithPrimaryKey]); }); - + it('test end of block events', async function () { worker.lastConfirmedBlock = 5711193; worker.lastExportedBlock = 5711190; + worker.fetchData = async function () { - return Promise.resolve([[], blockInfos, {}]); + return Promise.resolve([[], blockInfos, []]); }; worker.transformPastEvents = function () { return [feeResult, callResult]; }; const result = await worker.work(); - // input event is for block 5711193 // last exported block 5711190 // so there should be 3 EOB - const blocks = result.map((value) => value.blockNumber); - const types = result.map((value) => value.type); + const blocks = result[constants.NATIVE_TOKEN_MODE].map((value: ETHTransfer) => value.blockNumber); + const types = result[constants.NATIVE_TOKEN_MODE].map((value: ETHTransfer) => value.type); expect(blocks).toEqual([5711191, 5711192, 5711193, 5711193, 5711193]); expect(types).toEqual(["EOB", "EOB", "fee", "call", "EOB"]); }) - it('test primary key assignment', async function () { + it('test primary key assignment 2', async function () { const events = [feeResult, callResult] extendEventsWithPrimaryKey(events, 0) @@ -117,7 +127,7 @@ function ethBlockEvent(blockNumber: number): ETHBlock { totalDifficulty: "3", difficulty: "2", size: '2', - transactions: [] + transactions: [] } satisfies ETHBlock } diff --git a/src/test/index.test.ts b/src/test/index.test.ts index 0793c54b..ed1421e4 100644 --- a/src/test/index.test.ts +++ b/src/test/index.test.ts @@ -1,6 +1,7 @@ const sinon = require('sinon'); const rewire = require('rewire'); import assert from 'assert'; +import { Server } from 'http' // For this test, presume we are creating the ETH worker process.env.BLOCKCHAIN = 'eth'; process.env.TEST_ENV = 'true'; @@ -8,34 +9,44 @@ import { Main } from '../main'; const { Main: MainRewired } = rewire('../main'); const { main } = rewire('../index'); import { BaseWorker } from '../lib/worker_base'; -import { Exporter } from '../lib/kafka_storage'; +import { KafkaStorage } from '../lib/kafka_storage'; +import { ZookeeperState } from '../lib/zookeeper_state'; import { ETHWorker } from '../blockchains/eth/eth_worker'; -import * as ethConstants from '../blockchains/eth/lib/constants'; import zkClientAsync from '../lib/zookeeper_client_async'; -const blockchain = 'eth'; + describe('Main tests', () => { const constants = { START_BLOCK: -1, - START_PRIMARY_KEY: -1 + START_PRIMARY_KEY: -1, + BLOCKCHAIN: 'eth', + KAFKA_TOPIC: 'NOT_USED' }; + let sandbox: any = null; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + afterEach(() => { - sinon.restore(); + sandbox.restore(); }); it('initExporter returns error when Exporter connect() fails', async () => { - sinon - .stub(zkClientAsync.prototype, 'connectAsync') - .rejects(new Error('Exporter connection failed')); + sandbox.stub(zkClientAsync.prototype, 'connectAsync').rejects(new Error('Exporter connection failed')); const mainInstance = new 
Main(); + + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + try { - await mainInstance.init(blockchain); + await mainInstance.init(constants); } catch (err) { if (err instanceof Error) { - assert.strictEqual(err.message, 'Error when initializing exporter: Exporter connection failed'); + assert.strictEqual(err.message, 'Exporter connection failed'); } else { assert.fail('Exception is not an instance of Error') @@ -44,20 +55,16 @@ describe('Main tests', () => { }); it('initExporter returns error when Exporter initTransactions() fails', async () => { - sinon - .stub(Exporter.prototype, 'connect') - .resolves(); - - sinon - .stub(Exporter.prototype, 'initTransactions') - .rejects(new Error('Exporter initTransactions failed')); + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').rejects(new Error('Exporter initTransactions failed')); const mainInstance = new Main(); try { - await mainInstance.init(blockchain); + await mainInstance.init(constants); } catch (err) { if (err instanceof Error) { - assert.strictEqual(err.message, 'Error when initializing exporter: Exporter initTransactions failed'); + assert.strictEqual(err.message, 'Exporter initTransactions failed'); } else { assert.fail('Exception is not an instance of Error') @@ -66,14 +73,14 @@ describe('Main tests', () => { }); it('handleInitPosition changes the lastProcessedPosition accordingly 1', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(JSON.parse('{"blockNumber":123456,"primaryKey":0}')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(JSON.parse('{"blockNumber":123456,"primaryKey":0}')); const mainInstance = new MainRewired(); - mainInstance.exporter = exporterStub; + mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); - sinon.spy(mainInstance, 'handleInitPosition'); + sandbox.spy(mainInstance, 'handleInitPosition'); await mainInstance.handleInitPosition(); assert(mainInstance.handleInitPosition.calledOnce); @@ -82,14 +89,14 @@ describe('Main tests', () => { }); it('handleInitPosition changes the lastProcessedPosition accordingly 2', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(null); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(null); const mainInstance = new MainRewired(); - mainInstance.exporter = exporterStub; + mainInstance.zookeeperState = zookeeperStub; mainInstance.worker = new BaseWorker(constants); - sinon.spy(mainInstance, 'handleInitPosition'); + sandbox.spy(mainInstance, 'handleInitPosition'); await mainInstance.handleInitPosition(); assert(mainInstance.handleInitPosition.calledOnce); @@ -98,12 +105,12 @@ describe('Main tests', () => { }); it('handleInitPosition throws error when exporter.getLastPosition() fails', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.throws(new Error('Exporter getLastPosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(exporterStub); - 
sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'zookeeperState').value(zookeeperStub); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.handleInitPosition(); @@ -119,13 +126,13 @@ describe('Main tests', () => { }); it('handleInitPosition throws error when exporter.savePosition() fails', async () => { - const exporterStub = sinon.createStubInstance(Exporter); - exporterStub.getLastPosition.returns(null); - exporterStub.savePosition.throws(new Error('Exporter savePosition failed')); + const zookeeperStub = sandbox.createStubInstance(ZookeeperState); + zookeeperStub.getLastPosition.returns(null); + zookeeperStub.savePosition.throws(new Error('Exporter savePosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(exporterStub); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'zookeeperState').value(zookeeperStub); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { await mainInstance.handleInitPosition(); @@ -140,11 +147,15 @@ describe('Main tests', () => { } }); - it('initWorker throws error when worker is already present', async () => { + it('init throws error when worker is already present', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); try { - await mainInstance.initWorker('eth', {}); + await mainInstance.init(constants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -156,14 +167,17 @@ describe('Main tests', () => { } }); - it('initWorker throws an error when worker.init() fails', async () => { + it('init throws an error when worker.init() fails', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); + const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); - sinon.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); + sandbox.stub(ETHWorker.prototype, 'init').rejects(new Error('Worker init failed')); try { - await mainInstance.initWorker('eth', ethConstants); + await mainInstance.init(constants); assert.fail('initWorker should have thrown an error'); } catch (err) { if (err instanceof Error) { @@ -175,15 +189,17 @@ describe('Main tests', () => { } }); - it('initWorker throws an error when handleInitPosition() fails', async () => { + it('init throws an error when handleInitPosition() fails', async () => { + sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); + sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); + sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); const mainInstance = new Main(); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); - sinon.stub(ETHWorker.prototype, 'init').resolves(); + sandbox.stub(ETHWorker.prototype, 'init').resolves(); - sinon.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing 
position')); +    sandbox.stub(mainInstance, 'handleInitPosition').throws(new Error('Error when initializing position'));      try { -      await mainInstance.initWorker('eth', ethConstants); +      await mainInstance.init(constants);       assert.fail('initWorker should have thrown an error');     } catch (err) {       if (err instanceof Error) { @@ -196,19 +212,24 @@   });    it('initWorker success', async () => { -    const mainInstance = new MainRewired(); -    mainInstance.exporter = new Exporter('test-exporter', true, 'topic-not-used'); -    sinon.stub(ETHWorker.prototype, 'init').resolves(); -    sinon.stub(mainInstance, 'handleInitPosition').resolves(); +    sandbox.stub(KafkaStorage.prototype, 'connect').resolves(); +    sandbox.stub(KafkaStorage.prototype, 'initTransactions').resolves(); +    sandbox.stub(ZookeeperState.prototype, 'connect').resolves(); +    sandbox.stub(Server.prototype, 'on') +    sandbox.stub(Server.prototype, 'listen')  +    const mainInstance = new Main();  -    await mainInstance.initWorker('eth', ethConstants); -    assert(mainInstance.handleInitPosition.calledOnce); +    sandbox.stub(ETHWorker.prototype, 'init').resolves(); +    const handleInitPositionStub = sandbox.stub(mainInstance, 'handleInitPosition').resolves(); + +    await mainInstance.init(constants); +    assert(handleInitPositionStub.calledOnce);   });  -  it('workLoop throws error when worker can\'t be initialised', async () => { -    sinon.stub(BaseWorker.prototype, 'work').rejects(new Error('Error in worker "work" method')); +  it('workLoop throws error when worker cannot be initialised', async () => { +    sandbox.stub(BaseWorker.prototype, 'work').rejects(new Error('Error in worker "work" method'));     const mainInstance = new Main(); -    sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); +    sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants));     try {       await mainInstance.workLoop();       assert.fail('workLoop should have thrown an error');     } catch (err) { @@ -223,13 +247,38 @@   });    it('workLoop throws error when storeEvents() fails', async () => { -    sinon.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); -    sinon.stub(Main.prototype, 'updateMetrics').returns(null); -    sinon.stub(Exporter.prototype, 'storeEvents').rejects(new Error('storeEvents failed')); +    class MockWorker extends BaseWorker { +      constructor() { +        super({}); +      } + +      work() { +        return Promise.resolve([{}]) +      } +      getLastProcessedPosition() { +        return {} +      } +    } + +    class MockKafkaStorage extends KafkaStorage { +      constructor() { +        super('not_used', false, 'not_used') +      } + +      async storeEvents() { +        throw new Error('storeEvents failed'); +      } +    } + +    sandbox.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); +    sandbox.stub(Main.prototype, 'updateMetrics').returns(null); +    sandbox.stub(KafkaStorage.prototype, 'storeEvents').rejects(new Error('storeEvents failed'));      const mainInstance = new Main(); -    sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); -    sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); +    sandbox.stub(mainInstance, 'worker').value(new MockWorker()); +    sandbox.stub(mainInstance, 'kafkaStorage').value(new MockKafkaStorage()); +    sandbox.stub(mainInstance, 'mergedConstants').value({ WRITE_SIGNAL_RECORDS_KAFKA: false }); +      try {       await mainInstance.workLoop();       assert.fail('workLoop should have thrown an error');     } catch (err) { @@ -244,14 +293,16 @@   });    it('workLoop throws error when savePosition() fails', async () => { - 
sinon.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); - sinon.stub(Main.prototype, 'updateMetrics').returns(null); - sinon.stub(Exporter.prototype, 'storeEvents').resolves(); - sinon.stub(Exporter.prototype, 'savePosition').rejects(new Error('savePosition failed')); + sandbox.stub(BaseWorker.prototype, 'work').resolves([1, 2, 3]); + sandbox.stub(Main.prototype, 'updateMetrics').returns(null); + sandbox.stub(KafkaStorage.prototype, 'storeEvents').resolves(); + sandbox.stub(ZookeeperState.prototype, 'savePosition').rejects(new Error('savePosition failed')); const mainInstance = new Main(); - sinon.stub(mainInstance, 'worker').value(new BaseWorker(constants)); - sinon.stub(mainInstance, 'exporter').value(new Exporter('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'worker').value(new BaseWorker(constants)); + sandbox.stub(mainInstance, 'kafkaStorage').value(new KafkaStorage('test-exporter', true, 'topic-not-used')); + sandbox.stub(mainInstance, 'zookeeperState').value(new ZookeeperState('test-exporter', 'topic-not-used')); + sandbox.stub(mainInstance, 'mergedConstants').value({ WRITE_SIGNAL_RECORDS_KAFKA: false }); try { await mainInstance.workLoop(); @@ -269,12 +320,18 @@ describe('Main tests', () => { describe('main function', () => { + let sandbox: any = null; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + afterEach(() => { - sinon.restore(); + sandbox.restore(); }); it('main function throws error when initialization fails', async () => { - sinon.stub(Main.prototype, 'init').rejects(new Error('Main init failed')); + sandbox.stub(Main.prototype, 'init').rejects(new Error('Main init failed')); try { await main(); @@ -290,8 +347,8 @@ describe('main function', () => { }); it('main function throws error when workLoop fails', async () => { - sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').rejects(new Error('Main workLoop failed')); + sandbox.stub(Main.prototype, 'init').resolves(); + sandbox.stub(Main.prototype, 'workLoop').rejects(new Error('Main workLoop failed')); try { await main(); @@ -307,9 +364,9 @@ describe('main function', () => { }); it('main function throws error when disconnecting fails', async () => { - sinon.stub(Main.prototype, 'init').resolves(); - sinon.stub(Main.prototype, 'workLoop').resolves(); - sinon.stub(Main.prototype, 'disconnect').rejects(new Error('Main disconnect failed')); + sandbox.stub(Main.prototype, 'init').resolves(); + sandbox.stub(Main.prototype, 'workLoop').resolves(); + sandbox.stub(Main.prototype, 'disconnect').rejects(new Error('Main disconnect failed')); try { await main(); @@ -325,16 +382,14 @@ describe('main function', () => { }); it('main function works', async () => { - const initStub = sinon.stub(Main.prototype, 'init').resolves(); - const workLoopStub = sinon.stub(Main.prototype, 'workLoop').resolves(); - const disconnectStub = sinon.stub(Main.prototype, 'disconnect').resolves(); + const initStub = sandbox.stub(Main.prototype, 'init').resolves(); + const workLoopStub = sandbox.stub(Main.prototype, 'workLoop').resolves(); + const disconnectStub = sandbox.stub(Main.prototype, 'disconnect').resolves(); await main(); - sinon.assert.calledOnce(initStub); - sinon.assert.calledOnce(workLoopStub); - sinon.assert.calledOnce(disconnectStub); + sandbox.assert.calledOnce(initStub); + sandbox.assert.calledOnce(workLoopStub); + sandbox.assert.calledOnce(disconnectStub); }); -} - -); +}); diff --git a/src/test/receipts/helper.spec.ts b/src/test/receipts/helper.spec.ts index 
86709667..9f0a93d6 100644 --- a/src/test/receipts/helper.spec.ts +++ b/src/test/receipts/helper.spec.ts @@ -1,77 +1,10 @@ import assert from 'assert'; -import helper from '../../blockchains/receipts/lib/helper'; +import { decodeReceipt } from '../../blockchains/eth/lib/helper_receipts'; import { Web3Interface, constructWeb3WrapperNoCredentials } from '../../blockchains/eth/lib/web3_wrapper'; import { NODE_URL } from '../../blockchains/eth/lib/constants'; const web3Wrapper: Web3Interface = constructWeb3WrapperNoCredentials(NODE_URL); -describe('blocks parsing', () => { - it('parses blocks', () => { - const responses = [ - { - jsonrpc: '2.0', - result: { timestamp: '0x56c097f1', number: '0xf53d5' } - }, - { - jsonrpc: '2.0', - result: { timestamp: '0x56c097f4', number: '0xf5408' } - } - ]; - - const result = helper.parseBlocks(responses); - assert.deepStrictEqual(result, [ - { timestamp: '0x56c097f1', number: '0xf53d5' }, - { timestamp: '0x56c097f4', number: '0xf5408' } - ]); - }); -}); - -describe('receipt parsing', () => { - it('parses receipts', () => { - const responses = [ - { - jsonrpc: '2.0', - result: [ - { - blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', - blockNumber: '0xf4fbb', - contractAddress: null, - cumulativeGasUsed: '0x5208', - from: '0x2a65aca4d5fc5b5c859090a6c34d164135398226', - gasUsed: '0x5208', - logs: [], - logsBloom: '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', - root: '0x1806fd9f2ef8bf8dce03665b4c80a1740efe4194f90864c662e7af6a80a02a08', - to: '0xe33977e292ccef99ea8828733e97562f3690a8ad', - transactionHash: '0x88217032c83348c7aae522090d7a5b932609860a5f6760e98e9048f6ddc55ad8', - transactionIndex: '0x0' - } - ] - } - ]; - - const result = helper.parseReceipts(responses); - assert.deepStrictEqual(result, - [ - { - blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', - blockNumber: '0xf4fbb', - contractAddress: null, - cumulativeGasUsed: '0x5208', - from: '0x2a65aca4d5fc5b5c859090a6c34d164135398226', - gasUsed: '0x5208', - logs: [], - logsBloom: '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', - root: '0x1806fd9f2ef8bf8dce03665b4c80a1740efe4194f90864c662e7af6a80a02a08', - to: '0xe33977e292ccef99ea8828733e97562f3690a8ad', - transactionHash: '0x88217032c83348c7aae522090d7a5b932609860a5f6760e98e9048f6ddc55ad8', - transactionIndex: '0x0' - } - ] - ); - }); -}); - context('receipt without logs', () => { const receipt = { @@ -91,7 +24,7 @@ context('receipt without logs', () => { describe('receipt decoding', () => { it('converts blockNumber from hex to number', () => { - const result = 
helper.decodeReceipt(receipt, web3Wrapper); + const result = decodeReceipt(receipt, web3Wrapper); const expected = { blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6', @@ -171,19 +104,11 @@ context('receipt with logs', () => { transactionIndex: 4 }; - const result = helper.decodeReceipt(receipt, web3Wrapper); + const result = decodeReceipt(receipt, web3Wrapper); assert.deepStrictEqual(result, expected); }); }); }); -describe('setting reciept\'s timestamp', () => { - it('sets receipt\'s timestamp', async () => { - const receipt = { blockNumber: 1004250 }; - const timestamps = { '1004250': 1455576747 }; - const result = await helper.setReceiptsTimestamp([receipt], timestamps); - assert.deepStrictEqual(result, [{ blockNumber: 1004250, timestamp: 1455576747 }]); - }); -});
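Note on the helper to decodeReceipt migration above: a minimal usage sketch in TypeScript, inferred only from the assertions kept in src/test/receipts/helper.spec.ts. The receipt literal below is illustrative (values reused from the removed test fixtures), the wrapper construction mirrors the test file's own setup, and none of this code is part of the diff itself.

import { decodeReceipt } from '../../blockchains/eth/lib/helper_receipts';
import { Web3Interface, constructWeb3WrapperNoCredentials } from '../../blockchains/eth/lib/web3_wrapper';
import { NODE_URL } from '../../blockchains/eth/lib/constants';

const web3Wrapper: Web3Interface = constructWeb3WrapperNoCredentials(NODE_URL);

// Raw receipt as returned by the node: numeric fields arrive as hex strings
// (illustrative values taken from the removed test fixtures above).
const rawReceipt = {
  blockHash: '0x209bc40be9e6961d88435382b91754b7a6e180d6cbf9120a61246e1d2506f3a6',
  blockNumber: '0xf4fbb',
  transactionIndex: '0x0',
  logs: []
};

// Per the kept test 'converts blockNumber from hex to number', decodeReceipt
// converts the hex-encoded fields to plain numbers via the web3 wrapper.
const decoded = decodeReceipt(rawReceipt, web3Wrapper);
// decoded.blockNumber === 1003451 (0xf4fbb), decoded.transactionIndex === 0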