Skip to content

Commit

Permalink
add transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Jul 30, 2024
1 parent 43c6ecf commit 49568c0
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 79 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
- [ ] `contract_internal_transactions`
- [x] **BalanceChanges**
- [x] **Logs**
- [ ] **Transactions**
- [ ] **Traces**
- [x] **Transactions**
- [x] **Traces**
- [ ] **BalanceChanges**
- [ ] **Creation Traces**
- [ ] **Storage Changes**
Expand Down
106 changes: 73 additions & 33 deletions blocks/evm/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ CREATE TABLE IF NOT EXISTS logs
tx_to String
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY (block_date, block_time, block_number, log_index, tx_hash)
ORDER BY (block_date, block_time, block_number, log_index, tx_hash)
PRIMARY KEY (block_date, block_time, block_number, tx_hash, log_index)
ORDER BY (block_date, block_time, block_number, tx_hash, log_index)
COMMENT 'Ethereum event logs';

CREATE TABLE IF NOT EXISTS balance_changes
Expand All @@ -91,43 +91,83 @@ CREATE TABLE IF NOT EXISTS balance_changes
CREATE TABLE IF NOT EXISTS traces
(
-- block --
block_time DateTime('UTC'),
block_number UInt64,
block_hash String,
block_date Date,
block_time DateTime('UTC'),
block_number UInt64,
block_hash String,
block_date Date,

-- transaction --
tx_hash String,
tx_index UInt32,
tx_status LowCardinality(String),
tx_status_code Int32,
tx_is_successful Bool,
from String,
to String,
tx_hash String,
tx_index UInt32,
tx_status LowCardinality(String),
tx_status_code Int32,
tx_success Bool,
from String,
to String,

-- trace --
index UInt32,
parent_index UInt32,
depth UInt32,
caller String,
call_type LowCardinality(String),
call_type_code Int32,
address String,
value UInt256,
gas_limit UInt64,
gas_consumed UInt64,
return_data String,
input String,
suicide Bool,
failure_reason LowCardinality(String),
state_reverted Bool,
status_reverted Bool,
status_failed Bool,
executed_code Bool,
begin_ordinal UInt64,
end_ordinal UInt64
index UInt32,
parent_index UInt32,
depth UInt32,
caller String,
call_type LowCardinality(String),
call_type_code Int32,
address String,
value UInt256,
gas_limit UInt64,
gas_consumed UInt64,
return_data String,
input String,
suicide Bool,
failure_reason LowCardinality(String),
state_reverted Bool,
status_reverted Bool,
status_failed Bool,
executed_code Bool,
begin_ordinal UInt64,
end_ordinal UInt64
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY (block_date, block_time, block_number, tx_hash, tx_index, index)
ORDER BY (block_date, block_time, block_number, tx_hash, tx_index, index)
COMMENT 'Ethereum traces';


CREATE TABLE IF NOT EXISTS transactions
(
-- block --
block_time DateTime('UTC'),
block_number UInt64,
block_hash String,
block_date Date,

-- transaction --
index UInt32,
hash String,
from String,
to String,
nonce UInt64,
status LowCardinality(String),
status_code Int32,
success Bool,
gas_price UInt256,
gas_limit UInt64,
value UInt256,
input String,
v String,
r String,
s String,
gas_used UInt64,
type LowCardinality(String),
type_code Int32,
max_fee_per_gas UInt256,
max_priority_fee_per_gas UInt256,
return_data String,
public_key String,
begin_ordinal UInt64,
end_ordinal UInt64
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY (block_date, block_time, block_number, tx_hash, tx_index, index)
ORDER BY (block_date, block_time, block_number, tx_hash, tx_index, index)
COMMENT 'Ethereum transactions';
6 changes: 4 additions & 2 deletions blocks/evm/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use substreams_ethereum::pb::eth::v2::Block;

use crate::balance_changes::insert_balance_change;
use crate::blocks::insert_blocks;
use crate::transactions::insert_transactions;
use crate::transactions::insert_transaction;

#[substreams::handlers::map]
pub fn ch_out(clock: Clock, block: Block) -> Result<DatabaseChanges, Error> {
Expand All @@ -16,6 +16,8 @@ pub fn ch_out(clock: Clock, block: Block) -> Result<DatabaseChanges, Error> {
insert_balance_change(&mut tables, &clock, &balance_change);
}
// transactions
insert_transactions(&mut tables, &clock, &block);
for transaction in block.transaction_traces.iter() {
insert_transaction(&mut tables, &clock, &transaction);
}
Ok(tables)
}
20 changes: 6 additions & 14 deletions blocks/evm/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,9 @@ use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::block_view::CallView;

fn trace_status_to_string(status: i32) -> String {
match status {
0 => "Unknown".to_string(),
1 => "Succeeded".to_string(),
2 => "Failed".to_string(),
3 => "Reverted".to_string(),
_ => "Unknown".to_string(),
}
}
use crate::transactions::{is_transaction_success, transaction_status_to_string};

fn call_types_to_string(call_type: i32) -> String {
pub fn call_types_to_string(call_type: i32) -> String {
match call_type {
0 => "Unspecified".to_string(),
1 => "Call".to_string(),
Expand All @@ -30,15 +22,15 @@ fn call_types_to_string(call_type: i32) -> String {
}

// https://github.com/streamingfast/firehose-ethereum/blob/1bcb32a8eb3e43347972b6b5c9b1fcc4a08c751e/proto/sf/ethereum/type/v2/type.proto#L546
pub fn insert_traces(tables: &mut DatabaseChanges, clock: &Clock, call: &CallView) {
pub fn insert_trace(tables: &mut DatabaseChanges, clock: &Clock, call: &CallView) {
let transaction = call.transaction;
let tx_index = transaction.index.to_string();
let tx_hash = bytes_to_hex(transaction.hash.clone());
let from = bytes_to_hex(transaction.from.clone()); // does trace contain `from`?
let to = bytes_to_hex(transaction.to.clone()); // does trace contain `to`?
let tx_status = trace_status_to_string(transaction.status);
let tx_status = transaction_status_to_string(transaction.status);
let tx_status_code = transaction.status.to_string();
let tx_is_successful = (transaction.status == 1).to_string();
let tx_success = is_transaction_success(transaction.status).to_string();

// traces
for trace in transaction.calls.iter() {
Expand Down Expand Up @@ -76,7 +68,7 @@ pub fn insert_traces(tables: &mut DatabaseChanges, clock: &Clock, call: &CallVie
.change("to", ("", to.as_str()))
.change("tx_status", ("", tx_status.as_str()))
.change("tx_status_code", ("", tx_status_code.as_str()))
.change("tx_is_successful", ("", tx_is_successful.as_str()))
.change("tx_success", ("", tx_success.as_str()))
// trace
.change("address", ("", address.as_str()))
.change("begin_ordinal", ("", begin_ordinal.as_str()))
Expand Down
123 changes: 97 additions & 26 deletions blocks/evm/src/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,107 @@
use common::keys::transaction_keys;
use common::sinks::insert_timestamp;
use common::utils::bytes_to_hex;
use common::{keys::balance_changes_keys, utils::optional_bigint_to_string};
use common::utils::optional_bigint_to_string;
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::pb::eth::v2::Block;
use substreams_ethereum::pb::eth::v2::TransactionTrace;

use crate::logs::insert_log;
use crate::traces::insert_traces;
use crate::traces::insert_trace;

pub fn transaction_type_to_string(r#type: i32) -> String {
match r#type {
0 => "Legacy".to_string(),
1 => "AccessList".to_string(),
2 => "DynamicFee".to_string(),
3 => "Blob".to_string(),
100 => "ArbitrumDeposit".to_string(),
101 => "ArbitrumUnsigned".to_string(),
102 => "ArbitrumContract".to_string(),
104 => "ArbitrumRetry".to_string(),
105 => "ArbitrumSubmitRetryable".to_string(),
106 => "ArbitrumInternal".to_string(),
120 => "ArbitrumLegacy".to_string(),
126 => "OptimismDeposit".to_string(),
_ => "Unknown".to_string(),
}
}

pub fn transaction_status_to_string(status: i32) -> String {
match status {
0 => "Unknown".to_string(),
1 => "Succeeded".to_string(),
2 => "Failed".to_string(),
3 => "Reverted".to_string(),
_ => "Unknown".to_string(),
}
}

pub fn is_transaction_success(status: i32) -> bool {
status == 1
}

// https://github.com/streamingfast/firehose-ethereum/blob/1bcb32a8eb3e43347972b6b5c9b1fcc4a08c751e/proto/sf/ethereum/type/v2/type.proto#L658
pub fn insert_transactions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
// transactions
for transaction in block.transaction_traces.iter() {
// let address = bytes_to_hex(balance_change.address);
// let new_value = bytes_to_hex(balance_change.new_value.unwrap_or_default().bytes);
// let old_value = bytes_to_hex(balance_change.old_value.unwrap_or_default().bytes);
// let ordinal = balance_change.ordinal.to_string();
// let reason = balance_change.reason.to_string();
// let keys = balance_changes_keys(&clock, &ordinal);
// let row = tables
// .push_change_composite("balance_changes", keys, 0, table_change::Operation::Create)
// .change("address", ("", address.as_str()))
// .change("new_value", ("", new_value.as_str()))
// .change("old_value", ("", old_value.as_str()))
// .change("ordinal", ("", ordinal.as_str()))
// .change("reason", ("", reason.as_str()));

// insert_timestamp(row, clock, false);

for (log, call) in transaction.logs_with_calls() {
insert_log(tables, clock, log, &transaction);
insert_traces(tables, clock, &call);
}
pub fn insert_transaction(tables: &mut DatabaseChanges, clock: &Clock, transaction: &TransactionTrace) {
let index = transaction.index.to_string();
let hash = bytes_to_hex(transaction.hash.clone());
let from = bytes_to_hex(transaction.from.clone());
let to = bytes_to_hex(transaction.to.clone());
let nonce = transaction.nonce.to_string();
let gas_price = optional_bigint_to_string(transaction.gas_price.clone()); // UInt256
let gas_limit = transaction.gas_limit.to_string();
let value = optional_bigint_to_string(transaction.value.clone());
let input = bytes_to_hex(transaction.input.clone());
let v = bytes_to_hex(transaction.v.clone());
let r = bytes_to_hex(transaction.r.clone());
let s = bytes_to_hex(transaction.s.clone());
let gas_used = transaction.gas_used.to_string();
let r#type = transaction_type_to_string(transaction.r#type);
let type_code = transaction.r#type.to_string();
let max_fee_per_gas = optional_bigint_to_string(transaction.max_fee_per_gas.clone());
let max_priority_fee_per_gas = optional_bigint_to_string(transaction.max_priority_fee_per_gas.clone());
let return_data = bytes_to_hex(transaction.return_data.clone());
let public_key = bytes_to_hex(transaction.public_key.clone());
let begin_ordinal = transaction.begin_ordinal.to_string();
let end_ordinal = transaction.end_ordinal.to_string();
let success = is_transaction_success(transaction.status).to_string();
let status = transaction_status_to_string(transaction.status);
let status_code = transaction.status.to_string();

let keys = transaction_keys(&clock, &hash);
let row = tables
.push_change_composite("balance_changes", keys, 0, table_change::Operation::Create)
.change("index", ("", index.as_str()))
.change("hash", ("", hash.as_str()))
.change("from", ("", from.as_str()))
.change("to", ("", to.as_str()))
.change("nonce", ("", nonce.as_str()))
.change("gas_price", ("", gas_price.as_str()))
.change("gas_limit", ("", gas_limit.as_str()))
.change("value", ("", value.as_str()))
.change("input", ("", input.as_str()))
.change("v", ("", v.as_str()))
.change("r", ("", r.as_str()))
.change("s", ("", s.as_str()))
.change("gas_used", ("", gas_used.as_str()))
.change("r", ("", r.as_str()))
.change("type", ("", r#type.as_str()))
.change("type_code", ("", type_code.as_str()))
.change("max_fee_per_gas", ("", max_fee_per_gas.as_str()))
.change("max_priority_fee_per_gas", ("", max_priority_fee_per_gas.as_str()))
.change("return_data", ("", return_data.as_str()))
.change("public_key", ("", public_key.as_str()))
.change("begin_ordinal", ("", begin_ordinal.as_str()))
.change("end_ordinal", ("", end_ordinal.as_str()))
.change("success", ("", success.as_str()))
.change("status", ("", status.as_str()))
.change("status_code", ("", status_code.as_str()));

insert_timestamp(row, clock, false);

// traces & logs
for (log, call) in transaction.logs_with_calls() {
insert_log(tables, clock, log, &transaction);
insert_trace(tables, clock, &call);
}
}
18 changes: 16 additions & 2 deletions common/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,25 @@ pub fn blocks_keys(clock: &Clock) -> HashMap<String, String> {
let block_number = clock.number.to_string();
let block_hash = format!("0x{}", clock.id);

HashMap::from([
("date".to_string(), block_date),
("time".to_string(), block_time),
("number".to_string(), block_number),
("hash".to_string(), block_hash),
])
}

pub fn transaction_keys(clock: &Clock, hash: &String) -> HashMap<String, String> {
let timestamp = clock.clone().timestamp.unwrap();
let block_date = block_time_to_date(&timestamp.to_string()).to_string();
let block_time = timestamp.seconds.to_string();
let block_number = clock.number.to_string();

HashMap::from([
("block_date".to_string(), block_date),
("block_time".to_string(), block_time),
("block_number".to_string(), block_number),
("block_hash".to_string(), block_hash),
("hash".to_string(), hash.to_string()),
])
}

Expand All @@ -29,8 +43,8 @@ pub fn logs_keys(clock: &Clock, log_index: &String, tx_hash: &String) -> HashMap
("block_date".to_string(), block_date),
("block_time".to_string(), block_time),
("block_number".to_string(), block_number),
("log_index".to_string(), log_index.to_string()),
("tx_hash".to_string(), tx_hash.to_string()),
("log_index".to_string(), log_index.to_string()),
])
}

Expand Down

0 comments on commit 49568c0

Please sign in to comment.