Skip to content

Commit

Permalink
feat: faster first block load
Browse files Browse the repository at this point in the history
  • Loading branch information
ncitron committed Aug 29, 2023
1 parent e538f6b commit dcf406e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 56 deletions.
14 changes: 3 additions & 11 deletions client/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use ethers::prelude::{Address, U256};
use ethers::types::{
Expand Down Expand Up @@ -54,13 +53,6 @@ impl Node {
})
}

pub fn duration_until_next_update(&self) -> Duration {
self.consensus
.duration_until_next_update()
.to_std()
.unwrap()
}

pub async fn call(&self, opts: &CallOpts, block: BlockTag) -> Result<Vec<u8>, NodeError> {
self.check_blocktag_age(&block).await?;

Expand Down Expand Up @@ -232,14 +224,14 @@ impl Node {
if self.check_head_age().await.is_ok() {
Ok(SyncingStatus::IsFalse)
} else {
let latest_synced_block = self.get_block_number().await?;
let starting_block = 0.into();
let latest_synced_block = self.get_block_number().await.unwrap_or(starting_block);
let highest_block = self.consensus.expected_current_slot();

Ok(SyncingStatus::IsSyncing(Box::new(SyncProgress {
current_block: latest_synced_block.as_u64().into(),
highest_block: highest_block.into(),
// TODO: use better start value
starting_block: 0.into(),
starting_block: starting_block.as_u64().into(),

// these fields don't make sense for helios
pulled_states: None,
Expand Down
123 changes: 79 additions & 44 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::future::join_all;
use log::{debug, error, info, warn};
use milagro_bls::PublicKey;
use ssz_rs::prelude::*;
use tokio::sync::mpsc::Sender;
use wasm_timer::{SystemTime, UNIX_EPOCH};

use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -43,7 +44,10 @@ pub struct ConsensusClient<R: ConsensusRpc, DB: Database> {
pub struct Inner<R: ConsensusRpc> {
rpc: R,
store: LightClientStore,
pub last_checkpoint: Option<Vec<u8>>,
last_checkpoint: Option<Vec<u8>>,
block_send: Sender<Block>,
finalized_block_send: watch::Sender<Option<Block>>,
checkpoint_send: watch::Sender<Option<Vec<u8>>>,
pub config: Arc<Config>,
}

Expand Down Expand Up @@ -72,7 +76,13 @@ impl<R: ConsensusRpc, DB: Database> ConsensusClient<R, DB> {
});

tokio::spawn(async move {
let mut inner = Inner::<R>::new(&rpc, config.clone());
let mut inner = Inner::<R>::new(
&rpc,
block_send,
finalized_block_send,
checkpoint_send,
config.clone(),
);

let res = inner.sync(&initial_checkpoint).await;
if let Err(err) = res {
Expand All @@ -94,23 +104,22 @@ impl<R: ConsensusRpc, DB: Database> ConsensusClient<R, DB> {
}
}

loop {
inner.advance().await.unwrap();

let slot = inner.get_header().slot.as_u64();
let payload = inner.get_execution_payload(&Some(slot)).await.unwrap();
let finalized_slot = inner.get_finalized_header().slot.as_u64();
let finalized_payload = inner
.get_execution_payload(&Some(finalized_slot))
.await
.unwrap();
_ = inner.send_blocks().await;

_ = block_send.send(payload.into()).await;
_ = finalized_block_send.send(Some(finalized_payload.into()));
loop {
sleep(inner.duration_until_next_update().to_std().unwrap()).await;

checkpoint_send.send(inner.last_checkpoint.clone()).unwrap();
let res = inner.advance().await;
if let Err(err) = res {
warn!("advance error: {}", err);
continue;
}

sleep(std::time::Duration::from_secs(12)).await;
let res = inner.send_blocks().await;
if let Err(err) = res {
warn!("send error: {}", err);
continue;
}
}
});

Expand Down Expand Up @@ -140,24 +149,6 @@ impl<R: ConsensusRpc, DB: Database> ConsensusClient<R, DB> {
since_genesis.as_secs() / 12
}

/// Gets the duration until the next update
/// Updates are scheduled for 4 seconds into each slot
pub fn duration_until_next_update(&self) -> Duration {
let current_slot = self.expected_current_slot();
let next_slot = current_slot + 1;
let next_slot_timestamp = self.slot_timestamp(next_slot);

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();

let time_to_next_slot = next_slot_timestamp - now;
let next_update = time_to_next_slot + 4;

Duration::seconds(next_update as i64)
}

fn slot_timestamp(&self, slot: u64) -> u64 {

Check failure on line 152 in consensus/src/consensus.rs

View workflow job for this annotation

GitHub Actions / clippy

method `slot_timestamp` is never used

Check warning on line 152 in consensus/src/consensus.rs

View workflow job for this annotation

GitHub Actions / check

method `slot_timestamp` is never used

Check warning on line 152 in consensus/src/consensus.rs

View workflow job for this annotation

GitHub Actions / test

method `slot_timestamp` is never used
slot * 12 + self.genesis_time
}
Expand All @@ -180,13 +171,22 @@ async fn sync_all_fallbacks<R: ConsensusRpc>(inner: &mut Inner<R>, chain_id: u64
}

impl<R: ConsensusRpc> Inner<R> {
pub fn new(rpc: &str, config: Arc<Config>) -> Inner<R> {
pub fn new(
rpc: &str,
block_send: Sender<Block>,
finalized_block_send: watch::Sender<Option<Block>>,
checkpoint_send: watch::Sender<Option<Vec<u8>>>,
config: Arc<Config>,
) -> Inner<R> {
let rpc = R::new(rpc);

Inner {
rpc,
store: LightClientStore::default(),
last_checkpoint: None,
block_send,
finalized_block_send,
checkpoint_send,
config,
}
}
Expand Down Expand Up @@ -268,14 +268,6 @@ impl<R: ConsensusRpc> Inner<R> {
Ok(payloads)
}

pub fn get_header(&self) -> &Header {
&self.store.optimistic_header
}

pub fn get_finalized_header(&self) -> &Header {
&self.store.finalized_header
}

pub async fn sync(&mut self, checkpoint: &[u8]) -> Result<()> {
self.store = LightClientStore::default();
self.last_checkpoint = None;
Expand Down Expand Up @@ -337,6 +329,38 @@ impl<R: ConsensusRpc> Inner<R> {
Ok(())
}

pub async fn send_blocks(&self) -> Result<()> {
let slot = self.store.optimistic_header.slot.as_u64();
let payload = self.get_execution_payload(&Some(slot)).await?;
let finalized_slot = self.store.finalized_header.slot.as_u64();
let finalized_payload = self.get_execution_payload(&Some(finalized_slot)).await?;

self.block_send.send(payload.into()).await?;
self.finalized_block_send
.send(Some(finalized_payload.into()))?;
self.checkpoint_send.send(self.last_checkpoint.clone())?;

Ok(())
}

/// Gets the duration until the next update
/// Updates are scheduled for 4 seconds into each slot
pub fn duration_until_next_update(&self) -> Duration {
let current_slot = self.expected_current_slot();
let next_slot = current_slot + 1;
let next_slot_timestamp = self.slot_timestamp(next_slot);

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();

let time_to_next_slot = next_slot_timestamp - now;
let next_update = time_to_next_slot + 4;

Duration::seconds(next_update as i64)
}

async fn bootstrap(&mut self, checkpoint: &[u8]) -> Result<()> {
let mut bootstrap = self
.rpc
Expand Down Expand Up @@ -765,6 +789,7 @@ mod tests {
};

use config::{networks, Config};
use tokio::sync::{mpsc::channel, watch};

async fn get_client(strict_checkpoint_age: bool, sync: bool) -> Inner<MockRpc> {
let base_config = networks::mainnet();
Expand All @@ -781,7 +806,17 @@ mod tests {
hex::decode("5afc212a7924789b2bc86acad3ab3a6ffb1f6e97253ea50bee7f4f51422c9275")
.unwrap();

let mut client = Inner::new("testdata/", Arc::new(config));
let (block_send, _) = channel(256);
let (finalized_block_send, _) = watch::channel(None);
let (channel_send, _) = watch::channel(None);

let mut client = Inner::new(
"testdata/",
block_send,
finalized_block_send,
channel_send,
Arc::new(config),
);

if sync {
client.sync(&checkpoint).await.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion execution/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl State {
}
},
_ = finalized_block_recv.changed() => {
let block = finalized_block_recv.borrow().clone();
let block = finalized_block_recv.borrow_and_update().clone();
if let Some(block) = block {
inner_ref.write().await.push_finalized_block(block);
}
Expand Down

0 comments on commit dcf406e

Please sign in to comment.