Integrate receipts extraction into ETHWorker #201

Open

wants to merge 8 commits into master

Conversation

YordanPavlov
Contributor

This PR integrates receipts extraction into ETHWorker. To achieve this, it takes the following steps:

  1. Break the Exporter class into its two main responsibilities: KafkaStorage and ZookeeperState.
  2. Support multiple KafkaStorage instances per running exporter; each mode of operation is associated with one KafkaStorage.
  3. Integrate the ReceiptsWorker into the ETHWorker classes. The latter already extracts receipts; it just needs to format them properly and fill the receipts output topic (see the wiring sketch below).
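
For illustration, a minimal sketch of how the pieces above could be wired together. The class names KafkaStorage and ZookeeperState and the init signature come from this PR; the constructor arguments, the settings object, and the topic names used here are assumptions, not the PR's actual code.

// Sketch only: one KafkaStorage (and therefore one output topic) per mode of operation.
const storages = new Map<string, KafkaStorage>([
  ['native_token_transfers', new KafkaStorage('eth_transfers_test_topic')],
  ['receipts', new KafkaStorage('receipts_test_topic')],
]);

// Position tracking stays in a single ZookeeperState, independent of the modes.
const zookeeperState = new ZookeeperState(settings);

// The worker receives the whole map and fills every configured topic.
const worker = constructWorker('eth', settings);
await worker.init(storages);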

export EXPORT_TIMEOUT_MLS=300000
export CONTRACT_MODE="extract_exact_overwrite"
export BLOCKCHAIN="eth"
export KAFKA_TOPIC="erc20_exporter_test_topic"
#export KAFKA_TOPIC="erc20_exporter_test_topic"
export KAFKA_TOPIC='native_token_transfers:erc20_exporter_test_topic'
Contributor Author

Besides a single topic, we now support multiple ones. The format is a comma-separated list of mode:topic_name pairs, as sketched below.
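
A minimal parsing sketch for that format; the helper name parseKafkaTopics and the error handling are assumptions, not the PR's actual code.

// Turn 'mode1:topic1,mode2:topic2' into a Map from mode to topic name.
function parseKafkaTopics(raw: string): Map<string, string> {
  const result = new Map<string, string>();
  for (const entry of raw.split(',')) {
    const [mode, topic] = entry.trim().split(':');
    if (!mode || !topic) {
      throw new Error(`Invalid KAFKA_TOPIC entry '${entry}', expected mode:topic_name`);
    }
    result.set(mode, topic);
  }
  return result;
}

// parseKafkaTopics('native_token_transfers:erc20_exporter_test_topic')
// => Map { 'native_token_transfers' => 'erc20_exporter_test_topic' }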

Contributor

This is kind of ugly. I believe we can use several arguments instead of packing multiple values inside a single one.
Can we do something like this?

KAFKA_TOPIC_ERC20 = ...
KAFKA_TOPIC_TRANSFER = ...
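
For comparison, a sketch of how the suggested per-mode variables could be consumed. The mode names mapped to each variable are assumptions; only the variable names come from the comment above.

// Hypothetical: one environment variable per mode instead of a packed KAFKA_TOPIC.
const topicByMode = new Map<string, string>();

const erc20Topic = process.env.KAFKA_TOPIC_ERC20;
if (erc20Topic) {
  topicByMode.set('erc20', erc20Topic);
}

const transferTopic = process.env.KAFKA_TOPIC_TRANSFER;
if (transferTopic) {
  topicByMode.set('native_token_transfers', transferTopic);
}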

@@ -22,8 +21,6 @@ export function constructWorker(blockchain: string, settings: any): BaseWorker {
return new ETHBlocksWorker(settings);
case 'matic':
return new MaticWorker(settings);
case 'receipts':
return new ReceiptsWorker(settings);
Contributor Author

A separate worker is no longer needed; we use the ETHWorker to extract receipts. If we want to extract transfers and receipts at once, we can do it like this:

KAFKA_TOPIC='native_token_transfers:eth_transfers_test_topic,receipts:receipts_test_topic'

If for some reason we want a receipts-only deployment, we can do:

KAFKA_TOPIC='receipts:receipts_test_topic'

@@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker {
this.allOldContracts = [];
}

async init(exporter?: Exporter) {
async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
Contributor Author

Workers receive information about which output Kafka topics need to be filled and produce the corresponding data.
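
A sketch of how the new init argument could be consumed inside a worker. Only the signature (from the diff below) and the modes field (used later in this PR) are taken from the source; the class name, import path, and fallback mode are assumptions.

import { KafkaStorage } from '../lib/kafka_storage';  // import path is an assumption

// Illustrative only: a worker keeps its storages and derives its modes from the map keys.
class WorkerInitSketch {
  modes: string[] = [];
  private storages = new Map<string, KafkaStorage>();

  async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
    if (storage instanceof Map) {
      this.storages = storage;
    } else {
      this.storages = new Map([['default', storage]]);  // assumed single-mode fallback
    }
    this.modes = Array.from(this.storages.keys());       // e.g. ['native_token_transfers', 'receipts']
  }
}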

fetchReceipts(this.ethClient, this.web3Wrapper,
this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock),
]);
async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceipt[]]> {
Contributor Author

We fetch raw data depending on which outputs are needed. Some basic raw data, like block times, is needed for every output; see the sketch below.
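
A sketch of what mode-dependent fetching could look like, shown as a method body as it might appear inside ETHWorker. The fetchData signature and the fetchReceipts call are taken from the diff above; the fetchBlocks/fetchTraces helpers and the TRANSFERS_MODE setting are assumptions.

// Method-body sketch only, not the PR's actual implementation.
async fetchData(fromBlock: number, toBlock: number): Promise<[Trace[], Map<number, ETHBlock>, ETHReceipt[]]> {
  const [traces, blocks, receipts] = await Promise.all([
    this.modes.includes(this.settings.TRANSFERS_MODE)            // assumed setting
      ? this.fetchTraces(fromBlock, toBlock)                     // assumed helper
      : [],
    this.fetchBlocks(fromBlock, toBlock),                        // always needed: block times feed every output
    this.modes.includes(this.settings.RECEIPTS_MODE)
      ? fetchReceipts(this.ethClient, this.web3Wrapper,
          this.settings.RECEIPTS_API_METHOD, fromBlock, toBlock)
      : [],
  ]);
  return [traces, blocks, receipts];
}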

@@ -2,11 +2,12 @@ export KAFKA_URL=kafka-hz.stage.san:30911
export ZOOKEEPER_URL=zookeeper-hz.stage.san:30921
export NODE_URL=https://ethereum.santiment.net
export START_BLOCK="15676731"
export BLOCK_INTERVAL="50"
export BLOCK_INTERVAL="5"
Contributor

Is this change required for this PR?

receipts, endOfBlockEvents);
}
if (this.modes.includes(this.settings.RECEIPTS_MODE)) {
result[this.settings.RECEIPTS_MODE] = this.getReceiptsOutput(blocks, receipts);
Contributor

This is not really a modular design, because each module has a footprint in the eth_worker file.
IMO here we should have something like this:

for (const module of modules) {
  result[module.mode] = module.extract(rawData);
}
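
To make the suggestion concrete, a hypothetical shape for such modules; none of these names exist in the PR, and the raw-data type only reuses the types from the fetchData signature above.

// All names here are hypothetical.
type RawEthData = { traces: Trace[]; blocks: Map<number, ETHBlock>; receipts: ETHReceipt[] };

interface OutputModule {
  mode: string;                            // e.g. 'receipts' or 'native_token_transfers'
  extract(raw: RawEthData): object[];      // events destined for this mode's output topic
}

function runModules(modules: OutputModule[], raw: RawEthData): Record<string, object[]> {
  const result: Record<string, object[]> = {};
  for (const module of modules) {
    result[module.mode] = module.extract(raw);
  }
  return result;
}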


import { Web3Interface } from './web3_wrapper';

const decodeLog = (log: any, web3Wrapper: Web3Interface) => {
Contributor

Could you please define return types for these functions?

}
}
else if (typeof workResult === 'object') {
if (!(this.kafkaStorage instanceof Map)) {
Contributor

Would it be better to always use a Map and check the number of keys defined, rather than trying to dynamically resolve the type?
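
A sketch of the "always a Map" alternative being suggested; the wrapper class is illustrative (KafkaStorage import omitted), only KafkaStorage and its isConnected() method come from this PR.

// With a Map everywhere, a single-topic deployment is simply a Map with one entry,
// so no instanceof checks are needed in the exporter.
class ExporterSketch {
  constructor(private readonly kafkaStorage: Map<string, KafkaStorage>) {}

  isConnected(): boolean {
    return Array.from(this.kafkaStorage.values()).every(storage => storage.isConnected());
  }
}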

return this.kafkaStorage.isConnected();
}
else if (this.kafkaStorage instanceof Map) {
return Array.from(this.kafkaStorage.values()).every(storage => storage.isConnected());
Contributor

Hmm, we maintain a separate connection per mode, but should we? Are there any benefits?
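
For context, a sketch of the shared-connection alternative: one underlying producer created once and handed to every KafkaStorage, which then only chooses the topic. The two-argument KafkaStorage constructor and the createKafkaProducer helper are assumptions, not the PR's code.

// Hypothetical: share a single Kafka connection across all modes.
const sharedProducer = await createKafkaProducer(settings.KAFKA_URL);  // assumed helper

const storages = new Map<string, KafkaStorage>([
  ['native_token_transfers', new KafkaStorage(sharedProducer, 'eth_transfers_test_topic')],
  ['receipts', new KafkaStorage(sharedProducer, 'receipts_test_topic')],
]);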

send(response, 500, err.toString());
});
if (mainInstance.healthcheck()) {
return send(response, 200, 'ok');
Contributor

I guess this is out of scope for this PR, but could we later change this to something more advanced than a simple 200 all the time? Could we measure throughput and ensure we are connected to Kafka and ZK, for example?
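
A sketch of the kind of richer healthcheck being asked about. Only healthcheck() and the send(response, status, body) helper appear in this PR; the Zookeeper check, the lag metric, and the surrounding identifiers are assumptions.

import { ServerResponse } from 'http';

// Hypothetical: report Kafka, Zookeeper, and export lag instead of a blanket 200.
function healthcheckHandler(response: ServerResponse): void {
  const kafkaOk = mainInstance.healthcheck();                    // existing check, assumed to return a boolean
  const zookeeperOk = zookeeperState.isConnected();              // assumed ZookeeperState method
  const lagSeconds = (Date.now() - lastExportTime) / 1000;       // assumed throughput/lag metric

  if (kafkaOk && zookeeperOk && lagSeconds < MAX_EXPORT_LAG_SEC) {
    send(response, 200, `ok, export lag ${lagSeconds}s`);
  } else {
    send(response, 500, `kafka: ${kafkaOk}, zookeeper: ${zookeeperOk}, lag: ${lagSeconds}s`);
  }
}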
