diff --git a/client/src/node.rs b/client/src/node.rs index c1bbc416..ad2e9df9 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{ @@ -54,13 +53,6 @@ impl Node { }) } - pub fn duration_until_next_update(&self) -> Duration { - self.consensus - .duration_until_next_update() - .to_std() - .unwrap() - } - pub async fn call(&self, opts: &CallOpts, block: BlockTag) -> Result, NodeError> { self.check_blocktag_age(&block).await?; @@ -232,14 +224,14 @@ impl Node { if self.check_head_age().await.is_ok() { Ok(SyncingStatus::IsFalse) } else { - let latest_synced_block = self.get_block_number().await?; + let starting_block = 0.into(); + let latest_synced_block = self.get_block_number().await.unwrap_or(starting_block); let highest_block = self.consensus.expected_current_slot(); Ok(SyncingStatus::IsSyncing(Box::new(SyncProgress { current_block: latest_synced_block.as_u64().into(), highest_block: highest_block.into(), - // TODO: use better start value - starting_block: 0.into(), + starting_block: starting_block.as_u64().into(), // these fields don't make sense for helios pulled_states: None, diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 25a76cc5..eef984b0 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -10,6 +10,7 @@ use futures::future::join_all; use log::{debug, error, info, warn}; use milagro_bls::PublicKey; use ssz_rs::prelude::*; +use tokio::sync::mpsc::Sender; use wasm_timer::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc::channel; @@ -43,7 +44,10 @@ pub struct ConsensusClient { pub struct Inner { rpc: R, store: LightClientStore, - pub last_checkpoint: Option>, + last_checkpoint: Option>, + block_send: Sender, + finalized_block_send: watch::Sender>, + checkpoint_send: watch::Sender>>, pub config: Arc, } @@ -72,7 +76,13 @@ impl ConsensusClient { }); tokio::spawn(async move { - let mut inner = Inner::::new(&rpc, config.clone()); + let mut inner = Inner::::new( + &rpc, + block_send, + finalized_block_send, + checkpoint_send, + config.clone(), + ); let res = inner.sync(&initial_checkpoint).await; if let Err(err) = res { @@ -94,23 +104,22 @@ impl ConsensusClient { } } - loop { - inner.advance().await.unwrap(); - - let slot = inner.get_header().slot.as_u64(); - let payload = inner.get_execution_payload(&Some(slot)).await.unwrap(); - let finalized_slot = inner.get_finalized_header().slot.as_u64(); - let finalized_payload = inner - .get_execution_payload(&Some(finalized_slot)) - .await - .unwrap(); + _ = inner.send_blocks().await; - _ = block_send.send(payload.into()).await; - _ = finalized_block_send.send(Some(finalized_payload.into())); + loop { + sleep(inner.duration_until_next_update().to_std().unwrap()).await; - checkpoint_send.send(inner.last_checkpoint.clone()).unwrap(); + let res = inner.advance().await; + if let Err(err) = res { + warn!("advance error: {}", err); + continue; + } - sleep(std::time::Duration::from_secs(12)).await; + let res = inner.send_blocks().await; + if let Err(err) = res { + warn!("send error: {}", err); + continue; + } } }); @@ -140,24 +149,6 @@ impl ConsensusClient { since_genesis.as_secs() / 12 } - /// Gets the duration until the next update - /// Updates are scheduled for 4 seconds into each slot - pub fn duration_until_next_update(&self) -> Duration { - let current_slot = self.expected_current_slot(); - let next_slot = current_slot + 1; - let next_slot_timestamp = self.slot_timestamp(next_slot); - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); - - let time_to_next_slot = next_slot_timestamp - now; - let next_update = time_to_next_slot + 4; - - Duration::seconds(next_update as i64) - } - fn slot_timestamp(&self, slot: u64) -> u64 { slot * 12 + self.genesis_time } @@ -180,13 +171,22 @@ async fn sync_all_fallbacks(inner: &mut Inner, chain_id: u64 } impl Inner { - pub fn new(rpc: &str, config: Arc) -> Inner { + pub fn new( + rpc: &str, + block_send: Sender, + finalized_block_send: watch::Sender>, + checkpoint_send: watch::Sender>>, + config: Arc, + ) -> Inner { let rpc = R::new(rpc); Inner { rpc, store: LightClientStore::default(), last_checkpoint: None, + block_send, + finalized_block_send, + checkpoint_send, config, } } @@ -268,14 +268,6 @@ impl Inner { Ok(payloads) } - pub fn get_header(&self) -> &Header { - &self.store.optimistic_header - } - - pub fn get_finalized_header(&self) -> &Header { - &self.store.finalized_header - } - pub async fn sync(&mut self, checkpoint: &[u8]) -> Result<()> { self.store = LightClientStore::default(); self.last_checkpoint = None; @@ -337,6 +329,38 @@ impl Inner { Ok(()) } + pub async fn send_blocks(&self) -> Result<()> { + let slot = self.store.optimistic_header.slot.as_u64(); + let payload = self.get_execution_payload(&Some(slot)).await?; + let finalized_slot = self.store.finalized_header.slot.as_u64(); + let finalized_payload = self.get_execution_payload(&Some(finalized_slot)).await?; + + self.block_send.send(payload.into()).await?; + self.finalized_block_send + .send(Some(finalized_payload.into()))?; + self.checkpoint_send.send(self.last_checkpoint.clone())?; + + Ok(()) + } + + /// Gets the duration until the next update + /// Updates are scheduled for 4 seconds into each slot + pub fn duration_until_next_update(&self) -> Duration { + let current_slot = self.expected_current_slot(); + let next_slot = current_slot + 1; + let next_slot_timestamp = self.slot_timestamp(next_slot); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let time_to_next_slot = next_slot_timestamp - now; + let next_update = time_to_next_slot + 4; + + Duration::seconds(next_update as i64) + } + async fn bootstrap(&mut self, checkpoint: &[u8]) -> Result<()> { let mut bootstrap = self .rpc @@ -765,6 +789,7 @@ mod tests { }; use config::{networks, Config}; + use tokio::sync::{mpsc::channel, watch}; async fn get_client(strict_checkpoint_age: bool, sync: bool) -> Inner { let base_config = networks::mainnet(); @@ -781,7 +806,17 @@ mod tests { hex::decode("5afc212a7924789b2bc86acad3ab3a6ffb1f6e97253ea50bee7f4f51422c9275") .unwrap(); - let mut client = Inner::new("testdata/", Arc::new(config)); + let (block_send, _) = channel(256); + let (finalized_block_send, _) = watch::channel(None); + let (channel_send, _) = watch::channel(None); + + let mut client = Inner::new( + "testdata/", + block_send, + finalized_block_send, + channel_send, + Arc::new(config), + ); if sync { client.sync(&checkpoint).await.unwrap() diff --git a/execution/src/state.rs b/execution/src/state.rs index aaa8f815..61d5bc1e 100644 --- a/execution/src/state.rs +++ b/execution/src/state.rs @@ -33,7 +33,7 @@ impl State { } }, _ = finalized_block_recv.changed() => { - let block = finalized_block_recv.borrow().clone(); + let block = finalized_block_recv.borrow_and_update().clone(); if let Some(block) = block { inner_ref.write().await.push_finalized_block(block); }