Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add traces exporter #107

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions blockchains/traces/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# ETH Traces Exporter

A small service that exports all ETH Traces from the Ethereum blockchain to a Kafka topic. It is written in javascript and uses ![web3.js](https://github.com/ethereum/web3.js/) library which implements the Ethereum JSON RPC spec.

The script is exporting block traces and reorganizing them before they are pushed to Kafka topic.

## Running the service

To test that the exporter is running correctly use:

```bash
$ ./bin/run_traces.sh
```

## Running the tests

The 'traces' tests are discovered by Mocha test framework. To run all tests run:

```bash
$ ./bin/test.sh
```
11 changes: 11 additions & 0 deletions blockchains/traces/lib/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const BLOCK_INTERVAL = parseInt(process.env.BLOCK_INTERVAL || '50');
const CONFIRMATIONS = parseInt(process.env.CONFIRMATIONS || '3');
const NODE_URL = process.env.NODE_URL || process.env.PARITY_URL || 'http://localhost:8545/';
const LOOP_INTERVAL_CURRENT_MODE_SEC = parseInt(process.env.LOOP_INTERVAL_CURRENT_MODE_SEC || '30');

module.exports = {
BLOCK_INTERVAL,
CONFIRMATIONS,
NODE_URL,
LOOP_INTERVAL_CURRENT_MODE_SEC,
};
91 changes: 91 additions & 0 deletions blockchains/traces/traces_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const Web3 = require('web3');
const jayson = require('jayson/promise');
const constants = require('./lib/constants');
const { logger } = require('../../lib/logger');
const { transactionOrder, stableSort } = require('../eth/lib/util');
const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('../eth/lib/web3_wrapper');


class TRACEWorker extends BaseWorker {
constructor() {
super();

logger.info(`Connecting to Ethereum node ${constants.NODE_URL}`);
this.web3 = new Web3(new Web3.providers.HttpProvider(constants.NODE_URL));
this.web3Wrapper = new Web3Wrapper(this.web3);
this.ethClient = jayson.client.http(constants.NODE_URL);
}

async fetchTraces(fromBlock, toBlock) {
const blockRequests = [];
for (let i = fromBlock; i <= toBlock; i++) {
blockRequests.push(
this.ethClient.request(
'trace_block',
[this.web3Wrapper.parseNumberToHex(i)],
undefined,
false
)
);
}
const responses = await this.ethClient.request(blockRequests);
const results = responses.map((response) => response['result'])
const traces = []
for (const blockTraces of results) {
for (const trace of blockTraces) {
traces.push(trace);
}
}
return traces;
}

async work() {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
// We are up to date with the blockchain (aka 'current mode'). Sleep longer after finishing this loop.
this.sleepTimeMsec = constants.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;

// On the previous cycle we closed the gap to the head of the blockchain.
// Check if there are new blocks now.
const newConfirmedBlock = await this.web3.eth.getBlockNumber() - constants.CONFIRMATIONS;
if (newConfirmedBlock === this.lastConfirmedBlock) {
// The Node has not progressed
return [];
}
this.lastConfirmedBlock = newConfirmedBlock;
}
else {
// We are still catching with the blockchain (aka 'historic mode'). Do not sleep after this loop.
this.sleepTimeMsec = 0;
}

const toBlock = Math.min(this.lastExportedBlock + constants.BLOCK_INTERVAL, this.lastConfirmedBlock);
const fromBlock = this.lastExportedBlock + 1;

logger.info(`Fetching traces for interval ${fromBlock}:${toBlock}`);
const traces = await this.fetchTraces(fromBlock, toBlock);

if (traces.length > 0) {
stableSort(traces, transactionOrder);
for (let i = 0; i < traces.length; i++) {
traces[i].primaryKey = this.lastPrimaryKey + i + 1;
}

this.lastPrimaryKey += traces.length;
}

this.lastExportTime = Date.now();
this.lastExportedBlock = toBlock;

return traces;
}

async init() {
this.lastConfirmedBlock = await this.web3.eth.getBlockNumber() - constants.CONFIRMATIONS;
}

}

module.exports = {
worker: TRACEWorker
};