Skip to content

Commit

Permalink
change to clickhouse out
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Jul 25, 2024
1 parent 9f2ae7f commit 97f2f3b
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 234 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "0.1.0"

[workspace.dependencies]
substreams = "0.5"
substreams-entity-change = "1.3"
substreams-database-change = "1.3"
substreams-ethereum = "0.9"
# substreams-antelope = "0.4"
# substreams-bitcoin = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion blocks/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ crate-type = ["cdylib"]
[dependencies]
common = { path = "../../common" }
substreams-ethereum = { workspace = true }
substreams-entity-change = { workspace = true }
substreams-database-change = { workspace = true }
substreams = { workspace = true }
76 changes: 42 additions & 34 deletions blocks/evm/src/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
use common::{block_time_to_date, bytes_to_hex};
use common::keys::block_keys;
use common::utils::bytes_to_hex;
use common::sinks::insert_timestamp;
use substreams::pb::substreams::Clock;
use substreams_entity_change::tables::Tables;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::pb::eth::v2::Block;

pub fn insert_blocks(tables: &mut Tables, clock: &Clock, block: &Block) {
pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
let header = block.clone().header.unwrap();
let timestamp = clock.clone().timestamp.unwrap();
let block_time = timestamp.to_string();
let block_number = clock.number.to_string();
let block_hash = format!("0x{}", clock.id);
let block_date = block_time_to_date(block_time.as_str());
let parent_hash = bytes_to_hex(header.parent_hash);
let nonce = header.nonce.to_string();
let ommers_hash = bytes_to_hex(header.uncle_hash);
let logs_bloom = bytes_to_hex(header.logs_bloom);
let transactions_root = bytes_to_hex(header.transactions_root);
let state_root = bytes_to_hex(header.state_root);
let receipts_root = bytes_to_hex(header.receipt_root);
let miner = bytes_to_hex(header.coinbase);
let size = block.size.to_string();
let mix_hash = bytes_to_hex(header.mix_hash);
let extra_data = bytes_to_hex(header.extra_data);
let gas_limit = header.gas_limit.to_string();
let gas_used = header.gas_used.to_string();

// blocks
let row = tables
.create_row("blocks", &block_hash)
.set("time", &block_time)
.set_bigint("number", &block_number)
.set("date", &block_date)
.set("hash", &block_hash)
.set("parent_hash", bytes_to_hex(header.parent_hash))
.set_bigint("nonce", &header.nonce.to_string())
.set("ommers_hash", bytes_to_hex(header.uncle_hash))
.set("logs_bloom", bytes_to_hex(header.logs_bloom))
.set("transactions_root", bytes_to_hex(header.transactions_root))
.set("state_root", bytes_to_hex(header.state_root))
.set("receipts_root", bytes_to_hex(header.receipt_root))
.set("miner", bytes_to_hex(header.coinbase))
.set_bigint("size", &block.size.to_string())
.set("mix_hash", bytes_to_hex(header.mix_hash))
.set("extra_data", bytes_to_hex(header.extra_data))
.set_bigint("gas_limit", &header.gas_limit.to_string())
.set_bigint("gas_used", &header.gas_used.to_string());
.push_change_composite("blocks", block_keys(&clock), 0, table_change::Operation::Create)
.change("parent_hash", ("", parent_hash.as_str()))
.change("nonce", ("", nonce.as_str()))
.change("ommers_hash", ("", ommers_hash.as_str()))
.change("logs_bloom", ("", logs_bloom.as_str()))
.change("transactions_root", ("", transactions_root.as_str()))
.change("state_root", ("", state_root.as_str()))
.change("receipts_root", ("", receipts_root.as_str()))
.change("miner", ("", miner.as_str()))
.change("size", ("", size.as_str()))
.change("mix_hash", ("", mix_hash.as_str()))
.change("extra_data", ("", extra_data.as_str()))
.change("gas_limit", ("", gas_limit.as_str()))
.change("gas_used", ("", gas_used.as_str()));

insert_timestamp(row, clock, true);

let mut total_transactions = 0;
let mut successful_transactions = 0;
Expand All @@ -43,29 +51,29 @@ pub fn insert_blocks(tables: &mut Tables, clock: &Clock, block: &Block) {
}
total_transactions += 1;
}
row.set_bigint("total_transactions", &total_transactions.to_string())
.set_bigint("successful_transactions", &successful_transactions.to_string())
.set_bigint("failed_transactions", &failed_transactions.to_string());
row.change("total_transactions", ("", total_transactions.to_string().as_str()))
.change("successful_transactions", ("", successful_transactions.to_string().as_str()))
.change("failed_transactions", ("", failed_transactions.to_string().as_str()));

// optional fields
match header.difficulty {
Some(difficulty) => row.set_bigint("difficulty", &difficulty.with_decimal(0).to_string()),
Some(difficulty) => row.change("difficulty", ("", difficulty.with_decimal(0).to_string().as_str())),
None => row,
};
match header.total_difficulty {
Some(total_difficulty) => row.set_bigint("total_difficulty", &total_difficulty.with_decimal(0).to_string()),
Some(total_difficulty) => row.change("total_difficulty", ("", total_difficulty.with_decimal(0).to_string().as_str())),
None => row,
};
match header.blob_gas_used {
Some(blob_gas_used) => row.set_bigint("blob_gas_used", &blob_gas_used.to_string()),
Some(blob_gas_used) => row.change("blob_gas_used", ("", blob_gas_used.to_string().as_str())),
None => row,
};
match header.base_fee_per_gas {
Some(base_fee_per_gas) => row.set_bigint("base_fee_per_gas", &base_fee_per_gas.with_decimal(0).to_string()),
Some(base_fee_per_gas) => row.change("base_fee_per_gas", ("", base_fee_per_gas.with_decimal(0).to_string().as_str())),
None => row,
};
match header.parent_beacon_root.len() {
0 => row,
_ => row.set("parent_beacon_root", bytes_to_hex(header.parent_beacon_root)),
_ => row.change("parent_beacon_root", ("", bytes_to_hex(header.parent_beacon_root).as_str())),
};
}
56 changes: 28 additions & 28 deletions blocks/evm/src/logs.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use common::{block_time_to_date, bytes_to_hex, extract_topic};
use std::collections::HashMap;

use common::utils::{bytes_to_hex, extract_topic};
use common::sinks::insert_timestamp;
use substreams::pb::substreams::Clock;
use substreams_entity_change::tables::Tables;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::pb::eth::v2::Block;

pub fn insert_logs(tables: &mut Tables, clock: &Clock, block: &Block) {
let timestamp = clock.clone().timestamp.unwrap();
let block_time = timestamp.to_string();
let block_number = clock.number.to_string();
let block_hash = format!("0x{}", clock.id);
let block_date = block_time_to_date(block_time.as_str());

pub fn insert_logs(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
// logs
for log in block.logs() {
let log_index = log.index();
let log_index = log.index().to_string();
let transaction = log.receipt.transaction;
let tx_hash = bytes_to_hex(transaction.hash.to_vec());
let tx_index = transaction.index;
let tx_index = transaction.index.to_string();
let tx_from = bytes_to_hex(transaction.from.to_vec());
let tx_to = bytes_to_hex(transaction.to.to_vec());
let contract_address = bytes_to_hex(log.address().to_vec());
Expand All @@ -26,22 +23,25 @@ pub fn insert_logs(tables: &mut Tables, clock: &Clock, block: &Block) {
let topic3 = extract_topic(topics, 3);
let data = bytes_to_hex(log.data().to_vec());

tables
.create_row("logs", &log_index.to_string())
.set("block_time", &block_time)
.set("block_number", &block_number)
.set("block_hash", &block_hash)
.set("contract_address", &contract_address)
.set("topic0", &topic0)
.set("topic1", &topic1)
.set("topic2", &topic2)
.set("topic3", &topic3)
.set("data", &data)
.set("tx_hash", &tx_hash)
.set_bigint("index", &log_index.to_string())
.set_bigint("tx_index", &tx_index.to_string())
.set("block_date", &block_date)
.set("tx_from", &tx_from)
.set("tx_to", &tx_to);
let keys = HashMap::from([
("contract_address".to_string(), contract_address.to_string()),
("tx_hash".to_string(), tx_hash.to_string()),
("log_index".to_string(), log_index.to_string()),
]);
let row = tables
.push_change_composite("logs", keys, 0, table_change::Operation::Create)
.change("contract_address", ("", contract_address.as_str()))
.change("topic0", ("", topic0.as_str()))
.change("topic1", ("", topic1.as_str()))
.change("topic2", ("", topic2.as_str()))
.change("topic3", ("", topic3.as_str()))
.change("data", ("", data.as_str()))
.change("tx_hash", ("", tx_hash.as_str()))
.change("index", ("", log_index.as_str()))
.change("tx_index", ("", tx_index.as_str()))
.change("tx_from", ("", tx_from.as_str()))
.change("tx_to", ("", tx_to.as_str()));

insert_timestamp(row, clock, true);
}
}
29 changes: 7 additions & 22 deletions blocks/evm/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
use substreams::errors::Error;
use substreams::pb::substreams::Clock;
use substreams_entity_change::pb::entity::EntityChanges;
use substreams_entity_change::tables::Tables;
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_ethereum::pb::eth::v2::Block;

use crate::blocks::insert_blocks;
// use crate::blocks::insert_blocks;
use crate::logs::insert_logs;

#[substreams::handlers::map]
pub fn graph_out(clock: Clock, block: Block) -> Result<EntityChanges, Error> {
let mut tables = Tables::new();
insert_blocks(&mut tables, &clock, &block);
insert_logs(&mut tables, &clock, &block);
Ok(tables.to_entity_changes())
}

#[substreams::handlers::map]
pub fn map_blocks(clock: Clock, block: Block) -> Result<EntityChanges, Error> {
let mut tables = Tables::new();
insert_blocks(&mut tables, &clock, &block);
Ok(tables.to_entity_changes())
}

#[substreams::handlers::map]
pub fn map_logs(clock: Clock, block: Block) -> Result<EntityChanges, Error> {
let mut tables = Tables::new();
insert_logs(&mut tables, &clock, &block);
Ok(tables.to_entity_changes())
pub fn ch_out(clock: Clock, block: Block) -> Result<DatabaseChanges, Error> {
let mut tables: DatabaseChanges = DatabaseChanges::default();
// insert_blocks(&mut tables, &clock, &block);
// insert_logs(&mut tables, &clock, &block);
Ok(tables)
}
29 changes: 12 additions & 17 deletions blocks/evm/substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,28 @@ package:

imports:
entities: https://github.com/streamingfast/substreams-sink-entity-changes/releases/download/v1.3.2/substreams-sink-entity-changes-v1.3.2.spkg
database_change: https://github.com/streamingfast/substreams-sink-database-changes/releases/download/v1.3.1/substreams-database-change-v1.3.1.spkg
sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.7/substreams-sink-sql-protodefs-v1.0.7.spkg

binaries:
default:
type: wasm/rust-v1
file: ../../target/wasm32-unknown-unknown/release/raw_blocks_evm.wasm

modules:
- name: graph_out
- name: ch_out
kind: map
inputs:
- source: sf.substreams.v1.Clock
- source: sf.ethereum.type.v2.Block
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges
type: proto:sf.substreams.sink.database.v1.DatabaseChanges

- name: map_blocks
kind: map
inputs:
- source: sf.substreams.v1.Clock
- source: sf.ethereum.type.v2.Block
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges

- name: map_logs
kind: map
inputs:
- source: sf.substreams.v1.Clock
- source: sf.ethereum.type.v2.Block
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges
sink:
module: ch_out
type: sf.substreams.sink.sql.v1.Service
config:
schema: "./schema.sql"
engine: clickhouse
postgraphile_frontend:
enabled: false
1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ version.workspace = true

[dependencies]
substreams.workspace = true
substreams-database-change.workspace = true
20 changes: 20 additions & 0 deletions common/src/keys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::collections::HashMap;

use substreams::pb::substreams::Clock;

use crate::utils::block_time_to_date;

pub fn block_keys(clock: &Clock) -> HashMap<String, String> {
let timestamp = clock.clone().timestamp.unwrap();
let block_time = timestamp.to_string();
let block_number = clock.number.to_string();
let block_hash = format!("0x{}", clock.id);
let block_date = block_time_to_date(block_time.as_str()).to_string();

HashMap::from([
("block_time".to_string(), block_time),
("block_number".to_string(), block_number),
("block_hash".to_string(), block_hash),
("block_date".to_string(), block_date)
])
}
Loading

0 comments on commit 97f2f3b

Please sign in to comment.