Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eth indexer not require start block env variable #116

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
)?;

let (eth_indexer_handle, eth_indexer) =
indexer_eth::run(&indexer_eth_options, sign_tx, app_data_storage)?;
indexer_eth::run(&indexer_eth_options, &account_id, sign_tx, app_data_storage)?;

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
Expand Down
66 changes: 41 additions & 25 deletions chain-signatures/node/src/indexer_eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crypto_shared::kdf::derive_epsilon_eth;
use crypto_shared::ScalarExt;
use hex::ToHex;
use k256::Scalar;
use near_account_id::AccountId;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -29,10 +30,6 @@ pub struct Options {
#[clap(long, env("MPC_INDEXER_ETH_CONTRACT_ADDRESS"))]
pub eth_contract_address: String,

/// The block height to start indexing from
#[clap(long, env("MPC_INDEXER_ETH_START_BLOCK"), default_value = "0")]
pub eth_start_block_height: u64,

/// The amount of time before we consider the indexer behind
#[clap(long, env("MPC_INDEXER_ETH_BEHIND_THRESHOLD"), default_value = "180")]
pub eth_behind_threshold: u64,
Expand All @@ -50,8 +47,6 @@ impl Options {
self.eth_rpc_url,
"--eth-contract-address".to_string(),
self.eth_contract_address,
"--eth-start-block-height".to_string(),
self.eth_start_block_height.to_string(),
"--eth-behind-threshold".to_string(),
self.eth_behind_threshold.to_string(),
"--eth-running-threshold".to_string(),
Expand Down Expand Up @@ -144,14 +139,13 @@ impl EthIndexer {
#[derive(Clone)]
struct Context {
contract_address: H160,
node_account_id: AccountId,
web3: Web3<web3::transports::Http>,
sign_tx: mpsc::Sender<SignRequest>,
indexer: EthIndexer,
}

async fn handle_block(block_number: u64, ctx: &Context) -> anyhow::Result<()> {
tracing::debug!(block_height = block_number, "handle eth block");

let signature_requested_topic = H256::from_slice(&web3::signing::keccak256(
b"SignatureRequested(bytes32,address,uint256,uint256,string)",
));
Expand All @@ -173,7 +167,10 @@ async fn handle_block(block_number: u64, ctx: &Context) -> anyhow::Result<()> {
let block_timestamp = block.timestamp.as_u64();

let logs = ctx.web3.eth().logs(filter).await?;
tracing::debug!("found {} filtered logs", logs.len());
let logs_cnt = logs.len();
if logs_cnt > 0 {
tracing::debug!("found {logs_cnt} filtered logs");
}

let mut pending_requests = Vec::new();
// Get logs using filter
Expand Down Expand Up @@ -234,6 +231,10 @@ async fn handle_block(block_number: u64, ctx: &Context) -> anyhow::Result<()> {
pending_requests.push(sign_request);
}

crate::metrics::NUM_SIGN_REQUESTS_ETH
.with_label_values(&[ctx.node_account_id.as_str()])
.inc_by(pending_requests.len() as f64);

for request in pending_requests {
if let Err(err) = ctx.sign_tx.send(request).await {
tracing::error!(?err, "failed to send the eth sign request into sign queue");
Expand All @@ -244,6 +245,10 @@ async fn handle_block(block_number: u64, ctx: &Context) -> anyhow::Result<()> {
.update_block_height_and_timestamp(block_number, block_timestamp)
.await;

crate::metrics::LATEST_BLOCK_HEIGHT_ETH
.with_label_values(&[ctx.node_account_id.as_str()])
.set(block_number as i64);

Ok(())
}

Expand Down Expand Up @@ -288,6 +293,7 @@ fn parse_event(log: &Log) -> anyhow::Result<SignatureRequestedEvent> {

pub fn run(
options: &Options,
node_account_id: &AccountId,
sign_tx: mpsc::Sender<SignRequest>,
app_data_storage: AppDataStorage,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, EthIndexer)> {
Expand All @@ -299,13 +305,12 @@ pub fn run(
let indexer = EthIndexer::new(app_data_storage, options);
let context = Context {
contract_address,
node_account_id: node_account_id.clone(),
web3,
sign_tx,
indexer: indexer.clone(),
};

let start_block_height = options.eth_start_block_height;

let join_handle = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand All @@ -314,26 +319,37 @@ pub fn run(
rt.block_on(async {
loop {
let latest_block = match context.web3.eth().block_number().await {
Ok(block) => block,
Ok(block) => block.as_u64(),
Err(err) => {
tracing::warn!(%err, "failed to get latest eth block number");
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let latest_handled_block = context
.indexer
.last_processed_block()
.await
.unwrap_or(start_block_height);
tracing::debug!(
"eth latest_block {} latest_handled_block {}",
latest_block,
latest_handled_block
);

if latest_handled_block < latest_block.as_u64() {
if let Err(err) = handle_block(latest_handled_block + 1, &context).await {

let latest_handled_block =
context.indexer.last_processed_block().await.unwrap_or(0);

const MAX_DELAYED_BLOCKS: u64 = 100;
let next_block_to_handle: u64 = {
if latest_block - latest_handled_block < MAX_DELAYED_BLOCKS {
latest_handled_block + 1
} else {
latest_block.saturating_sub(MAX_DELAYED_BLOCKS) + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great optimization!
Due to current rate limit failure rate, there is a small issue. I notice due to frequent rate limit hit, sometimes it can be 5-10 blocks delayed, so there is a small chance of >15 block delayed. If that happens, some block would be skipped.

Therefore, I suggest doing this check only once, when there is no last handled block.

Copy link
Contributor Author

@ppca ppca Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll increase the delay to 100 blocks. I also want this fix to help the case where a node goes back online after being offline for a while, in that case we don't want the node to start way back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ppca I think we should apply the same logic (use the same function) that is used in the NEAR Indexer. I do not think there is a need to differentiate,

}
};

if latest_block % 100 == 0 {
tracing::debug!(
next_block_to_handle,
latest_block,
latest_handled_block,
"Eth indexer running"
)
}

if next_block_to_handle <= latest_block {
if let Err(err) = handle_block(next_block_to_handle, &context).await {
tracing::warn!(%err, "failed to handle eth block");
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
Expand Down
18 changes: 18 additions & 0 deletions chain-signatures/node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,24 @@ pub(crate) static PROTOCOL_ITER_CNT: Lazy<CounterVec> = Lazy::new(|| {
.unwrap()
});

pub(crate) static LATEST_BLOCK_HEIGHT_ETH: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"multichain_latest_block_height_eth",
"Latest block height handled by the node on ethereum chain",
&["node_account_id"],
)
.unwrap()
});

pub(crate) static NUM_SIGN_REQUESTS_ETH: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_sign_requests_count_eth",
"number of multichain sign requests from ethereum chain, marked by sign requests indexed",
&["node_account_id"],
)
.unwrap()
});

pub fn try_create_int_gauge_vec(name: &str, help: &str, labels: &[&str]) -> Result<IntGaugeVec> {
check_metric_multichain_prefix(name)?;
let opts = Opts::new(name, help);
Expand Down
1 change: 0 additions & 1 deletion integration-tests/src/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl Node {
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_url: config.cfg.eth_rpc_url.clone(),
eth_contract_address: config.cfg.eth_contract_address.clone(),
eth_start_block_height: config.cfg.eth_start_block_height,
eth_behind_threshold: 120,
eth_running_threshold: 120,
};
Expand Down
2 changes: 0 additions & 2 deletions integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct NodeConfig {
pub eth_rpc_url: String,
pub eth_contract_address: String,
pub eth_account_sk: String,
pub eth_start_block_height: u64,
}

impl Default for NodeConfig {
Expand All @@ -60,7 +59,6 @@ impl Default for NodeConfig {
eth_contract_address: "0x5FbDB2315678afecb367f032d93F642f64180aa3".to_string(),
eth_account_sk: "5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"
.to_string(),
eth_start_block_height: 0,
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions integration-tests/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ impl Node {
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_url: cfg.eth_rpc_url.clone(),
eth_contract_address: cfg.eth_contract_address.clone(),
eth_start_block_height: cfg.eth_start_block_height,
eth_behind_threshold: 120,
eth_running_threshold: 120,
};
Expand Down Expand Up @@ -175,7 +174,6 @@ impl Node {
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_url: config.cfg.eth_rpc_url.clone(),
eth_contract_address: config.cfg.eth_contract_address.clone(),
eth_start_block_height: config.cfg.eth_start_block_height,
eth_behind_threshold: 120,
eth_running_threshold: 120,
};
Expand Down
4 changes: 0 additions & 4 deletions integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ enum Cli {
default_value = "5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"
)]
eth_account_sk: String,
#[arg(long, default_value = "0")]
eth_start_block_height: u64,
},
/// Spin up dependent services but not mpc nodes
DepServices,
Expand All @@ -55,7 +53,6 @@ async fn main() -> anyhow::Result<()> {
eth_rpc_url,
eth_contract_address,
eth_account_sk,
eth_start_block_height,
} => {
println!(
"Setting up an environment with {} nodes, {} threshold ...",
Expand All @@ -67,7 +64,6 @@ async fn main() -> anyhow::Result<()> {
eth_rpc_url,
eth_contract_address,
eth_account_sk,
eth_start_block_height,
..Default::default()
};
println!("Full config: {:?}", config);
Expand Down
Loading