diff --git a/blockchains/traces/README.md b/blockchains/traces/README.md new file mode 100644 index 00000000..e570270f --- /dev/null +++ b/blockchains/traces/README.md @@ -0,0 +1,21 @@ +# ETH Traces Exporter + +A small service that exports all ETH Traces from the Ethereum blockchain to a Kafka topic. It is written in javascript and uses ![web3.js](https://github.com/ethereum/web3.js/) library which implements the Ethereum JSON RPC spec. + +The script is exporting block traces and reorganizing them before they are pushed to Kafka topic. + +## Running the service + +To test that the exporter is running correctly use: + +```bash +$ ./bin/run_traces.sh +``` + +## Running the tests + +The 'traces' tests are discovered by Mocha test framework. To run all tests run: + +```bash +$ ./bin/test.sh +``` diff --git a/blockchains/traces/lib/constants.js b/blockchains/traces/lib/constants.js new file mode 100644 index 00000000..fb527462 --- /dev/null +++ b/blockchains/traces/lib/constants.js @@ -0,0 +1,11 @@ +const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50'); +const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3'); +const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/'; +const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30'); + +module.exports = { + BLOCK_INTERVAL, + CONFIRMATIONS, + NODE_URL, + LOOP_INTERVAL_CURRENT_MODE_SEC, +}; diff --git a/blockchains/traces/traces_worker.js b/blockchains/traces/traces_worker.js new file mode 100644 index 00000000..8231d376 --- /dev/null +++ b/blockchains/traces/traces_worker.js @@ -0,0 +1,91 @@ +const Web3 = require('web3'); +const jayson = require('jayson/promise'); +const constants = require('./lib/constants'); +const { logger } = require('../../lib/logger'); +const { transactionOrder, stableSort } = require('../eth/lib/util'); +const BaseWorker = require('../../lib/worker_base'); +const Web3Wrapper = require('../eth/lib/web3_wrapper'); + + +class TRACEWorker extends BaseWorker { + constructor() { + super(); + + logger.info(`Connecting to Ethereum node ${constants.NODE_URL}`); + this.web3 = new Web3(new Web3.providers.HttpProvider(constants.NODE_URL)); + this.web3Wrapper = new Web3Wrapper(this.web3); + this.ethClient = jayson.client.http(constants.NODE_URL); + } + + async fetchTraces(fromBlock, toBlock) { + const blockRequests = []; + for (let i = fromBlock; i <= toBlock; i++) { + blockRequests.push( + this.ethClient.request( + 'trace_block', + [this.web3Wrapper.parseNumberToHex(i)], + undefined, + false + ) + ); + } + const responses = await this.ethClient.request(blockRequests); + const results = responses.map((response) => response['result']) + const traces = [] + for (const blockTraces of results) { + for (const trace of blockTraces) { + traces.push(trace); + } + } + return traces; + } + + async work() { + if (this.lastConfirmedBlock === this.lastExportedBlock) { + // We are up to date with the blockchain (aka 'current mode'). Sleep longer after finishing this loop. + this.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000; + + // On the previous cycle we closed the gap to the head of the blockchain. + // Check if there are new blocks now. + const newConfirmedBlock = await this.web3.eth.getBlockNumber() - constants.CONFIRMATIONS; + if (newConfirmedBlock === this.lastConfirmedBlock) { + // The Node has not progressed + return []; + } + this.lastConfirmedBlock = newConfirmedBlock; + } + else { + // We are still catching with the blockchain (aka 'historic mode'). Do not sleep after this loop. + this.sleepTimeMsec = 0; + } + + const toBlock = Math.min(this.lastExportedBlock + constants.BLOCK_INTERVAL, this.lastConfirmedBlock); + const fromBlock = this.lastExportedBlock + 1; + + logger.info(`Fetching traces for interval ${fromBlock}:${toBlock}`); + const traces = await this.fetchTraces(fromBlock, toBlock); + + if (traces.length > 0) { + stableSort(traces, transactionOrder); + for (let i = 0; i < traces.length; i++) { + traces[i].primaryKey = this.lastPrimaryKey + i + 1; + } + + this.lastPrimaryKey += traces.length; + } + + this.lastExportTime = Date.now(); + this.lastExportedBlock = toBlock; + + return traces; + } + + async init() { + this.lastConfirmedBlock = await this.web3.eth.getBlockNumber() - constants.CONFIRMATIONS; + } + +} + +module.exports = { + worker: TRACEWorker +};