Skip to content

Commit

Permalink
add "blocks" params
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Aug 20, 2024
1 parent 606a2d4 commit 7ee9ff9
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 44 deletions.
15 changes: 15 additions & 0 deletions blocks/evm/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ gui:
sql-setup:
# EVM blocks
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors1
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors2
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors3
substreams-sink-sql setup clickhouse://default:default@localhost:9000/base substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/bsc substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/polygon substreams.yaml
Expand Down Expand Up @@ -66,6 +69,18 @@ sql-setup:
sql-run-eth:
substreams-sink-sql run clickhouse://default:default@localhost:9000/eth substreams.yaml -e eth.substreams.pinax.network:443 20444295:20451460 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --development-mode

.PHONY: sql-run-eth1
sql-run-eth1:
substreams-sink-sql run clickhouse://default:default@localhost:9000/eth substreams.yaml -e eth.substreams.pinax.network:443 1:14164145 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --cursors-table cursors1 --params ch_out=blocks

.PHONY: sql-run-eth2
sql-run-eth2:
substreams-sink-sql run clickhouse://default:default@localhost:9000/eth substreams.yaml -e eth.substreams.pinax.network:443 14164145:17865817 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --cursors-table cursors2 --params ch_out=blocks

.PHONY: sql-run-eth3
sql-run-eth3:
substreams-sink-sql run clickhouse://default:default@localhost:9000/eth substreams.yaml -e eth.substreams.pinax.network:443 17865817:20543112 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --cursors-table cursors3 --params ch_out=blocks

.PHONY: sql-run-base
sql-run-base:
substreams-sink-sql run clickhouse://default:default@localhost:9000/base substreams.yaml -e base.substreams.pinax.network:443 17926927:17970127 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --development-mode
Expand Down
10 changes: 5 additions & 5 deletions blocks/evm/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ CREATE TABLE IF NOT EXISTS blocks
base_fee_per_gas String DEFAULT '' COMMENT 'EIP-1559 (London Fork)',
blob_gas_used String DEFAULT '' COMMENT 'EIP-4844 (Dencun Fork)',
excess_blob_gas String DEFAULT '' COMMENT 'EIP-4844 (Dencun Fork)',
total_transactions UInt64,
successful_transactions UInt64,
failed_transactions UInt64,
total_balance_changes UInt64,
total_withdrawals UInt64,
total_transactions UInt64 DEFAULT 0,
successful_transactions UInt64 DEFAULT 0,
failed_transactions UInt64 DEFAULT 0,
total_balance_changes UInt64 DEFAULT 0,
total_withdrawals UInt64 DEFAULT 0,

-- detail level --
detail_level LowCardinality(String),
Expand Down
6 changes: 3 additions & 3 deletions blocks/evm/src/balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub fn balance_change_reason_to_string(reason: i32) -> String {
// DetailLevel: EXTENDED
pub fn insert_balance_change_row(row: &mut TableChange, balance_change: &BalanceChange) {
let address = bytes_to_hex(&balance_change.address);
let new_balance = optional_bigint_to_string(balance_change.new_value.clone(), "0");
let old_balance: String = optional_bigint_to_string(balance_change.old_value.clone(), "0");
let new_balance = optional_bigint_to_string(&balance_change.new_value.clone(), "0");
let old_balance = optional_bigint_to_string(&balance_change.old_value.clone(), "0");
let amount = optional_bigint_to_decimal(balance_change.new_value.clone()) - optional_bigint_to_decimal(balance_change.old_value.clone());
let ordinal = balance_change.ordinal;
let reason_code = balance_change.reason;
Expand Down Expand Up @@ -73,4 +73,4 @@ pub fn insert_balance_change_counts(row: &mut TableChange, all_balance_changes_r
}
row.change("total_balance_changes", ("", total_balance_changes.to_string().as_str()))
.change("total_withdrawals", ("", total_withdrawals.to_string().as_str()));
}
}
27 changes: 20 additions & 7 deletions blocks/evm/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use substreams_ethereum::pb::eth::v2::Block;
use crate::balance_changes::{insert_balance_change, insert_balance_change_counts};
use crate::code_changes::insert_code_change;
use crate::traces::insert_system_trace;
use crate::transactions::insert_transaction;

pub fn block_detail_to_string(detail_level: i32) -> String {
match detail_level {
Expand All @@ -20,7 +21,7 @@ pub fn block_detail_to_string(detail_level: i32) -> String {

// https://github.com/streamingfast/firehose-ethereum/blob/develop/proto/sf/ethereum/type/v2/type.proto
// DetailLevel: BASE
pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
pub fn insert_blocks(params: &String, tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
let header = block.header.clone().unwrap_or_default();
let parent_hash = bytes_to_hex(&header.parent_hash);
let nonce = header.nonce;
Expand All @@ -33,11 +34,11 @@ pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block)
let size = block.size;
let mix_hash = bytes_to_hex(&header.mix_hash);
let extra_data = bytes_to_hex(&header.extra_data.clone());
let extra_data_utf8 = String::from_utf8(header.extra_data).unwrap_or_default();
let extra_data_utf8 = String::from_utf8(header.extra_data.clone()).unwrap_or_default();
let gas_limit = header.gas_limit;
let gas_used = header.gas_used;
let difficulty = optional_bigint_to_string(header.difficulty, "0"); // UInt64
let total_difficulty = optional_bigint_to_string(header.total_difficulty.clone(), "0"); // UInt256
let difficulty = optional_bigint_to_string(&header.difficulty, "0"); // UInt64
let total_difficulty = optional_bigint_to_string(&header.total_difficulty.clone(), "0"); // UInt256

// block detail levels
// https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425
Expand All @@ -47,9 +48,9 @@ pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block)
// forks
let withdrawals_root = bytes_to_hex(&header.withdrawals_root); // EIP-4895 (Shangai Fork)
let parent_beacon_root = bytes_to_hex(&header.parent_beacon_root); // EIP-4788 (Dencun Fork)
let base_fee_per_gas = optional_bigint_to_string(header.base_fee_per_gas, ""); // UInt256 - EIP-1559 (London Fork)
let excess_blob_gas = optional_u64_to_string(header.excess_blob_gas, ""); // UInt64 - EIP-4844 (Dencun Fork)
let blob_gas_used = optional_u64_to_string(header.blob_gas_used, ""); // UInt64 - EIP-4844 (Dencun Fork)
let base_fee_per_gas = optional_bigint_to_string(&header.base_fee_per_gas, ""); // UInt256 - EIP-1559 (London Fork)
let excess_blob_gas = optional_u64_to_string(&header.excess_blob_gas, ""); // UInt64 - EIP-4844 (Dencun Fork)
let blob_gas_used = optional_u64_to_string(&header.blob_gas_used, ""); // UInt64 - EIP-4844 (Dencun Fork)

// blocks
let keys = blocks_keys(&clock, true);
Expand Down Expand Up @@ -87,6 +88,14 @@ pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block)

insert_timestamp(row, clock, true);

// skip the rest if blocks is the only requested table
// designed for high throughput to calculate total block size of the entire chain
if params == "blocks" {
insert_transaction_counts(row, vec!{});
insert_balance_change_counts(row, vec!{});
return;
}

// transaction status counts
let all_transaction_status: Vec<i32> = block.transaction_traces.iter().map(|transaction| transaction.status).collect();
insert_transaction_counts(row, all_transaction_status);
Expand All @@ -107,4 +116,8 @@ pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block)
for balance_change in block.balance_changes.iter() {
insert_balance_change(tables, clock, balance_change);
}
// TABLE::transactions
for transaction in block.transaction_traces.iter() {
insert_transaction( tables, clock, &transaction, &header, &detail_level);
}
}
8 changes: 2 additions & 6 deletions blocks/evm/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ use substreams_database_change::pb::database::DatabaseChanges;
use substreams_ethereum::pb::eth::v2::Block;

use crate::blocks::insert_blocks;
use crate::transactions::insert_transactions;

#[substreams::handlers::map]
pub fn ch_out(clock: Clock, block: Block) -> Result<DatabaseChanges, Error> {
pub fn ch_out(params: String, clock: Clock, block: Block) -> Result<DatabaseChanges, Error> {
let mut tables: DatabaseChanges = DatabaseChanges::default();

// TABLE::blocks
insert_blocks(&mut tables, &clock, &block);

// TABLE::transactions
insert_transactions(&mut tables, &clock, &block);
insert_blocks(&params, &mut tables, &clock, &block);

Ok(tables)
}
Expand Down
2 changes: 1 addition & 1 deletion blocks/evm/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn insert_trace_row(row: &mut TableChange, call: &Call) {
let status_failed = call.status_failed;
let status_reverted = call.status_reverted;
let suicide = call.suicide; // or `selfdestruct`?
let value = optional_bigint_to_string(call.value.clone(), "0"); // UInt256
let value = optional_bigint_to_string(&call.value, "0"); // UInt256

// not available in system traces
let failure_reason = &call.failure_reason;
Expand Down
28 changes: 10 additions & 18 deletions blocks/evm/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use common::utils::optional_bigint_to_string;
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::TableChange;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::pb::eth::v2::Block;
use substreams_ethereum::pb::eth::v2::BlockHeader;
use substreams_ethereum::pb::eth::v2::TransactionTrace;

use crate::logs::insert_log;
Expand Down Expand Up @@ -43,32 +43,26 @@ pub fn is_transaction_success(status: i32) -> bool {
status == 1
}

pub fn insert_transactions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
for transaction in block.transaction_traces.iter() {
insert_transaction(tables, clock, &transaction, &block);
}
}

// https://github.com/streamingfast/firehose-ethereum/blob/1bcb32a8eb3e43347972b6b5c9b1fcc4a08c751e/proto/sf/ethereum/type/v2/type.proto#L658
// DetailLevel: BASE & EXTENDED
pub fn insert_transaction(tables: &mut DatabaseChanges, clock: &Clock, transaction: &TransactionTrace, block: &Block) {
pub fn insert_transaction(tables: &mut DatabaseChanges, clock: &Clock, transaction: &TransactionTrace, block_header: &BlockHeader, detail_level: &String) {
let index = transaction.index;
let hash = bytes_to_hex(&transaction.hash);
let from = bytes_to_hex(&transaction.from); // EVM Address
let to = bytes_to_hex(&transaction.to); // EVM Address
let nonce = transaction.nonce;
let gas_used = transaction.gas_used; // TO-DO: rename to `gas`? https://github.com/pinax-network/substreams-raw-blocks/issues/1
let gas_price = optional_bigint_to_string(transaction.gas_price.clone(), "0"); // UInt256
let gas_price = optional_bigint_to_string(&transaction.gas_price, "0"); // UInt256
let gas_limit = transaction.gas_limit;
let value = optional_bigint_to_string(transaction.value.clone(), "0"); // UInt256
let value = optional_bigint_to_string(&transaction.value, "0"); // UInt256
let data = bytes_to_hex(&transaction.input); // TO-DO: change to 0x? https://github.com/pinax-network/substreams-raw-blocks/issues/1
let v = bytes_to_hex(&transaction.v);
let r = bytes_to_hex(&transaction.r);
let s = bytes_to_hex(&transaction.s);
let r#type = transaction_type_to_string(transaction.r#type);
let type_code = transaction.r#type;
let max_fee_per_gas = optional_bigint_to_string(transaction.max_fee_per_gas.clone(), "0"); // UInt256
let max_priority_fee_per_gas = optional_bigint_to_string(transaction.max_priority_fee_per_gas.clone(), "0"); // UInt256
let max_fee_per_gas = optional_bigint_to_string(&transaction.max_fee_per_gas, "0"); // UInt256
let max_priority_fee_per_gas = optional_bigint_to_string(&transaction.max_priority_fee_per_gas, "0"); // UInt256
let begin_ordinal = transaction.begin_ordinal;
let end_ordinal = transaction.end_ordinal;
let success = is_transaction_success(transaction.status);
Expand All @@ -77,16 +71,15 @@ pub fn insert_transaction(tables: &mut DatabaseChanges, clock: &Clock, transacti

// transaction receipt
let receipt = transaction.receipt.clone().unwrap();
let blob_gas_price = optional_bigint_to_string(receipt.clone().blob_gas_price, "0");
let blob_gas_price = optional_bigint_to_string(&receipt.blob_gas_price, "0");
let blob_gas_used = receipt.blob_gas_used();
let cumulative_gas_used = receipt.cumulative_gas_used;
let logs_bloom = bytes_to_hex(&receipt.logs_bloom);
let state_root = bytes_to_hex(&receipt.state_root);

// block roots
let header = block.header.clone().unwrap();
let transactions_root = bytes_to_hex(&header.transactions_root);
let receipts_root = bytes_to_hex(&header.receipt_root);
let transactions_root = bytes_to_hex(&block_header.transactions_root);
let receipts_root = bytes_to_hex(&block_header.receipt_root);

let keys = transaction_keys(&clock, &hash);
let row = tables
Expand Down Expand Up @@ -136,8 +129,7 @@ pub fn insert_transaction(tables: &mut DatabaseChanges, clock: &Clock, transacti

// TABLE::logs
// Only required DetailLevel=BASE since traces are not available in BASE
let detail_level = block.detail_level;
if detail_level == 2 {
if detail_level == "Base" {
for log in receipt.logs {
insert_log(tables, clock, &log, transaction);
}
Expand Down
6 changes: 5 additions & 1 deletion blocks/evm/substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ modules:
- name: ch_out
kind: map
inputs:
- params: string
- source: sf.substreams.v1.Clock
- source: sf.ethereum.type.v2.Block
output:
Expand All @@ -33,4 +34,7 @@ sink:
schema: "./schema.sql"
engine: clickhouse
postgraphile_frontend:
enabled: false
enabled: false

params:
ch_out: '*'
6 changes: 3 additions & 3 deletions common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ pub fn bytes_to_u64(bytes: &Vec<u8>) -> u64 {
}
}

pub fn optional_bigint_to_string(value: Option<BigInt>, default: &str) -> String {
pub fn optional_bigint_to_string(value: &Option<BigInt>, default: &str) -> String {
match value {
Some(bigint) => bigint.with_decimal(0).to_string(),
Some(bigint) => bigint.clone().with_decimal(0).to_string(),
None => default.to_string(),
}
}
Expand All @@ -68,7 +68,7 @@ pub fn optional_bigint_to_hex(value: Option<BigInt>) -> String {
}
}

pub fn optional_u64_to_string(value: Option<u64>, default: &str) -> String {
pub fn optional_u64_to_string(value: &Option<u64>, default: &str) -> String {
match value {
Some(uint) => uint.to_string(),
None => default.to_string(),
Expand Down

0 comments on commit 7ee9ff9

Please sign in to comment.