Internal watchdog functionality #2010

Open · wants to merge 8 commits into main
1 change: 1 addition & 0 deletions z2/resources/config.tera.toml
@@ -5,6 +5,7 @@ bootstrap_address = [ "{{ bootstrap_peer_id }}", "/ip4/{{ bootstrap_public_ip }}
{%- endif %}

[[nodes]]
remote_api_url = "{{ remote_api_url }}"
eth_chain_id = {{ eth_chain_id }}
allowed_timestamp_skew = { secs = 60, nanos = 0 }
data_dir = "/data"
2 changes: 2 additions & 0 deletions z2/src/chain/node.rs
@@ -519,6 +519,7 @@ impl ChainNode {
let role_name = self.role.to_string();
let eth_chain_id = self.eth_chain_id.to_string();
let bootstrap_public_ip = selected_bootstrap.machine.external_address;
let remote_api_url = self.chain()?.get_endpoint()?;
let whitelisted_evm_contract_addresses = self.chain()?.get_whitelisted_evm_contracts();
// 4201 is the publicly exposed port - We don't expose everything there.
let public_api = if self.role == NodeRole::Api {
@@ -551,6 +552,7 @@ impl ChainNode {
let toml_servers: toml::Value = serde_json::from_value(api_servers)?;
ctx.insert("api_servers", &toml_servers.to_string());
ctx.insert("enable_ots_indices", &enable_ots_indices);
ctx.insert("remote_api_url", remote_api_url);

if let Some(checkpoint_url) = self.chain.checkpoint_url() {
if self.role == NodeRole::Validator {
1 change: 1 addition & 0 deletions z2/src/setup.rs
@@ -546,6 +546,7 @@ impl Setup {
state_rpc_limit: state_rpc_limit_default(),
failed_request_sleep_duration: failed_request_sleep_duration_default(),
enable_ots_indices: false,
remote_api_url: Default::default(),
};
println!("🧩 Node {node_index} has RPC port {port}");

4 changes: 4 additions & 0 deletions zilliqa/src/cfg.rs
@@ -114,6 +114,9 @@ pub struct NodeConfig {
/// Enable additional indices used by some Otterscan APIs. Enabling this will use more disk space and block processing will take longer.
#[serde(default)]
pub enable_ots_indices: bool,
/// Remote API URL. Set to the API gateway/endpoint; used by the watchdog to query the highest canonical block number.
#[serde(default)]
pub remote_api_url: Option<String>,
}

impl Default for NodeConfig {
@@ -133,6 +136,7 @@
state_rpc_limit: state_rpc_limit_default(),
failed_request_sleep_duration: failed_request_sleep_duration_default(),
enable_ots_indices: false,
remote_api_url: Default::default(),
}
}
}
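For context, a minimal sketch of how an operator might set the new field in a node's TOML config; the URL and chain id below are placeholders, not values taken from this PR. Because the field is `Option<String>` with `#[serde(default)]`, omitting it leaves the watchdog disabled.

```toml
[[nodes]]
eth_chain_id = 33469                        # placeholder chain id
# Watchdog enabled: the node compares its own tip against this endpoint.
remote_api_url = "https://api.example.org"  # hypothetical gateway URL
# Omit remote_api_url entirely to keep the watchdog disabled (defaults to None).
```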
3 changes: 3 additions & 0 deletions zilliqa/src/message.rs
@@ -339,6 +339,8 @@ pub enum InternalMessage {
TrieStorage,
Box<Path>,
),
/// Notifies the coordinator to restart a node of the given shard
RestartShard(u64),
}

/// Returns a terse, human-readable summary of a message.
@@ -351,6 +353,7 @@ impl Display for InternalMessage {
InternalMessage::ExportBlockCheckpoint(block, ..) => {
write!(f, "ExportCheckpoint({})", block.number())
}
InternalMessage::RestartShard(id) => write!(f, "RestartShard({id})"),
}
}
}
9 changes: 8 additions & 1 deletion zilliqa/src/node.rs
@@ -354,6 +354,11 @@ impl Node {
Ok(())
}

pub fn internal_restart(&self, shard_id: u64) -> Result<()> {
self.message_sender
.send_message_to_coordinator(InternalMessage::RestartShard(shard_id))
}

pub fn handle_internal_message(&mut self, from: u64, message: InternalMessage) -> Result<()> {
let to = self.chain_id.eth;
tracing::debug!(%from, %to, %message, "handling message");
@@ -365,7 +370,9 @@
self.message_sender
.send_message_to_coordinator(InternalMessage::LaunchShard(source))?;
}
InternalMessage::LaunchShard(..) | InternalMessage::ExportBlockCheckpoint(..) => {
InternalMessage::LaunchShard(..)
| InternalMessage::ExportBlockCheckpoint(..)
| InternalMessage::RestartShard(..) => {
warn!(
"{message} type messages should be handled by the coordinator, not forwarded to a node.",
);
112 changes: 111 additions & 1 deletion zilliqa/src/node_launcher.rs
@@ -1,11 +1,13 @@
use std::{
net::Ipv4Addr,
ops::Add,
sync::{atomic::AtomicUsize, Arc, Mutex},
time::{Duration, SystemTime},
};

use anyhow::{anyhow, Result};
use http::{header, Method};
use jsonrpsee::core::client::ClientT;
use libp2p::{futures::StreamExt, PeerId};
use node::Node;
use opentelemetry::KeyValue;
@@ -34,6 +36,11 @@ use crate::{
p2p_node::{LocalMessageTuple, OutboundMessageTuple},
};

#[derive(Debug, Default)]
struct WatchDogDebounce {
value: u64,
count: u64,
}
pub struct NodeLauncher {
pub node: Arc<Mutex<Node>>,
pub config: NodeConfig,
@@ -45,6 +52,7 @@ pub struct NodeLauncher {
/// Channel used to steer next sleep time
pub reset_timeout_receiver: UnboundedReceiverStream<Duration>,
node_launched: bool,
watchdog: WatchDogDebounce,
}

// If the `fake_response_channel` feature is enabled, swap out the libp2p ResponseChannel for a `u64`. In our
@@ -157,6 +165,7 @@ impl NodeLauncher {
reset_timeout_receiver,
node_launched: false,
config,
watchdog: Default::default(),
};
let input_channels = NodeInputChannels {
broadcasts: broadcasts_sender,
@@ -169,14 +178,97 @@
Ok((launcher, input_channels))
}

pub async fn start_shard_node(&mut self) -> Result<()> {
async fn internal_watchdog(&mut self) -> Result<bool> {
// If watchdog is disabled, then do nothing.
if self.config.remote_api_url.is_none() {
return Ok(false);
}

// 1. Collect quick sample
let self_highest = self
.node
.lock()
.unwrap()
.db
.get_highest_canonical_block_number()?
.ok_or_else(|| anyhow!("can't find highest block num in database!"))?;

// 1.5 Debounce
if self.watchdog.value != self_highest {
self.watchdog.value = self_highest;
self.watchdog.count = 0;
} else {
self.watchdog.count += 1;
}

tracing::debug!(
"WDT check value: {} count: {}",
self.watchdog.value,
self.watchdog.count
);

// 2. Internal check to see if node is possibly stuck.
if self.watchdog.count > 3 {
let rpc_url = self.config.remote_api_url.clone().unwrap_or_default();
// 3. External check to see if others are stuck too, as opposed to timeouts.
let client = jsonrpsee::http_client::HttpClientBuilder::default()
.request_timeout(self.config.consensus.consensus_timeout / 2) // fast call
.build(rpc_url.clone())?;

let result = client
.request("eth_blockNumber", jsonrpsee::rpc_params![])
.await
// do not restart due to network/upstream errors, check again later.
.unwrap_or_else(|e| {
tracing::error!("WDT remote call to {} failed: {e}", rpc_url);
"0x0".to_string()
});

let remote_highest = result
.strip_prefix("0x")
.map(|s| u64::from_str_radix(s, 16).unwrap_or_default())
.unwrap_or_default();

// 4. If self < others for > threshold, then we're stuck while others aren't.
if self_highest < remote_highest {
tracing::warn!(?self_highest, ?remote_highest, "WDT node stuck at");
return Ok(true);
} else {
tracing::warn!(
?self_highest,
?remote_highest,
"WDT network possibly stalled at"
)
}
}

// 5. Otherwise do nothing; the watchdog samples again on the next tick.
Ok(false)
}

pub async fn start_shard_node(&mut self, shard_id: u64) -> Result<()> {
if self.node_launched {
return Err(anyhow!("Node already running!"));
}

let sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(sleep);

// Internal watchdog
let wdt_duration = self
.config
.consensus
.consensus_timeout
.add(self.config.consensus.empty_block_timeout)
.saturating_mul(5); // every 30s or so on default
let watchdog = if self.config.remote_api_url.is_none() {
time::sleep(Duration::MAX)
} else {
tracing::info!("WDT period: {:?}", wdt_duration);
time::sleep(wdt_duration)
};
tokio::pin!(watchdog);

self.node_launched = true;

let meter = opentelemetry::global::meter("zilliqa");
@@ -281,6 +373,24 @@
&attributes,
);
},

() = &mut watchdog => {
let attributes = vec![
KeyValue::new(MESSAGING_OPERATION_NAME, "handle"),
KeyValue::new(MESSAGING_SYSTEM, "tokio_channel"),
KeyValue::new(MESSAGING_DESTINATION_NAME, "watchdog"),
];
let start = SystemTime::now();
if self.internal_watchdog().await? {
tracing::info!("WDT restarting {shard_id}.");
return self.node.lock().unwrap().internal_restart(shard_id);
};
watchdog.as_mut().reset(Instant::now() + wdt_duration);
messaging_process_duration.record(
start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
&attributes,
);
}
r = self.reset_timeout_receiver.next() => {
let sleep_time = r.expect("reset timeout stream should be infinite");
trace!(?sleep_time, "timeout reset");
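To make the restart rule above easier to follow, here is a self-contained sketch of the debounce-and-compare decision, assuming the same threshold (more than three consecutive unchanged samples) and the same rule that a failed remote call or a non-advancing remote tip never triggers a restart. It is an illustration with stand-in types, not code from the PR. The sampling period itself is `(consensus_timeout + empty_block_timeout) * 5`, which the diff notes works out to roughly 30 s with default settings.

```rust
// Illustrative sketch of the watchdog decision; stand-in types, not the PR's code.
#[derive(Debug, Default)]
struct WatchDogDebounce {
    value: u64, // last locally observed highest block number
    count: u64, // consecutive samples where that value did not change
}

/// Returns true only when the local tip has been stuck for more than three
/// samples *and* the remote endpoint reports a strictly higher block number.
fn should_restart(wdt: &mut WatchDogDebounce, self_highest: u64, remote_highest: Option<u64>) -> bool {
    if wdt.value != self_highest {
        // Local tip moved: reset the debounce counter.
        wdt.value = self_highest;
        wdt.count = 0;
    } else {
        wdt.count += 1;
    }
    // A failed remote call (None) or a remote tip that is not ahead means "do nothing":
    // either the whole network is stalled or the upstream endpoint is unreachable.
    wdt.count > 3 && remote_highest.is_some_and(|remote| self_highest < remote)
}

fn main() {
    let mut wdt = WatchDogDebounce::default();
    // Local tip stuck at 100 for five consecutive samples while the remote reports 120.
    let decision = (0..5).map(|_| should_restart(&mut wdt, 100, Some(120))).last().unwrap();
    assert!(decision);
    println!("restart requested: {decision}");
}
```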
18 changes: 16 additions & 2 deletions zilliqa/src/p2p_node.rs
@@ -185,7 +185,8 @@ impl P2pNode {
}

pub async fn add_shard_node(&mut self, config: NodeConfig) -> Result<()> {
let topic = Self::shard_id_to_topic(config.eth_chain_id);
let shard_id = config.eth_chain_id;
let topic = Self::shard_id_to_topic(shard_id);
if self.shard_nodes.contains_key(&topic.hash()) {
info!("LaunchShard message received for a shard we're already running. Ignoring...");
return Ok(());
@@ -201,7 +202,7 @@
.await?;
self.shard_nodes.insert(topic.hash(), input_channels);
self.shard_threads
.spawn(async move { node.start_shard_node().await });
.spawn(async move { node.start_shard_node(shard_id).await });
self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
Ok(())
}
@@ -355,6 +356,19 @@
InternalMessage::ExportBlockCheckpoint(block, transactions, parent, trie_storage, path) => {
self.task_threads.spawn(async move { db::checkpoint_block_with_state(&block, &transactions, &parent, trie_storage, source, path) });
}
InternalMessage::RestartShard(shard_id) => {
// remove existing node
let topic = Self::shard_id_to_topic(shard_id);
self.shard_nodes.remove(&topic.hash());
// restart node
let shard_config = self.config.nodes
.iter()
.find(|shard_config| shard_config.eth_chain_id == shard_id)
.cloned()
.unwrap_or_else(
|| Self::generate_child_config(self.config.nodes.first().unwrap(), shard_id));
self.add_shard_node(shard_config.clone()).await?;
},
}
},
message = self.request_responses_receiver.next() => {
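A small sketch of the lookup-with-fallback used when restarting a shard above: prefer the explicitly configured entry for the shard, otherwise derive a child config from the first node entry. `NodeConfig` and `generate_child_config` here are minimal stand-ins for illustration, not the real zilliqa types; the assumption that the helper clones the parent and overrides the chain id is mine, not the PR's.

```rust
// Stand-in types for illustration only.
#[derive(Clone, Debug)]
struct NodeConfig {
    eth_chain_id: u64,
}

// Assumption: the real helper clones a parent config and overrides the shard/chain id.
fn generate_child_config(parent: &NodeConfig, shard_id: u64) -> NodeConfig {
    let mut child = parent.clone();
    child.eth_chain_id = shard_id;
    child
}

// Prefer the configured entry for `shard_id`; otherwise derive one from the first node.
fn config_for_restart(nodes: &[NodeConfig], shard_id: u64) -> NodeConfig {
    nodes
        .iter()
        .find(|c| c.eth_chain_id == shard_id)
        .cloned()
        .unwrap_or_else(|| generate_child_config(nodes.first().expect("at least one node"), shard_id))
}

fn main() {
    let nodes = vec![NodeConfig { eth_chain_id: 1 }];
    assert_eq!(config_for_restart(&nodes, 1).eth_chain_id, 1);
    assert_eq!(config_for_restart(&nodes, 2).eth_chain_id, 2);
}
```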
3 changes: 3 additions & 0 deletions zilliqa/tests/it/main.rs
@@ -369,6 +369,7 @@ impl Network {
state_rpc_limit: state_rpc_limit_default(),
failed_request_sleep_duration: failed_request_sleep_duration_default(),
enable_ots_indices: true,
remote_api_url: Default::default(),
};

let (nodes, external_receivers, local_receivers, request_response_receivers): (
@@ -497,6 +498,7 @@ impl Network {
state_rpc_limit: state_rpc_limit_default(),
failed_request_sleep_duration: failed_request_sleep_duration_default(),
enable_ots_indices: true,
remote_api_url: Default::default(),
};

let secret_key = options.secret_key_or_random(self.rng.clone());
@@ -848,6 +850,7 @@ impl Network {
AnyMessage::Internal(source_shard, destination_shard, ref internal_message) => {
trace!("Handling internal message from node in shard {source_shard}, targetting {destination_shard}");
match internal_message {
InternalMessage::RestartShard(_) => todo!(),
InternalMessage::LaunchShard(new_network_id) => {
let secret_key = self.find_node(source).unwrap().1.secret_key;
if let Some(child_network) = self.children.get_mut(new_network_id) {
1 change: 1 addition & 0 deletions zilliqa/tests/it/persistence.rs
@@ -134,6 +134,7 @@ async fn block_and_tx_data_persistence(mut network: Network) {
state_rpc_limit: state_rpc_limit_default(),
failed_request_sleep_duration: failed_request_sleep_duration_default(),
enable_ots_indices: true,
remote_api_url: Default::default(),
};
let mut rng = network.rng.lock().unwrap();
let result = crate::node(