diff --git a/z2/resources/config.tera.toml b/z2/resources/config.tera.toml
index abfaf7250..658cc2579 100644
--- a/z2/resources/config.tera.toml
+++ b/z2/resources/config.tera.toml
@@ -5,6 +5,7 @@ bootstrap_address = [ "{{ bootstrap_peer_id }}", "/ip4/{{ bootstrap_public_ip }}
 {%- endif %}
 
 [[nodes]]
+remote_api_url = "{{ remote_api_url }}"
 eth_chain_id = {{ eth_chain_id }}
 allowed_timestamp_skew = { secs = 60, nanos = 0 }
 data_dir = "/data"
diff --git a/z2/src/chain/node.rs b/z2/src/chain/node.rs
index 631f1d220..f77cb790a 100644
--- a/z2/src/chain/node.rs
+++ b/z2/src/chain/node.rs
@@ -519,6 +519,7 @@ impl ChainNode {
         let role_name = self.role.to_string();
         let eth_chain_id = self.eth_chain_id.to_string();
         let bootstrap_public_ip = selected_bootstrap.machine.external_address;
+        let remote_api_url = self.chain()?.get_endpoint()?;
         let whitelisted_evm_contract_addresses = self.chain()?.get_whitelisted_evm_contracts();
         // 4201 is the publically exposed port - We don't expose everything there.
         let public_api = if self.role == NodeRole::Api {
@@ -551,6 +552,7 @@ impl ChainNode {
         let toml_servers: toml::Value = serde_json::from_value(api_servers)?;
         ctx.insert("api_servers", &toml_servers.to_string());
         ctx.insert("enable_ots_indices", &enable_ots_indices);
+        ctx.insert("remote_api_url", remote_api_url);
 
         if let Some(checkpoint_url) = self.chain.checkpoint_url() {
             if self.role == NodeRole::Validator {
diff --git a/z2/src/setup.rs b/z2/src/setup.rs
index 1ca97cb5d..004b15d47 100644
--- a/z2/src/setup.rs
+++ b/z2/src/setup.rs
@@ -546,6 +546,7 @@ impl Setup {
             state_rpc_limit: state_rpc_limit_default(),
             failed_request_sleep_duration: failed_request_sleep_duration_default(),
             enable_ots_indices: false,
+            remote_api_url: Default::default(),
         };
 
         println!("🧩 Node {node_index} has RPC port {port}");
diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs
index f1d36d7ef..fd4c9d443 100644
--- a/zilliqa/src/cfg.rs
+++ b/zilliqa/src/cfg.rs
@@ -114,6 +114,9 @@ pub struct NodeConfig {
     /// Enable additional indices used by some Otterscan APIs. Enabling this will use more disk space and block processing will take longer.
     #[serde(default)]
     pub enable_ots_indices: bool,
+    /// Remote API URL - set to the API gateway/endpoint - used by the watchdog to query the highest canonical block number.
+    #[serde(default)]
+    pub remote_api_url: Option<String>,
 }
 
 impl Default for NodeConfig {
@@ -133,6 +136,7 @@
             state_rpc_limit: state_rpc_limit_default(),
             failed_request_sleep_duration: failed_request_sleep_duration_default(),
             enable_ots_indices: false,
+            remote_api_url: Default::default(),
         }
     }
 }
diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs
index 40e8b3ae8..03175df8f 100644
--- a/zilliqa/src/message.rs
+++ b/zilliqa/src/message.rs
@@ -339,6 +339,8 @@ pub enum InternalMessage {
         TrieStorage,
         Box<Path>,
     ),
+    /// Notifies the coordinator to restart a node of the given shard
+    RestartShard(u64),
 }
 
 /// Returns a terse, human-readable summary of a message.
@@ -351,6 +353,7 @@ impl Display for InternalMessage {
             InternalMessage::ExportBlockCheckpoint(block, ..) => {
                 write!(f, "ExportCheckpoint({})", block.number())
             }
+            InternalMessage::RestartShard(id) => write!(f, "RestartShard({id})"),
         }
     }
 }
diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs
index ab5a92ee3..f1087b1ef 100644
--- a/zilliqa/src/node.rs
+++ b/zilliqa/src/node.rs
@@ -354,6 +354,11 @@ impl Node {
         Ok(())
     }
 
+    pub fn internal_restart(&self, shard_id: u64) -> Result<()> {
+        self.message_sender
+            .send_message_to_coordinator(InternalMessage::RestartShard(shard_id))
+    }
+
     pub fn handle_internal_message(&mut self, from: u64, message: InternalMessage) -> Result<()> {
         let to = self.chain_id.eth;
         tracing::debug!(%from, %to, %message, "handling message");
@@ -365,7 +370,9 @@ impl Node {
                 self.message_sender
                     .send_message_to_coordinator(InternalMessage::LaunchShard(source))?;
             }
-            InternalMessage::LaunchShard(..) | InternalMessage::ExportBlockCheckpoint(..) => {
+            InternalMessage::LaunchShard(..)
+            | InternalMessage::ExportBlockCheckpoint(..)
+            | InternalMessage::RestartShard(..) => {
                 warn!(
                     "{message} type messages should be handled by the coordinator, not forwarded to a node.",
                 );
diff --git a/zilliqa/src/node_launcher.rs b/zilliqa/src/node_launcher.rs
index 995191ab8..3206b66fa 100644
--- a/zilliqa/src/node_launcher.rs
+++ b/zilliqa/src/node_launcher.rs
@@ -1,11 +1,13 @@
 use std::{
     net::Ipv4Addr,
+    ops::Add,
     sync::{atomic::AtomicUsize, Arc, Mutex},
     time::{Duration, SystemTime},
 };
 
 use anyhow::{anyhow, Result};
 use http::{header, Method};
+use jsonrpsee::core::client::ClientT;
 use libp2p::{futures::StreamExt, PeerId};
 use node::Node;
 use opentelemetry::KeyValue;
@@ -34,6 +36,11 @@ use crate::{
     p2p_node::{LocalMessageTuple, OutboundMessageTuple},
 };
 
+#[derive(Debug, Default)]
+struct WatchDogDebounce {
+    value: u64,
+    count: u64,
+}
 pub struct NodeLauncher {
     pub node: Arc<Mutex<Node>>,
     pub config: NodeConfig,
@@ -45,6 +52,7 @@ pub struct NodeLauncher {
     /// Channel used to steer next sleep time
    pub reset_timeout_receiver: UnboundedReceiverStream<Duration>,
     node_launched: bool,
+    watchdog: WatchDogDebounce,
 }
 
 // If the `fake_response_channel` feature is enabled, swap out the libp2p ResponseChannel for a `u64`. In our
@@ -157,6 +165,7 @@ impl NodeLauncher {
             reset_timeout_receiver,
             node_launched: false,
             config,
+            watchdog: Default::default(),
         };
         let input_channels = NodeInputChannels {
             broadcasts: broadcasts_sender,
@@ -169,7 +178,75 @@ impl NodeLauncher {
         Ok((launcher, input_channels))
     }
 
-    pub async fn start_shard_node(&mut self) -> Result<()> {
+    async fn internal_watchdog(&mut self) -> Result<bool> {
+        // If watchdog is disabled, then do nothing.
+        if self.config.remote_api_url.is_none() {
+            return Ok(false);
+        }
+
+        // 1. Collect quick sample
+        let self_highest = self
+            .node
+            .lock()
+            .unwrap()
+            .db
+            .get_highest_canonical_block_number()?
+            .ok_or_else(|| anyhow!("can't find highest block num in database!"))?;
+
+        // 1.5 Debounce
+        if self.watchdog.value != self_highest {
+            self.watchdog.value = self_highest;
+            self.watchdog.count = 0;
+        } else {
+            self.watchdog.count += 1;
+        }
+
+        tracing::debug!(
+            "WDT check value: {} count: {}",
+            self.watchdog.value,
+            self.watchdog.count
+        );
+
+        // 2. Internal check to see if node is possibly stuck.
+        if self.watchdog.count > 3 {
+            let rpc_url = self.config.remote_api_url.clone().unwrap_or_default();
+            // 3. External check to see if others are stuck too, as opposed to timeouts.
+            let client = jsonrpsee::http_client::HttpClientBuilder::default()
+                .request_timeout(self.config.consensus.consensus_timeout / 2) // fast call
+                .build(rpc_url.clone())?;
+
+            let result = client
+                .request("eth_blockNumber", jsonrpsee::rpc_params![])
+                .await
+                // do not restart due to network/upstream errors, check again later.
+                .unwrap_or_else(|e| {
+                    tracing::error!("WDT remote call to {} failed: {e}", rpc_url);
+                    "0x0".to_string()
+                });
+
+            let remote_highest = result
+                .strip_prefix("0x")
+                .map(|s| u64::from_str_radix(s, 16).unwrap_or_default())
+                .unwrap_or_default();
+
+            // 4. If self < others for > threshold, then we're stuck while others aren't.
+            if self_highest < remote_highest {
+                tracing::warn!(?self_highest, ?remote_highest, "WDT node stuck at");
+                return Ok(true);
+            } else {
+                tracing::warn!(
+                    ?self_highest,
+                    ?remote_highest,
+                    "WDT network possibly stalled at"
+                )
+            }
+        }
+
+        // 5. Reset watchdog, do nothing.
+        Ok(false)
+    }
+
+    pub async fn start_shard_node(&mut self, shard_id: u64) -> Result<()> {
         if self.node_launched {
             return Err(anyhow!("Node already running!"));
         }
@@ -177,6 +254,21 @@ impl NodeLauncher {
         let sleep = time::sleep(Duration::from_millis(5));
         tokio::pin!(sleep);
 
+        // Internal watchdog
+        let wdt_duration = self
+            .config
+            .consensus
+            .consensus_timeout
+            .add(self.config.consensus.empty_block_timeout)
+            .saturating_mul(5); // every 30s or so by default
+        let watchdog = if self.config.remote_api_url.is_none() {
+            time::sleep(Duration::MAX)
+        } else {
+            tracing::info!("WDT period: {:?}", wdt_duration);
+            time::sleep(wdt_duration)
+        };
+        tokio::pin!(watchdog);
+
         self.node_launched = true;
 
         let meter = opentelemetry::global::meter("zilliqa");
@@ -281,6 +373,24 @@ impl NodeLauncher {
                         &attributes,
                     );
                 },
+
+                () = &mut watchdog => {
+                    let attributes = vec![
+                        KeyValue::new(MESSAGING_OPERATION_NAME, "handle"),
+                        KeyValue::new(MESSAGING_SYSTEM, "tokio_channel"),
+                        KeyValue::new(MESSAGING_DESTINATION_NAME, "watchdog"),
+                    ];
+                    let start = SystemTime::now();
+                    if self.internal_watchdog().await? {
+                        tracing::info!("WDT restarting {shard_id}.");
+                        return self.node.lock().unwrap().internal_restart(shard_id);
+                    };
+                    watchdog.as_mut().reset(Instant::now() + wdt_duration);
+                    messaging_process_duration.record(
+                        start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
+                        &attributes,
+                    );
+                }
                 r = self.reset_timeout_receiver.next() => {
                     let sleep_time = r.expect("reset timeout stream should be infinite");
                     trace!(?sleep_time, "timeout reset");
diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs
index 112a53a09..bd53e5bfe 100644
--- a/zilliqa/src/p2p_node.rs
+++ b/zilliqa/src/p2p_node.rs
@@ -185,7 +185,8 @@ impl P2pNode {
     }
 
     pub async fn add_shard_node(&mut self, config: NodeConfig) -> Result<()> {
-        let topic = Self::shard_id_to_topic(config.eth_chain_id);
+        let shard_id = config.eth_chain_id;
+        let topic = Self::shard_id_to_topic(shard_id);
         if self.shard_nodes.contains_key(&topic.hash()) {
             info!("LaunchShard message received for a shard we're already running. Ignoring...");
             return Ok(());
@@ -201,7 +202,7 @@ impl P2pNode {
             .await?;
         self.shard_nodes.insert(topic.hash(), input_channels);
         self.shard_threads
-            .spawn(async move { node.start_shard_node().await });
+            .spawn(async move { node.start_shard_node(shard_id).await });
         self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
         Ok(())
     }
@@ -355,6 +356,19 @@ impl P2pNode {
                    InternalMessage::ExportBlockCheckpoint(block, transactions, parent, trie_storage, path) => {
                        self.task_threads.spawn(async move { db::checkpoint_block_with_state(&block, &transactions, &parent, trie_storage, source, path) });
                    }
+                    InternalMessage::RestartShard(shard_id) => {
+                        // remove existing node
+                        let topic = Self::shard_id_to_topic(shard_id);
+                        self.shard_nodes.remove(&topic.hash());
+                        // restart node
+                        let shard_config = self.config.nodes
+                            .iter()
+                            .find(|shard_config| shard_config.eth_chain_id == shard_id)
+                            .cloned()
+                            .unwrap_or_else(
+                                || Self::generate_child_config(self.config.nodes.first().unwrap(), shard_id));
+                        self.add_shard_node(shard_config.clone()).await?;
+                    },
                 }
             },
             message = self.request_responses_receiver.next() => {
diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs
index e14f0f382..b975e7541 100644
--- a/zilliqa/tests/it/main.rs
+++ b/zilliqa/tests/it/main.rs
@@ -369,6 +369,7 @@ impl Network {
             state_rpc_limit: state_rpc_limit_default(),
             failed_request_sleep_duration: failed_request_sleep_duration_default(),
             enable_ots_indices: true,
+            remote_api_url: Default::default(),
         };
 
         let (nodes, external_receivers, local_receivers, request_response_receivers): (
@@ -497,6 +498,7 @@ impl Network {
             state_rpc_limit: state_rpc_limit_default(),
             failed_request_sleep_duration: failed_request_sleep_duration_default(),
             enable_ots_indices: true,
+            remote_api_url: Default::default(),
         };
 
         let secret_key = options.secret_key_or_random(self.rng.clone());
@@ -848,6 +850,7 @@ impl Network {
             AnyMessage::Internal(source_shard, destination_shard, ref internal_message) => {
                 trace!("Handling internal message from node in shard {source_shard}, targetting {destination_shard}");
                 match internal_message {
+                    InternalMessage::RestartShard(_) => todo!(),
                     InternalMessage::LaunchShard(new_network_id) => {
                         let secret_key = self.find_node(source).unwrap().1.secret_key;
                         if let Some(child_network) = self.children.get_mut(new_network_id) {
diff --git a/zilliqa/tests/it/persistence.rs b/zilliqa/tests/it/persistence.rs
index 6feabd883..d43f82417 100644
--- a/zilliqa/tests/it/persistence.rs
+++ b/zilliqa/tests/it/persistence.rs
@@ -134,6 +134,7 @@ async fn block_and_tx_data_persistence(mut network: Network) {
         state_rpc_limit: state_rpc_limit_default(),
         failed_request_sleep_duration: failed_request_sleep_duration_default(),
         enable_ots_indices: true,
+        remote_api_url: Default::default(),
     };
     let mut rng = network.rng.lock().unwrap();
     let result = crate::node(