From 1c29384a1729f07a2ce9d98dac9663b6fb44b92c Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Wed, 20 Nov 2024 19:27:12 -0500 Subject: [PATCH 1/8] rewrite --- Cargo.lock | 1 + chain/chain/src/store/mod.rs | 14 + core/primitives/src/state_sync.rs | 14 + integration-tests/src/test_loop/builder.rs | 1 + .../src/tests/client/state_dump.rs | 3 + nearcore/Cargo.toml | 1 + nearcore/src/lib.rs | 4 +- nearcore/src/state_sync.rs | 1256 +++++++++++------ 8 files changed, 836 insertions(+), 458 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca34018b8c7..8c68d2a651a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5273,6 +5273,7 @@ dependencies = [ "testlib", "thiserror 2.0.0", "tokio", + "tokio-stream", "tracing", "xz2", ] diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index 1e76237921d..1a19fe72f4f 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -1020,6 +1020,20 @@ impl ChainStore { key } + // pub fn iter_state_sync_dump_progress( + // &self, + // shard_id: ShardId, + // ) -> Result { + // let mut prefix = STATE_SYNC_DUMP_KEY.to_vec(); + // prefix.extend(b":".to_vec()); + // self.store.iter_prefix_ser(DBCol::BlockMisc, prefix) + // option_to_not_found( + // self.store + // .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)), + // format!("STATE_SYNC_DUMP:{}", shard_id), + // ) + // } + /// Retrieves STATE_SYNC_DUMP for the given shard. pub fn get_state_sync_dump_progress( &self, diff --git a/core/primitives/src/state_sync.rs b/core/primitives/src/state_sync.rs index 5156559c9d7..e143eb7a294 100644 --- a/core/primitives/src/state_sync.rs +++ b/core/primitives/src/state_sync.rs @@ -304,6 +304,20 @@ pub enum StateSyncDumpProgress { }, } +impl StateSyncDumpProgress { + // The `StateSyncDumpProgress` type includes information that is not read or used anywhere. We could + // simplify the type with a DB migration, but it's not really important, since the data is small. This + // function returns the two pieces of information that the state dump code actually does use, which is the + // `EpochId` we were last trying to dump, and whether we're finished with it. + pub fn epoch_done(&self) -> (EpochId, bool) { + match self { + Self::AllDumped { epoch_id, .. } => (epoch_id.clone(), true), + Self::Skipped { epoch_id, .. } => (epoch_id.clone(), true), + Self::InProgress { epoch_id, .. 
} => (epoch_id.clone(), false), + } + } +} + #[cfg(test)] mod tests { use crate::state_sync::{get_num_state_parts, STATE_PART_MEMORY_LIMIT}; diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index ff1fdbde509..2c85c358d2b 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -719,6 +719,7 @@ impl TestLoopBuilder { future_spawner.spawn_boxed("state_sync_dumper", future); Box::new(|| {}) }), + future_spawner: Arc::new(self.test_loop.future_spawner()), handle: None, }; let state_sync_dumper_handle = self.test_loop.data.register_data(state_sync_dumper); diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs index 608d4dab927..930797189ab 100644 --- a/integration-tests/src/tests/client/state_dump.rs +++ b/integration-tests/src/tests/client/state_dump.rs @@ -1,5 +1,6 @@ use assert_matches::assert_matches; +use near_async::futures::ActixFutureSpawner; use near_async::time::{Clock, Duration}; use near_chain::near_chain_primitives::error::QueryError; use near_chain::{ChainGenesis, ChainStoreAccess, Provenance}; @@ -66,6 +67,7 @@ fn test_state_dump() { runtime, validator, dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: Arc::new(ActixFutureSpawner), handle: None, }; state_sync_dumper.start().unwrap(); @@ -171,6 +173,7 @@ fn run_state_sync_with_dumped_parts( runtime, validator, dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: Arc::new(ActixFutureSpawner), handle: None, }; state_sync_dumper.start().unwrap(); diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index 51f83aeaf5f..06a42fb67a9 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -44,6 +44,7 @@ strum.workspace = true tempfile.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-stream.workspace = true tracing.workspace = true xz2.workspace = true diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 023876302b0..acc0ed70c1f 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -388,6 +388,7 @@ pub fn start_with_config_and_synchronization( let state_sync_runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + let state_sync_spawner = Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())); let StartClientResult { client_actor, client_arbiter_handle, resharding_handle } = start_client( Clock::real(), config.client_config.clone(), @@ -396,7 +397,7 @@ pub fn start_with_config_and_synchronization( shard_tracker.clone(), runtime.clone(), node_id, - Arc::new(TokioRuntimeFutureSpawner(state_sync_runtime.clone())), + state_sync_spawner.clone(), network_adapter.as_multi_sender(), shards_manager_adapter.as_sender(), config.validator_signer.clone(), @@ -433,6 +434,7 @@ pub fn start_with_config_and_synchronization( runtime, validator: config.validator_signer.clone(), dump_future_runner: StateSyncDumper::arbiter_dump_future_runner(), + future_spawner: state_sync_spawner.clone(), handle: None, }; state_sync_dumper.start()?; diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 49b2657ff98..2ae4a4a001f 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -1,10 +1,12 @@ use crate::metrics; use actix_rt::Arbiter; +use anyhow::Context; use borsh::BorshSerialize; use futures::future::BoxFuture; -use futures::FutureExt; -use near_async::time::{Clock, Duration, Instant}; +use 
futures::{FutureExt, StreamExt}; +use near_async::futures::{FutureSpawner, FutureSpawnerExt}; +use near_async::time::{Clock, Duration, Interval}; use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode, Error}; use near_chain_configs::{ClientConfig, ExternalStorageLocation, MutableValidatorSigner}; @@ -17,19 +19,25 @@ use near_client::sync::external::{ }; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; +use near_primitives::block::BlockHeader; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; use near_primitives::state_sync::StateSyncDumpProgress; -use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; -use rand::{thread_rng, Rng}; -use std::collections::HashSet; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; +use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot}; +use rand::seq::SliceRandom; +use rand::thread_rng; +use std::collections::{HashMap, HashSet}; +use std::i64; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::oneshot; +use tokio::sync::Semaphore; /// Time limit per state dump iteration. /// A node must check external storage for parts to dump again once time is up. pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300; +// TODO: could refactor this further and just have one "Dumper" struct here pub struct StateSyncDumper { pub clock: Clock, pub client_config: ClientConfig, @@ -42,6 +50,7 @@ pub struct StateSyncDumper { /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. pub validator: MutableValidatorSigner, pub dump_future_runner: Box) -> Box>, + pub future_spawner: Arc, pub handle: Option, } @@ -83,62 +92,44 @@ impl StateSyncDumper { }, }; - // Determine how many threads to start. - // TODO(resharding): Handle the case of changing the shard layout. - let shard_ids = { - // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance. - let chain = Chain::new_for_view_client( + let chain_id = self.client_config.chain_id.clone(); + let keep_running = Arc::new(AtomicBool::new(true)); + // Start a thread for each shard. + + let chain = Chain::new_for_view_client( + self.clock.clone(), + self.epoch_manager.clone(), + self.shard_tracker.clone(), + self.runtime.clone(), + &self.chain_genesis, + DoomslugThresholdMode::TwoThirds, + false, + ) + .unwrap(); + if let Some(shards) = dump_config.restart_dump_for_shards.as_ref() { + for shard_id in shards { + chain.chain_store().set_state_sync_dump_progress(*shard_id, None).unwrap(); + tracing::debug!(target: "state_sync_dump", ?shard_id, "Dropped existing progress"); + } + } + let handle = (self.dump_future_runner)( + do_state_sync_dump( self.clock.clone(), + chain, self.epoch_manager.clone(), self.shard_tracker.clone(), self.runtime.clone(), - &self.chain_genesis, - DoomslugThresholdMode::TwoThirds, - false, - )?; - let epoch_id = chain.head()?.epoch_id; - self.epoch_manager.shard_ids(&epoch_id) - }?; - - let chain_id = self.client_config.chain_id.clone(); - let keep_running = Arc::new(AtomicBool::new(true)); - // Start a thread for each shard. 
- let handles = shard_ids - .into_iter() - .map(|shard_id| { - let runtime = self.runtime.clone(); - let chain_genesis = self.chain_genesis.clone(); - let chain = Chain::new_for_view_client( - self.clock.clone(), - self.epoch_manager.clone(), - self.shard_tracker.clone(), - runtime.clone(), - &chain_genesis, - DoomslugThresholdMode::TwoThirds, - false, - ) - .unwrap(); - (self.dump_future_runner)( - state_sync_dump( - self.clock.clone(), - shard_id, - chain, - self.epoch_manager.clone(), - self.shard_tracker.clone(), - runtime.clone(), - chain_id.clone(), - dump_config.restart_dump_for_shards.clone().unwrap_or_default(), - external.clone(), - dump_config.iteration_delay.unwrap_or(Duration::seconds(10)), - self.validator.clone(), - keep_running.clone(), - ) - .boxed(), - ) - }) - .collect(); + chain_id.clone(), + external.clone(), + dump_config.iteration_delay.unwrap_or(Duration::seconds(10)), + self.validator.clone(), + keep_running.clone(), + self.future_spawner.clone(), + ) + .boxed(), + ); - self.handle = Some(StateSyncDumpHandle { handles, keep_running }); + self.handle = Some(StateSyncDumpHandle { handle: Some(handle), keep_running }); Ok(()) } @@ -160,7 +151,7 @@ impl StateSyncDumper { /// Holds arbiter handles controlling the lifetime of the spawned threads. pub struct StateSyncDumpHandle { - pub handles: Vec>, + pub handle: Option>, keep_running: Arc, } @@ -173,25 +164,11 @@ impl Drop for StateSyncDumpHandle { impl StateSyncDumpHandle { fn stop(&mut self) { tracing::warn!(target: "state_sync_dump", "Stopping state dumper"); - self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); - self.handles.drain(..).for_each(|dropper| { - dropper(); - }); + self.keep_running.store(false, Ordering::Relaxed); + self.handle.take().unwrap()() } } -/// Fetches the state sync header from DB and serializes it. 
-fn get_serialized_header( - shard_id: ShardId, - sync_hash: CryptoHash, - chain: &Chain, -) -> anyhow::Result> { - let header = chain.get_state_response_header(shard_id, sync_hash)?; - let mut buffer: Vec = Vec::new(); - header.serialize(&mut buffer)?; - Ok(buffer) -} - pub fn extract_part_id_from_part_file_name(file_name: &String) -> u64 { assert!(is_part_filename(file_name)); return get_part_id_from_filename(file_name).unwrap(); @@ -199,12 +176,12 @@ pub fn extract_part_id_from_part_file_name(file_name: &String) -> u64 { async fn get_missing_part_ids_for_epoch( shard_id: ShardId, - chain_id: &String, + chain_id: &str, epoch_id: &EpochId, epoch_height: u64, total_parts: u64, external: &ExternalConnection, -) -> Result, anyhow::Error> { +) -> Result, anyhow::Error> { let directory_path = external_storage_location_directory( chain_id, epoch_id, @@ -218,443 +195,808 @@ async fn get_missing_part_ids_for_epoch( .iter() .map(|file_name| extract_part_id_from_part_file_name(file_name)) .collect(); - let missing_nums: Vec = + let missing_nums: HashSet<_> = (0..total_parts).filter(|i| !existing_nums.contains(i)).collect(); let num_missing = missing_nums.len(); tracing::debug!(target: "state_sync_dump", ?num_missing, ?directory_path, "Some parts have already been dumped."); Ok(missing_nums) } else { tracing::debug!(target: "state_sync_dump", ?total_parts, ?directory_path, "No part has been dumped."); - let missing_nums = (0..total_parts).collect::>(); + let missing_nums = (0..total_parts).collect(); Ok(missing_nums) } } -fn select_random_part_id_with_index(parts_to_be_dumped: &Vec) -> (u64, usize) { - let mut rng = thread_rng(); - let selected_idx = rng.gen_range(0..parts_to_be_dumped.len()); - let selected_element = parts_to_be_dumped[selected_idx]; - tracing::debug!(target: "state_sync_dump", ?selected_element, "selected parts to dump: "); - (selected_element, selected_idx) -} - -enum StateDumpAction { - Wait, - Dump { epoch_id: EpochId, epoch_height: EpochHeight, sync_hash: CryptoHash }, -} - -fn get_current_state( +// TODO: implement it for real +fn get_dump_progress( chain: &Chain, - shard_id: &ShardId, - shard_tracker: &ShardTracker, - account_id: &Option, - epoch_manager: Arc, -) -> Result { - let was_last_epoch_done = match chain.chain_store().get_state_sync_dump_progress(*shard_id) { - Ok(StateSyncDumpProgress::AllDumped { epoch_id, .. }) => Some(epoch_id), - Ok(StateSyncDumpProgress::Skipped { epoch_id, .. 
}) => Some(epoch_id), - _ => None, + epoch_manager: &dyn EpochManagerAdapter, + fixme_current_epoch_id: &EpochId, +) -> anyhow::Result> { + let shard_ids = epoch_manager.shard_ids(fixme_current_epoch_id)?; + let progress = match chain.chain_store().get_state_sync_dump_progress(shard_ids[0]) { + Ok(p) => p, + Err(Error::DBNotFoundErr(_)) => return Ok(vec![]), + Err(e) => return Err(e).context("failed looking up state dump progress"), }; + Ok(shard_ids.into_iter().map(|s| (s, progress.clone())).collect()) +} - let maybe_latest_epoch_info = get_latest_epoch(shard_id, &chain, epoch_manager.clone()); - let latest_epoch_info = match maybe_latest_epoch_info { - Ok(latest_epoch_info) => latest_epoch_info, - Err(err) => { - tracing::error!(target: "state_sync_dump", ?shard_id, ?err, "Failed to get the latest epoch"); - return Err(err); - } - }; - let Some(LatestEpochInfo { - prev_epoch_id, - epoch_id: new_epoch_id, - epoch_height: new_epoch_height, - sync_hash: new_sync_hash, - }) = latest_epoch_info - else { - return Ok(StateDumpAction::Wait); - }; +// State associated with dumping a shard's state +struct ShardDump { + state_root: StateRoot, + // None if it's already been dumped + header_to_dump: Option>, + num_parts: u64, + parts_dumped: Arc, + parts_missing: Arc>>, + // This will give Ok(()) when they're all done, or Err() when one gives an error + // For now the tasks never fail, since we just retry all errors like the old implementation did, + // but we probably want to make a change to distinguish which errors are actually retriable + // (e.g. the state snapshot isn't ready yet) + upload_parts: oneshot::Receiver>, +} - if Some(&new_epoch_id) == was_last_epoch_done.as_ref() { - tracing::debug!(target: "state_sync_dump", ?shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "latest epoch is done. No new epoch to dump. Idle"); - Ok(StateDumpAction::Wait) - } else if epoch_manager.get_shard_layout(&prev_epoch_id) - != epoch_manager.get_shard_layout(&new_epoch_id) - { - tracing::debug!(target: "state_sync_dump", ?shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Shard layout change detected, will skip dumping for this epoch. Idle"); - chain.chain_store().set_state_sync_dump_progress( - *shard_id, - Some(StateSyncDumpProgress::Skipped { - epoch_id: new_epoch_id, - epoch_height: new_epoch_height, - }), - )?; - Ok(StateDumpAction::Wait) - } else if cares_about_shard(chain, shard_id, &new_sync_hash, &shard_tracker, &account_id)? { - Ok(StateDumpAction::Dump { - epoch_id: new_epoch_id, - epoch_height: new_epoch_height, - sync_hash: new_sync_hash, - }) - } else { - tracing::debug!(target: "state_sync_dump", ?shard_id, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Doesn't care about the shard in the current epoch. Idle"); - Ok(StateDumpAction::Wait) - } +// State associated with dumping an epoch's state +struct DumpState { + epoch_id: EpochId, + epoch_height: EpochHeight, + sync_prev_prev_hash: CryptoHash, + // Contains state for each shard we need to dump. We remove shard IDs from + // this map as we finish them. + dump_state: HashMap, + canceled: Arc, } -/// Uploads header to external storage. -/// Returns true if it was successful. 
-async fn upload_state_header( - chain_id: &String, - epoch_id: &EpochId, - epoch_height: u64, - shard_id: ShardId, - state_sync_header: anyhow::Result>, - external: &ExternalConnection, -) -> bool { - match state_sync_header { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to serialize header."); - false - } - Ok(header) => { - let file_type = StateFileType::StateHeader; - let location = - external_storage_location(&chain_id, &epoch_id, epoch_height, shard_id, &file_type); - match external.put_file(file_type, &header, shard_id, &location).await { - Err(err) => { - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, ?err, "Failed to put header into external storage. Will retry next iteration."); - false +impl DumpState { + async fn set_missing_parts(&self, external: &ExternalConnection, chain_id: &str) { + for (shard_id, s) in self.dump_state.iter() { + match get_missing_part_ids_for_epoch( + *shard_id, + chain_id, + &self.epoch_id, + self.epoch_height, + s.num_parts, + external, + ) + .await + { + Ok(missing) => { + *s.parts_missing.write().unwrap() = missing; } - Ok(_) => { - tracing::trace!(target: "state_sync_dump", ?shard_id, epoch_height, "Header saved to external storage."); - true + Err(error) => { + tracing::error!(target: "state_sync_dump", ?error, ?shard_id, "Failed to list stored state parts."); } } } } } -const FAILURES_ALLOWED_PER_ITERATION: u32 = 10; +// Represents the state of the current epoch's state part dump +enum CurrentDump { + None, + InProgress(DumpState), + Done(EpochId), +} -async fn state_sync_dump( +// Helper type used as an intermediate return value where the caller will want the sender only +// if there's something to do +enum NewDump { + Dump(DumpState, HashMap>>), + NoTrackedShards, +} + +/// State associated with dumps for all shards responsible for checking when we need to dump for a new epoch +struct StateDumper { clock: Clock, - shard_id: ShardId, + chain_id: String, + validator: MutableValidatorSigner, + shard_tracker: ShardTracker, chain: Chain, epoch_manager: Arc, - shard_tracker: ShardTracker, runtime: Arc, - chain_id: String, - restart_dump_for_shards: Vec, + // State associated with dumping the current epoch + current_dump: CurrentDump, external: ExternalConnection, - iteration_delay: Duration, - validator: MutableValidatorSigner, - keep_running: Arc, -) { - tracing::info!(target: "state_sync_dump", ?shard_id, "Running StateSyncDump loop"); + future_spawner: Arc, + // Used to limit how many tasks can be doing the computation-heavy state part generation at a time + obtain_parts: Arc, +} - if restart_dump_for_shards.contains(&shard_id) { - tracing::debug!(target: "state_sync_dump", ?shard_id, "Dropped existing progress"); - chain.chain_store().set_state_sync_dump_progress(shard_id, None).unwrap(); - } +// Stores needed data for use in part upload futures +struct PartUploader { + clock: Clock, + external: ExternalConnection, + runtime: Arc, + chain_id: String, + epoch_id: EpochId, + epoch_height: EpochHeight, + sync_prev_prev_hash: CryptoHash, + shard_id: ShardId, + state_root: StateRoot, + num_parts: u64, + // Used for setting the num_parts_dumped gauge metric (which is an i64) + // When part upload tasks are cancelled on a new epoch, this is set to -1 so tasks + // know not to touch that metric anymore. + parts_dumped: Arc, + parts_missing: Arc>>, + obtain_parts: Arc, + canceled: Arc, +} - // Stop if the node is stopped. 
- // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. - while keep_running.load(std::sync::atomic::Ordering::Relaxed) { - tracing::debug!(target: "state_sync_dump", ?shard_id, "Running StateSyncDump loop iteration"); - let account_id = validator.get().map(|v| v.validator_id().clone()); - let current_state = get_current_state( - &chain, - &shard_id, - &shard_tracker, - &account_id, - epoch_manager.clone(), - ); - let next_state = match current_state { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to get the current state"); +fn respawn_for_parallelism( + future_spawner: &dyn FutureSpawner, + name: &'static str, + f: impl std::future::Future + Send + 'static, +) -> impl std::future::Future + Send + 'static { + let (sender, receiver) = oneshot::channel(); + future_spawner.spawn(name, async move { + sender.send(f.await).ok(); + }); + async move { receiver.await.unwrap() } +} + +impl PartUploader { + fn inc_parts_dumped(&self) { + match self.parts_dumped.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { + if prev >= 0 { + Some(prev + 1) + } else { None } - Ok(StateDumpAction::Wait) => None, - Ok(StateDumpAction::Dump { epoch_id, epoch_height, sync_hash }) => { - let in_progress_data = get_in_progress_data(shard_id, sync_hash, &chain); - match in_progress_data { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to get in progress data"); - None - } - Ok((state_root, num_parts, sync_prev_prev_hash)) => { - // Upload header - let header_in_external_storage = match external - .is_state_sync_header_stored_for_epoch( - shard_id, - &chain_id, - &epoch_id, - epoch_height, - ) - .await - { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine header presence in external storage."); - false - } - // Header is already stored - Ok(true) => true, - // Header is missing - Ok(false) => { - upload_state_header( - &chain_id, - &epoch_id, - epoch_height, - shard_id, - get_serialized_header(shard_id, sync_hash, &chain), - &external, - ) - .await - } - }; - - let header_upload_status = if header_in_external_storage { - None - } else { - Some(StateSyncDumpProgress::InProgress { - epoch_id: epoch_id, - epoch_height, - sync_hash, - }) - }; - - // Upload parts - let parts_upload_status = match get_missing_part_ids_for_epoch( - shard_id, - &chain_id, - &epoch_id, - epoch_height, - num_parts, - &external, - ) - .await - { - Err(err) => { - tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine missing parts"); - None - } - Ok(missing_parts) if missing_parts.is_empty() => { - update_dumped_size_and_cnt_metrics( - &shard_id, - epoch_height, - None, - num_parts, - num_parts, - ); - Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height }) - } - Ok(missing_parts) => { - let mut parts_to_dump = missing_parts.clone(); - let timer = Instant::now(); - let mut dumped_any_state_part = false; - let mut failures_cnt = 0; - // Stop if the node is stopped. - // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. 
- while keep_running.load(std::sync::atomic::Ordering::Relaxed) - && timer.elapsed().as_secs() - <= STATE_DUMP_ITERATION_TIME_LIMIT_SECS - && !parts_to_dump.is_empty() - && failures_cnt < FAILURES_ALLOWED_PER_ITERATION - { - let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - - let (part_id, selected_idx) = - select_random_part_id_with_index(&parts_to_dump); - - let state_part = runtime.obtain_state_part( - shard_id, - &sync_prev_prev_hash, - &state_root, - PartId::new(part_id, num_parts), - ); - let state_part = match state_part { - Ok(state_part) => state_part, - Err(err) => { - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, part_id, ?err, "Failed to obtain and store part. Will skip this part."); - failures_cnt += 1; - continue; - } - }; - - let file_type = StateFileType::StatePart { part_id, num_parts }; - let location = external_storage_location( - &chain_id, - &epoch_id, - epoch_height, - shard_id, - &file_type, - ); - if let Err(err) = external - .put_file(file_type, &state_part, shard_id, &location) - .await - { - // no need to break if there's an error, we should keep dumping other parts. - // reason is we are dumping random selected parts, so it's fine if we are not able to finish all of them - tracing::warn!(target: "state_sync_dump", ?shard_id, epoch_height, part_id, ?err, "Failed to put a store part into external storage. Will skip this part."); - failures_cnt += 1; - continue; - } - - // Remove the dumped part from parts_to_dump so that we draw without replacement. - parts_to_dump.swap_remove(selected_idx); - update_dumped_size_and_cnt_metrics( - &shard_id, - epoch_height, - Some(state_part.len()), - num_parts.checked_sub(parts_to_dump.len() as u64).unwrap(), - num_parts, - ); - dumped_any_state_part = true; - } - if parts_to_dump.is_empty() { - Some(StateSyncDumpProgress::AllDumped { - epoch_id, - epoch_height, - }) - } else if dumped_any_state_part { - Some(StateSyncDumpProgress::InProgress { - epoch_id, - epoch_height, - sync_hash, - }) - } else { - // No progress made. Wait before retrying. - None - } - } - }; - match (&parts_upload_status, &header_upload_status) { - ( - Some(StateSyncDumpProgress::AllDumped { .. }), - Some(StateSyncDumpProgress::InProgress { .. }), - ) => header_upload_status, - _ => parts_upload_status, - } - } + }) { + Ok(prev_parts_dumped) => { + metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED + .with_label_values(&[&self.shard_id.to_string()]) + .set(prev_parts_dumped + 1); + } + Err(_) => {} + }; + } + + async fn upload_state_part(self: Arc, part_idx: u64) -> anyhow::Result<()> { + if !self.parts_missing.read().unwrap().contains(&part_idx) { + self.inc_parts_dumped(); + return Ok(()); + } + let part_id = PartId::new(part_idx, self.num_parts); + + let state_part = loop { + if self.canceled.load(Ordering::Relaxed) { + return Ok(()); + } + let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED + .with_label_values(&[&self.shard_id.to_string()]) + .start_timer(); + let state_part = { + let _permit = self.obtain_parts.acquire().await.unwrap(); + self.runtime.obtain_state_part( + self.shard_id, + &self.sync_prev_prev_hash, + &self.state_root, + part_id, + ) + }; + match state_part { + Ok(state_part) => { + break state_part; + } + Err(error) => { + // TODO: return non retriable errors. + tracing::warn!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, "Failed to obtain state part. 
Retrying in 10 seconds."); + self.clock.sleep(Duration::milliseconds(200)).await; + continue; } } }; - // Record the next state of the state machine. - let has_progress = match next_state { - Some(next_state) => { - tracing::debug!(target: "state_sync_dump", ?shard_id, ?next_state); - match chain.chain_store().set_state_sync_dump_progress(shard_id, Some(next_state)) { - Ok(_) => true, - Err(err) => { - // This will be retried. - tracing::debug!(target: "state_sync_dump", ?shard_id, ?err, "Failed to set progress"); - false - } + let file_type = StateFileType::StatePart { part_id: part_idx, num_parts: self.num_parts }; + let location = external_storage_location( + &self.chain_id, + &self.epoch_id, + self.epoch_height, + self.shard_id, + &file_type, + ); + loop { + if self.canceled.load(Ordering::Relaxed) { + return Ok(()); + } + match self + .external + .put_file(file_type.clone(), &state_part, self.shard_id, &location) + .await + { + Ok(()) => { + self.inc_parts_dumped(); + metrics::STATE_SYNC_DUMP_SIZE_TOTAL + .with_label_values(&[ + &self.epoch_height.to_string(), + &self.shard_id.to_string(), + ]) + .inc_by(state_part.len() as u64); + tracing::debug!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, "Uploaded state part."); + return Ok(()); + } + Err(error) => { + tracing::warn!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, "Failed to upload state part. Retrying in 10 seconds."); + self.clock.sleep(Duration::milliseconds(200)).await; + continue; } } - None => { - // Nothing to do, will check again later. - tracing::debug!(target: "state_sync_dump", ?shard_id, "Idle"); + } + } +} + +// Stores needed data for use in header upload futures +struct HeaderUploader { + clock: Clock, + external: ExternalConnection, + chain_id: String, + epoch_id: EpochId, + epoch_height: EpochHeight, +} + +impl HeaderUploader { + async fn upload_header(self: Arc, shard_id: ShardId, header: Option>) { + let Some(header) = header else { + return; + }; + let file_type = StateFileType::StateHeader; + let location = external_storage_location( + &self.chain_id, + &self.epoch_id, + self.epoch_height, + shard_id, + &file_type, + ); + loop { + match self.external.put_file(file_type.clone(), &header, shard_id, &location).await { + Ok(_) => { + tracing::info!( + target: "state_sync_dump", %shard_id, epoch_height = %self.epoch_height, + "Header saved to external storage." + ); + return; + } + Err(err) => { + tracing::warn!( + target: "state_sync_dump", %shard_id, epoch_height = %self.epoch_height, ?err, + "Failed to put header into external storage. Will retry next iteration." 
+ ); + self.clock.sleep(Duration::seconds(5)).await; + continue; + } + }; + } + } + + async fn header_stored(self: Arc, shard_id: ShardId) -> bool { + match self + .external + .is_state_sync_header_stored_for_epoch( + shard_id, + &self.chain_id, + &self.epoch_id, + self.epoch_height, + ) + .await + { + Ok(stored) => stored, + Err(err) => { + tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to determine header presence in external storage."); false } + } + } +} + +impl StateDumper { + fn new( + clock: Clock, + chain_id: String, + validator: MutableValidatorSigner, + shard_tracker: ShardTracker, + chain: Chain, + epoch_manager: Arc, + runtime: Arc, + external: ExternalConnection, + future_spawner: Arc, + ) -> Self { + Self { + clock, + chain_id, + validator, + shard_tracker, + chain, + epoch_manager, + runtime, + current_dump: CurrentDump::None, + external, + future_spawner, + obtain_parts: Arc::new(Semaphore::new(4)), + } + } + + fn get_block_header(&self, hash: &CryptoHash) -> anyhow::Result { + self.chain.get_block_header(hash).with_context(|| format!("Failed getting header {}", hash)) + } + + fn check_old_progress( + &mut self, + epoch_id: &EpochId, + dump: &mut DumpState, + ) -> anyhow::Result<()> { + let progress = get_dump_progress(&self.chain, self.epoch_manager.as_ref(), epoch_id)?; + for (shard_id, progress) in progress.iter() { + let (dumped_epoch_id, done) = progress.epoch_done(); + if &dumped_epoch_id != epoch_id { + self.chain + .chain_store() + .set_state_sync_dump_progress(*shard_id, None) + .context("failed setting state dump progress")?; + } else if done { + dump.dump_state.remove(shard_id); + } + } + Ok(()) + } + + fn latest_sync_header(&self) -> anyhow::Result> { + let head = self.chain.head().context("Failed getting chain head")?; + let header = self.get_block_header(&head.last_block_hash)?; + let final_hash = header.last_final_block(); + if final_hash == &CryptoHash::default() { + return Ok(None); + } + let Some(sync_hash) = self + .chain + .get_sync_hash(final_hash) + .with_context(|| format!("Failed getting sync hash for {}", &final_hash))? + else { + return Ok(None); }; + self.get_block_header(&sync_hash).map(Some) + } - if !has_progress { - // Avoid a busy-loop when there is nothing to do. 
- clock.sleep(iteration_delay).await; + fn get_shard_dump( + &self, + shard_id: ShardId, + sync_hash: &CryptoHash, + ) -> anyhow::Result<(ShardDump, oneshot::Sender>)> { + let state_header = + self.chain.get_state_response_header(shard_id, *sync_hash).with_context(|| { + format!("Failed getting state response header for {} {}", shard_id, sync_hash) + })?; + let state_root = state_header.chunk_prev_state_root(); + let num_parts = state_header.num_state_parts(); + metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL + .with_label_values(&[&shard_id.to_string()]) + .set(num_parts.try_into().unwrap_or(i64::MAX)); + + let mut header_bytes: Vec = Vec::new(); + state_header.serialize(&mut header_bytes)?; + let (sender, receiver) = oneshot::channel(); + Ok(( + ShardDump { + state_root, + header_to_dump: Some(header_bytes), + num_parts, + parts_dumped: Arc::new(AtomicI64::new(0)), + parts_missing: Arc::new(RwLock::new((0..num_parts).collect())), + upload_parts: receiver, + }, + sender, + )) + } + + fn get_dump_state(&mut self, sync_header: &BlockHeader) -> anyhow::Result { + let epoch_info = self + .epoch_manager + .get_epoch_info(sync_header.epoch_id()) + .with_context(|| format!("Failed getting epoch info {:?}", sync_header.epoch_id()))?; + let sync_prev_header = self.get_block_header(sync_header.prev_hash())?; + let sync_prev_prev_hash = *sync_prev_header.prev_hash(); + let shard_ids = self + .epoch_manager + .shard_ids(sync_header.epoch_id()) + .with_context(|| format!("Failed getting shard IDs {:?}", sync_header.epoch_id()))?; + + let v = self.validator.get(); + let account_id = v.as_ref().map(|v| v.validator_id()); + let mut dump_state = HashMap::new(); + let mut senders = HashMap::new(); + for shard_id in shard_ids { + if !self.shard_tracker.care_about_shard( + account_id, + sync_header.prev_hash(), + shard_id, + true, + ) { + tracing::debug!( + target: "state_sync_dump", epoch_height = %epoch_info.epoch_height(), epoch_id = ?sync_header.epoch_id(), %shard_id, + "Not dumping state for non-tracked shard." + ); + continue; + } + metrics::STATE_SYNC_DUMP_EPOCH_HEIGHT + .with_label_values(&[&shard_id.to_string()]) + .set(epoch_info.epoch_height().try_into().unwrap_or(i64::MAX)); + + let (shard_dump, sender) = self.get_shard_dump(shard_id, sync_header.hash())?; + dump_state.insert(shard_id, shard_dump); + senders.insert(shard_id, sender); + } + assert_eq!( + dump_state.keys().collect::>(), + senders.keys().collect::>() + ); + if dump_state.is_empty() { + tracing::warn!( + target: "state_sync_dump", epoch_height = %epoch_info.epoch_height(), epoch_id = ?sync_header.epoch_id(), + "Not doing anything for the current epoch. No shards tracked." + ); + return Ok(NewDump::NoTrackedShards); } + Ok(NewDump::Dump( + DumpState { + epoch_id: *sync_header.epoch_id(), + epoch_height: epoch_info.epoch_height(), + sync_prev_prev_hash, + dump_state, + canceled: Arc::new(AtomicBool::new(false)), + }, + senders, + )) } - tracing::debug!(target: "state_sync_dump", ?shard_id, "Stopped state dump thread"); -} -// Extracts extra data needed for obtaining state parts. 
-fn get_in_progress_data( - shard_id: ShardId, - sync_hash: CryptoHash, - chain: &Chain, -) -> Result<(StateRoot, u64, CryptoHash), Error> { - let state_header = chain.get_state_response_header(shard_id, sync_hash)?; - let state_root = state_header.chunk_prev_state_root(); - let num_parts = state_header.num_state_parts(); - - let sync_block_header = chain.get_block_header(&sync_hash)?; - let sync_prev_block_header = chain.get_previous_header(&sync_block_header)?; - let sync_prev_prev_hash = sync_prev_block_header.prev_hash(); - Ok((state_root, num_parts, *sync_prev_prev_hash)) -} + async fn check_stored_headers(&mut self, dump: &mut DumpState) -> anyhow::Result<()> { + let uploader = Arc::new(HeaderUploader { + clock: self.clock.clone(), + external: self.external.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }); + let shards = dump + .dump_state + .iter() + .map(|(shard_id, _)| (uploader.clone(), *shard_id)) + .collect::>(); + let headers_stored = tokio_stream::iter(shards) + .filter_map(|(uploader, shard_id)| async move { + let stored = uploader.header_stored(shard_id).await; + if stored { + Some(futures::future::ready(shard_id)) + } else { + None + } + }) + .buffer_unordered(10) + .collect::>() + .await; + for shard_id in headers_stored { + tracing::info!( + target: "state_sync_dump", %shard_id, epoch_height = %dump.epoch_height, + "Header already saved to external storage." + ); + let s = dump.dump_state.get_mut(&shard_id).unwrap(); + s.header_to_dump = None; + } + Ok(()) + } -fn update_dumped_size_and_cnt_metrics( - shard_id: &ShardId, - epoch_height: EpochHeight, - part_len: Option, - parts_dumped: u64, - num_parts: u64, -) { - if let Some(part_len) = part_len { - metrics::STATE_SYNC_DUMP_SIZE_TOTAL - .with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()]) - .inc_by(part_len as u64); + async fn store_headers(&mut self, dump: &mut DumpState) -> anyhow::Result<()> { + let uploader = Arc::new(HeaderUploader { + clock: self.clock.clone(), + external: self.external.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }); + let headers = dump + .dump_state + .iter_mut() + .map(|(shard_id, shard_dump)| { + (uploader.clone(), *shard_id, shard_dump.header_to_dump.take()) + }) + .collect::>(); + + tokio_stream::iter(headers) + .map(|(uploader, shard_id, header)| async move { + uploader.upload_header(shard_id, header).await + }) + .buffer_unordered(10) + .collect::<()>() + .await; + + Ok(()) } - metrics::STATE_SYNC_DUMP_EPOCH_HEIGHT - .with_label_values(&[&shard_id.to_string()]) - .set(epoch_height as i64); + async fn start_upload_parts( + &mut self, + senders: HashMap>>, + dump: &DumpState, + ) { + let mut senders = senders + .into_iter() + .map(|(shard_id, sender)| { + let d = dump.dump_state.get(&shard_id).unwrap(); + (shard_id, (sender, d.num_parts)) + }) + .collect::>(); + let uploaders = dump + .dump_state + .iter() + .map(|(shard_id, shard_dump)| { + metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED + .with_label_values(&[&shard_id.to_string()]) + .set(0); + Arc::new(PartUploader { + clock: self.clock.clone(), + external: self.external.clone(), + runtime: self.runtime.clone(), + chain_id: self.chain_id.clone(), + epoch_id: dump.epoch_id.clone(), + epoch_height: dump.epoch_height, + sync_prev_prev_hash: dump.sync_prev_prev_hash, + shard_id: *shard_id, + state_root: shard_dump.state_root, + num_parts: shard_dump.num_parts, + parts_dumped: 
shard_dump.parts_dumped.clone(), + parts_missing: shard_dump.parts_missing.clone(), + obtain_parts: self.obtain_parts.clone(), + canceled: dump.canceled.clone(), + }) + }) + .collect::>(); - metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED - .with_label_values(&[&shard_id.to_string()]) - .set(parts_dumped as i64); + let mut tasks = uploaders + .clone() + .iter() + .map(|u| (0..u.num_parts).map(|part_id| (u.clone(), part_id))) + .flatten() + .collect::>(); + // We randomize so different nodes uploading parts don't try to upload in the same order + tasks.shuffle(&mut thread_rng()); + + let future_spawner = self.future_spawner.clone(); + let fut = async move { + let mut tasks = tokio_stream::iter(tasks) + .map(|(u, part_id)| { + let shard_id = u.shard_id; + let task = u.upload_state_part(part_id); + let task = respawn_for_parallelism(&*future_spawner, "upload part", task); + async move { (shard_id, task.await) } + }) + .buffer_unordered(10); + + while let Some((shard_id, result)) = tasks.next().await { + let std::collections::hash_map::Entry::Occupied(mut e) = senders.entry(shard_id) + else { + panic!("shard ID {} missing in state dump handles", shard_id); + }; + let (_, parts_left) = e.get_mut(); + if result.is_err() { + let (sender, _) = e.remove(); + let _ = sender.send(result); + return; + } + *parts_left -= 1; + if *parts_left == 0 { + let (sender, _) = e.remove(); + let _ = sender.send(result); + } + } + }; + self.future_spawner.spawn_boxed("upload_parts", fut.boxed()); + } - metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL - .with_label_values(&[&shard_id.to_string()]) - .set(num_parts as i64); -} + fn new_dump(&mut self, dump: DumpState, sync_hash: CryptoHash) -> anyhow::Result<()> { + for (shard_id, _) in dump.dump_state.iter() { + self.chain + .chain_store() + .set_state_sync_dump_progress( + *shard_id, + Some(StateSyncDumpProgress::InProgress { + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + sync_hash, + }), + ) + .context("failed setting state dump progress")?; + } + self.current_dump = CurrentDump::InProgress(dump); + Ok(()) + } -fn cares_about_shard( - chain: &Chain, - shard_id: &ShardId, - sync_hash: &CryptoHash, - shard_tracker: &ShardTracker, - account_id: &Option, -) -> Result { - let sync_header = chain.get_block_header(&sync_hash)?; - let sync_prev_hash = sync_header.prev_hash(); - Ok(shard_tracker.care_about_shard(account_id.as_ref(), sync_prev_hash, *shard_id, true)) -} + // Checks the current epoch and initializes the types associated with dumping its state + // if it hasn't already been dumped. + async fn init(&mut self) -> anyhow::Result<()> { + loop { + let Some(sync_header) = self.latest_sync_header()? else { + self.clock.sleep(Duration::milliseconds(500)).await; + continue; + }; + match self.get_dump_state(&sync_header)? 
{ + NewDump::Dump(mut dump, senders) => { + self.check_old_progress(sync_header.epoch_id(), &mut dump)?; + + self.check_stored_headers(&mut dump).await?; + self.store_headers(&mut dump).await?; + + dump.set_missing_parts(&self.external, &self.chain_id).await; + self.start_upload_parts(senders, &dump).await; + self.new_dump(dump, *sync_header.hash())?; + } + NewDump::NoTrackedShards => { + self.current_dump = CurrentDump::Done(*sync_header.epoch_id()); + } + } + return Ok(()); + } + } -struct LatestEpochInfo { - prev_epoch_id: EpochId, - epoch_id: EpochId, - epoch_height: EpochHeight, - sync_hash: CryptoHash, + // Returns when the part upload tasks are finished + async fn check_parts_upload(&mut self) -> anyhow::Result<()> { + let CurrentDump::InProgress(dump) = &mut self.current_dump else { + return std::future::pending().await; + }; + let ((shard_id, result), _, _still_going) = + futures::future::select_all(dump.dump_state.iter_mut().map(|(shard_id, s)| { + async { + let r = (&mut s.upload_parts).await.unwrap(); + (*shard_id, r) + } + .boxed() + })) + .await; + result?; + drop(_still_going); + + tracing::info!(target: "state_sync_dump", epoch_id = ?&dump.epoch_id, %shard_id, "Shard dump finished"); + + self.chain + .chain_store() + .set_state_sync_dump_progress( + shard_id, + Some(StateSyncDumpProgress::AllDumped { + epoch_id: dump.epoch_id, + epoch_height: dump.epoch_height, + }), + ) + .context("failed setting state dump progress")?; + dump.dump_state.remove(&shard_id); + if dump.dump_state.is_empty() { + self.current_dump = CurrentDump::Done(dump.epoch_id); + } + Ok(()) + } + + // Checks which parts have already been uploaded possibly by other nodes + // We use &mut so the do_state_sync_dump() future will be Send, which it won't be if we use a normal + // reference because of the Chain field + async fn check_stored_parts(&mut self) { + let CurrentDump::InProgress(dump) = &self.current_dump else { + return; + }; + dump.set_missing_parts(&self.external, &self.chain_id).await; + } + + async fn check_head(&mut self) -> anyhow::Result<()> { + let Some(sync_header) = self.latest_sync_header()? else { + return Ok(()); + }; + match &self.current_dump { + CurrentDump::InProgress(dump) => { + if &dump.epoch_id == sync_header.epoch_id() { + return Ok(()); + } + dump.canceled.store(true, Ordering::Relaxed); + for (_shard_id, d) in dump.dump_state.iter() { + // Set it to -1 to tell the existing tasks not to set the metrics anymore + d.parts_dumped.store(-1, Ordering::SeqCst); + } + } + CurrentDump::Done(epoch_id) => { + if epoch_id == sync_header.epoch_id() { + return Ok(()); + } + } + CurrentDump::None => {} + }; + match self.get_dump_state(&sync_header)? { + NewDump::Dump(mut dump, sender) => { + self.store_headers(&mut dump).await?; + self.start_upload_parts(sender, &dump).await; + self.new_dump(dump, *sync_header.hash())?; + } + NewDump::NoTrackedShards => { + self.current_dump = CurrentDump::Done(*sync_header.epoch_id()); + } + }; + Ok(()) + } } -/// return epoch_id and sync_hash of the latest complete epoch available locally. 
-fn get_latest_epoch( - shard_id: &ShardId, - chain: &Chain, +async fn state_sync_dump( + clock: Clock, + chain: Chain, epoch_manager: Arc, -) -> Result, Error> { - let head = chain.head()?; - tracing::debug!(target: "state_sync_dump", ?shard_id, "Check if a new complete epoch is available"); - let hash = head.last_block_hash; - let header = chain.get_block_header(&hash)?; - let final_hash = header.last_final_block(); - if final_hash == &CryptoHash::default() { - return Ok(None); + shard_tracker: ShardTracker, + runtime: Arc, + chain_id: String, + external: ExternalConnection, + iteration_delay: Duration, + validator: MutableValidatorSigner, + keep_running: Arc, + future_spawner: Arc, +) -> anyhow::Result<()> { + tracing::info!(target: "state_sync_dump", "Running StateSyncDump loop"); + + let mut dumper = StateDumper::new( + clock.clone(), + chain_id, + validator, + shard_tracker, + chain, + epoch_manager, + runtime, + external, + future_spawner, + ); + dumper.init().await?; + + let now = clock.now(); + let mut check_head = Interval::new(now + iteration_delay, iteration_delay); + let mut check_stored_parts = Interval::new(now + Duration::seconds(20), Duration::seconds(20)); + + while keep_running.load(Ordering::Relaxed) { + tokio::select! { + _ = check_head.tick(&clock) => { + dumper.check_head().await?; + } + _ = check_stored_parts.tick(&clock) => { + dumper.check_stored_parts().await; + } + result = dumper.check_parts_upload() => { + result?; + } + } } - let Some(sync_hash) = chain.get_sync_hash(final_hash)? else { - return Ok(None); - }; - let final_block_header = chain.get_block_header(&final_hash)?; - let epoch_id = *final_block_header.epoch_id(); - let epoch_info = epoch_manager.get_epoch_info(&epoch_id)?; - let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(&head.prev_block_hash)?; - let epoch_height = epoch_info.epoch_height(); - tracing::debug!(target: "state_sync_dump", ?final_hash, ?sync_hash, ?epoch_id, epoch_height, "get_latest_epoch"); + tracing::debug!(target: "state_sync_dump", "Stopped state dump thread"); + Ok(()) +} - Ok(Some(LatestEpochInfo { prev_epoch_id, epoch_id, epoch_height, sync_hash })) +async fn do_state_sync_dump( + clock: Clock, + chain: Chain, + epoch_manager: Arc, + shard_tracker: ShardTracker, + runtime: Arc, + chain_id: String, + external: ExternalConnection, + iteration_delay: Duration, + validator: MutableValidatorSigner, + keep_running: Arc, + future_spawner: Arc, +) { + if let Err(error) = state_sync_dump( + clock, + chain, + epoch_manager, + shard_tracker, + runtime, + chain_id, + external, + iteration_delay, + validator, + keep_running, + future_spawner, + ) + .await + { + tracing::error!(target: "state_sync_dump", ?error, "State dumper failed"); + } } From b1ee0bed0a77a2ab3478b23e7d01ab3796de154a Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Thu, 21 Nov 2024 00:25:48 -0500 Subject: [PATCH 2/8] implement iter_state_sync_dump_progress --- chain/chain/src/store/mod.rs | 53 +++++++++++++++++-------------- core/primitives/src/state_sync.rs | 14 -------- nearcore/src/state_sync.rs | 37 ++++++++------------- 3 files changed, 43 insertions(+), 61 deletions(-) diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index 1a19fe72f4f..13d90701b74 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -1020,30 +1020,35 @@ impl ChainStore { key } - // pub fn iter_state_sync_dump_progress( - // &self, - // shard_id: ShardId, - // ) -> Result { - // let mut prefix = 
STATE_SYNC_DUMP_KEY.to_vec();
-    //     prefix.extend(b":".to_vec());
-    //     self.store.iter_prefix_ser(DBCol::BlockMisc, prefix)
-    //     option_to_not_found(
-    //         self.store
-    //             .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
-    //         format!("STATE_SYNC_DUMP:{}", shard_id),
-    //     )
-    // }
-
-    /// Retrieves STATE_SYNC_DUMP for the given shard.
-    pub fn get_state_sync_dump_progress(
-        &self,
-        shard_id: ShardId,
-    ) -> Result<StateSyncDumpProgress, Error> {
-        option_to_not_found(
-            self.store
-                .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)),
-            format!("STATE_SYNC_DUMP:{}", shard_id),
-        )
+    /// For each value stored, this returns an (EpochId, bool), where the bool tells whether the dump
+    /// for that epoch is finished, because those are the only fields we really care about.
+    pub fn iter_state_sync_dump_progress<'a>(
+        &'a self,
+    ) -> impl Iterator<Item = io::Result<(ShardId, (EpochId, bool))>> + 'a {
+        self.store
+            .iter_prefix_ser::<StateSyncDumpProgress>(DBCol::BlockMisc, STATE_SYNC_DUMP_KEY)
+            .map(|item| {
+                item.and_then(|(key, progress)| {
+                    // + 1 for the ':'
+                    let prefix_len = STATE_SYNC_DUMP_KEY.len() + 1;
+                    let int_part = &key[prefix_len..];
+                    let int_part = int_part.try_into().map_err(|_| {
+                        io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            format!("Bad StateSyncDump column key length: {}", key.len()),
+                        )
+                    })?;
+                    let shard_id = ShardId::from_le_bytes(int_part);
+                    Ok((
+                        shard_id,
+                        match progress {
+                            StateSyncDumpProgress::AllDumped { epoch_id, .. } => (epoch_id, true),
+                            StateSyncDumpProgress::InProgress { epoch_id, .. } => (epoch_id, false),
+                            StateSyncDumpProgress::Skipped { epoch_id, .. } => (epoch_id, true),
+                        },
+                    ))
+                })
+            })
     }
 
     /// Updates STATE_SYNC_DUMP for the given shard.
diff --git a/core/primitives/src/state_sync.rs b/core/primitives/src/state_sync.rs
index e143eb7a294..5156559c9d7 100644
--- a/core/primitives/src/state_sync.rs
+++ b/core/primitives/src/state_sync.rs
@@ -304,20 +304,6 @@ pub enum StateSyncDumpProgress {
     },
 }
 
-impl StateSyncDumpProgress {
-    // The `StateSyncDumpProgress` type includes information that is not read or used anywhere. We could
-    // simplify the type with a DB migration, but it's not really important, since the data is small. This
-    // function returns the two pieces of information that the state dump code actually does use, which is the
-    // `EpochId` we were last trying to dump, and whether we're finished with it.
-    pub fn epoch_done(&self) -> (EpochId, bool) {
-        match self {
-            Self::AllDumped { epoch_id, .. } => (epoch_id.clone(), true),
-            Self::Skipped { epoch_id, .. } => (epoch_id.clone(), true),
-            Self::InProgress { epoch_id, .. } => (epoch_id.clone(), false),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::state_sync::{get_num_state_parts, STATE_PART_MEMORY_LIMIT};
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 2ae4a4a001f..c796ecbbe23 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -8,7 +8,7 @@ use futures::{FutureExt, StreamExt};
 use near_async::futures::{FutureSpawner, FutureSpawnerExt};
 use near_async::time::{Clock, Duration, Interval};
 use near_chain::types::RuntimeAdapter;
-use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode, Error};
+use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
 use near_chain_configs::{ClientConfig, ExternalStorageLocation, MutableValidatorSigner};
 use near_client::sync::external::{
     create_bucket_readwrite, external_storage_location, StateFileType,
@@ -207,21 +207,6 @@ async fn get_missing_part_ids_for_epoch(
     }
 }
 
-// TODO: implement it for real
-fn get_dump_progress(
-    chain: &Chain,
-    epoch_manager: &dyn EpochManagerAdapter,
-    fixme_current_epoch_id: &EpochId,
-) -> anyhow::Result<Vec<(ShardId, StateSyncDumpProgress)>> {
-    let shard_ids = epoch_manager.shard_ids(fixme_current_epoch_id)?;
-    let progress = match chain.chain_store().get_state_sync_dump_progress(shard_ids[0]) {
-        Ok(p) => p,
-        Err(Error::DBNotFoundErr(_)) => return Ok(vec![]),
-        Err(e) => return Err(e).context("failed looking up state dump progress"),
-    };
-    Ok(shard_ids.into_iter().map(|s| (s, progress.clone())).collect())
-}
-
 // State associated with dumping a shard's state
 struct ShardDump {
     state_root: StateRoot,
@@ -526,17 +511,19 @@ impl StateDumper {
         &mut self,
         epoch_id: &EpochId,
         dump: &mut DumpState,
+        senders: &mut HashMap<ShardId, oneshot::Sender<anyhow::Result<()>>>,
     ) -> anyhow::Result<()> {
-        let progress = get_dump_progress(&self.chain, self.epoch_manager.as_ref(), epoch_id)?;
-        for (shard_id, progress) in progress.iter() {
-            let (dumped_epoch_id, done) = progress.epoch_done();
+        for res in self.chain.chain_store().iter_state_sync_dump_progress() {
+            let (shard_id, (dumped_epoch_id, done)) =
+                res.context("failed iterating over stored dump progress")?;
             if &dumped_epoch_id != epoch_id {
                 self.chain
                     .chain_store()
-                    .set_state_sync_dump_progress(*shard_id, None)
+                    .set_state_sync_dump_progress(shard_id, None)
                     .context("failed setting state dump progress")?;
             } else if done {
-                dump.dump_state.remove(shard_id);
+                dump.dump_state.remove(&shard_id);
+                senders.remove(&shard_id);
             }
         }
         Ok(())
@@ -819,8 +806,12 @@ impl StateDumper {
                 continue;
             };
             match self.get_dump_state(&sync_header)? {
-                NewDump::Dump(mut dump, senders) => {
-                    self.check_old_progress(sync_header.epoch_id(), &mut dump)?;
+                NewDump::Dump(mut dump, mut senders) => {
+                    self.check_old_progress(sync_header.epoch_id(), &mut dump, &mut senders)?;
+                    if dump.dump_state.is_empty() {
+                        self.current_dump = CurrentDump::Done(*sync_header.epoch_id());
+                        return Ok(());
+                    }
 
                     self.check_stored_headers(&mut dump).await?;
                     self.store_headers(&mut dump).await?;

From ab69dd5f5699cd76e43481c8e53df087961ecad2 Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 00:38:58 -0500
Subject: [PATCH 3/8] move respawn_for_parallelism for pub use

---
 chain/client/src/sync/state/shard.rs | 20 +-------------------
 core/async/src/futures.rs            | 18 ++++++++++++++++++
 nearcore/src/state_sync.rs           | 14 +-------------
 3 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/chain/client/src/sync/state/shard.rs b/chain/client/src/sync/state/shard.rs
index a174c4cd3a4..bd0f5eae4d8 100644
--- a/chain/client/src/sync/state/shard.rs
+++ b/chain/client/src/sync/state/shard.rs
@@ -4,7 +4,7 @@ use crate::metrics;
 use crate::sync::state::chain_requests::ChainFinalizationRequest;
 use crate::sync::state::util::query_epoch_id_and_height_for_block;
 use futures::{StreamExt, TryStreamExt};
-use near_async::futures::{FutureSpawner, FutureSpawnerExt};
+use near_async::futures::{respawn_for_parallelism, FutureSpawner};
 use near_async::messaging::AsyncSender;
 use near_chain::types::RuntimeAdapter;
 use near_chain::BlockHeader;
@@ -247,21 +247,3 @@ async fn apply_state_part(
     )?;
     Ok(())
 }
-
-/// Given a future, respawn it as an equivalent future but which does not block the
-/// driver of the future. For example, if the given future directly performs
-/// computation, normally the whoever drives the future (such as a buffered_unordered)
-/// would be blocked by the computation, thereby not allowing computation of other
-/// futures driven by the same driver to proceed. This function respawns the future
-/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
-fn respawn_for_parallelism<T: Send + 'static>(
-    future_spawner: &dyn FutureSpawner,
-    name: &'static str,
-    f: impl std::future::Future<Output = T> + Send + 'static,
-) -> impl std::future::Future<Output = T> + Send + 'static {
-    let (sender, receiver) = tokio::sync::oneshot::channel();
-    future_spawner.spawn(name, async move {
-        sender.send(f.await).ok();
-    });
-    async move { receiver.await.unwrap() }
-}
diff --git a/core/async/src/futures.rs b/core/async/src/futures.rs
index 196a2086da0..14bb3f1a3a5 100644
--- a/core/async/src/futures.rs
+++ b/core/async/src/futures.rs
@@ -42,6 +42,24 @@ impl FutureSpawnerExt for dyn FutureSpawner + '_ {
     }
 }
 
+/// Given a future, respawn it as an equivalent future but which does not block the
+/// driver of the future. For example, if the given future directly performs
+/// computation, normally whoever drives the future (such as a buffered_unordered)
+/// would be blocked by the computation, thereby not allowing computation of other
+/// futures driven by the same driver to proceed. This function respawns the future
+/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
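+/// A minimal usage sketch (illustrative only: `spawner` stands for any
+/// `FutureSpawner` implementation, and `expensive_computation` is a hypothetical
+/// CPU-bound function):
+/// ```ignore
+/// let fut = respawn_for_parallelism(&*spawner, "compute", async move {
+///     expensive_computation() // runs via the spawner, not on the caller's driver
+/// });
+/// let result = fut.await; // awaiting just polls a oneshot receiver
+/// ```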
+pub fn respawn_for_parallelism<T: Send + 'static>(
+    future_spawner: &dyn FutureSpawner,
+    name: &'static str,
+    f: impl std::future::Future<Output = T> + Send + 'static,
+) -> impl std::future::Future<Output = T> + Send + 'static {
+    let (sender, receiver) = tokio::sync::oneshot::channel();
+    future_spawner.spawn(name, async move {
+        sender.send(f.await).ok();
+    });
+    async move { receiver.await.unwrap() }
+}
+
 /// A FutureSpawner that hands over the future to Actix.
 pub struct ActixFutureSpawner;
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index c796ecbbe23..0e63dd30435 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -5,7 +5,7 @@ use crate::metrics;
 use anyhow::Context;
 use borsh::BorshSerialize;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
-use near_async::futures::{FutureSpawner, FutureSpawnerExt};
+use near_async::futures::{respawn_for_parallelism, FutureSpawner};
 use near_async::time::{Clock, Duration, Interval};
 use near_chain::types::RuntimeAdapter;
@@ -309,18 +309,6 @@ struct PartUploader {
     canceled: Arc<AtomicBool>,
 }
 
-fn respawn_for_parallelism<T: Send + 'static>(
-    future_spawner: &dyn FutureSpawner,
-    name: &'static str,
-    f: impl std::future::Future<Output = T> + Send + 'static,
-) -> impl std::future::Future<Output = T> + Send + 'static {
-    let (sender, receiver) = oneshot::channel();
-    future_spawner.spawn(name, async move {
-        sender.send(f.await).ok();
-    });
-    async move { receiver.await.unwrap() }
-}
-
 impl PartUploader {
     fn inc_parts_dumped(&self) {
         match self.parts_dumped.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {

From af23b47c319551b082fb2ee0acded8fd11980e3a Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 11:22:48 -0500
Subject: [PATCH 4/8] fix bug

---
 nearcore/src/state_sync.rs | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 0e63dd30435..1904680acc4 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -787,10 +787,10 @@ impl StateDumper {
 
     // Checks the current epoch and initializes the types associated with dumping its state
     // if it hasn't already been dumped.
-    async fn init(&mut self) -> anyhow::Result<()> {
+    async fn init(&mut self, iteration_delay: Duration) -> anyhow::Result<()> {
         loop {
             let Some(sync_header) = self.latest_sync_header()? else {
-                self.clock.sleep(Duration::milliseconds(500)).await;
+                self.clock.sleep(iteration_delay).await;
                 continue;
             };
             match self.get_dump_state(&sync_header)? {
@@ -906,13 +906,20 @@ async fn state_sync_dump(
     runtime: Arc<dyn RuntimeAdapter>,
     chain_id: String,
     external: ExternalConnection,
-    iteration_delay: Duration,
+    mut iteration_delay: Duration,
     validator: MutableValidatorSigner,
     keep_running: Arc<AtomicBool>,
     future_spawner: Arc<dyn FutureSpawner>,
 ) -> anyhow::Result<()> {
     tracing::info!(target: "state_sync_dump", "Running StateSyncDump loop");
 
+    // This is set to zero in some tests where the block production delay is very small (10 millis).
+    // In that case we'll actually just wait for 1 millisecond. The previous behavior was to call
+    // clock.sleep(ZERO), but setting it to 1 is probably fine, and works with the Instant below.
+    if iteration_delay == Duration::ZERO {
+        iteration_delay = Duration::milliseconds(1);
+    }
+
     let mut dumper = StateDumper::new(
         clock.clone(),
         chain_id,
@@ -924,7 +931,7 @@ async fn state_sync_dump(
         external,
         future_spawner,
     );
-    dumper.init().await?;
+    dumper.init(iteration_delay).await?;
 
     let now = clock.now();
     let mut check_head = Interval::new(now + iteration_delay, iteration_delay);

From 42699a83ccf1da549af426b7859405046c41255c Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 12:23:53 -0500
Subject: [PATCH 5/8] nit

---
 nearcore/src/state_sync.rs | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 1904680acc4..f11f8f3029e 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -356,7 +356,11 @@ impl PartUploader {
             }
             Err(error) => {
                 // TODO: return non retriable errors.
-                tracing::warn!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, "Failed to obtain state part. Retrying in 10 seconds.");
+                tracing::warn!(
+                    target: "state_sync_dump",
+                    shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error,
+                    "Failed to obtain state part. Retrying in 200 millis."
+                );
                 self.clock.sleep(Duration::milliseconds(200)).await;
                 continue;
             }
@@ -392,7 +396,10 @@ impl PartUploader {
                 return Ok(());
             }
             Err(error) => {
-                tracing::warn!(target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error, "Failed to upload state part. Retrying in 10 seconds.");
+                tracing::warn!(
+                    target: "state_sync_dump", shard_id = %self.shard_id, epoch_height=%self.epoch_height, epoch_id=?&self.epoch_id, ?part_id, ?error,
+                    "Failed to upload state part. Retrying in 200 millis."
+                );
                 self.clock.sleep(Duration::milliseconds(200)).await;
                 continue;
             }

From 790cfab1359dc29cef255558f23ac894dee02e59 Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 13:04:52 -0500
Subject: [PATCH 6/8] del old comment

---
 nearcore/src/state_sync.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index f11f8f3029e..d71842e8cfd 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -94,7 +94,6 @@ impl StateSyncDumper {
         let chain_id = self.client_config.chain_id.clone();
         let keep_running = Arc::new(AtomicBool::new(true));
 
-        // Start a thread for each shard.
         let chain = Chain::new_for_view_client(
             self.clock.clone(),

From 2271a6097c46e74cac0e4c2abbb64388400740ba Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 13:47:14 -0500
Subject: [PATCH 7/8] update comment

---
 nearcore/src/state_sync.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index d71842e8cfd..100bb7eb740 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -55,8 +55,8 @@ pub struct StateSyncDumper {
 }
 
 impl StateSyncDumper {
-    /// Starts one a thread per tracked shard.
-    /// Each started thread will be dumping state parts of a single epoch to external storage.
+    /// Starts a thread that periodically checks whether any new parts need to be uploaded, and then spawns
+    /// futures to generate and upload them.
     pub fn start(&mut self) -> anyhow::Result<()> {
         assert!(self.handle.is_none(), "StateSyncDumper already started");

From 77de567caee502691c360b7644732d92a3319cee Mon Sep 17 00:00:00 2001
From: Marcelo Diop-Gonzalez
Date: Thu, 21 Nov 2024 15:56:22 -0500
Subject: [PATCH 8/8] fix bug 2

---
 nearcore/src/state_sync.rs | 48 ++++++++++++++++++++++++--------------
 1 file changed, 31 insertions(+), 17 deletions(-)

diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 100bb7eb740..97e817f001c 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -181,6 +181,9 @@ async fn get_missing_part_ids_for_epoch(
     total_parts: u64,
     external: &ExternalConnection,
 ) -> Result<HashSet<u64>, anyhow::Error> {
+    if total_parts == 0 {
+        return Ok(HashSet::new());
+    }
     let directory_path = external_storage_location_directory(
         chain_id,
         epoch_id,
@@ -706,31 +709,42 @@ impl StateDumper {
                 (shard_id, (sender, d.num_parts))
             })
             .collect::<HashMap<_, _>>();
+        let mut empty_shards = HashSet::new();
         let uploaders = dump
             .dump_state
             .iter()
-            .map(|(shard_id, shard_dump)| {
+            .filter_map(|(shard_id, shard_dump)| {
                 metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED
                     .with_label_values(&[&shard_id.to_string()])
                     .set(0);
-                Arc::new(PartUploader {
-                    clock: self.clock.clone(),
-                    external: self.external.clone(),
-                    runtime: self.runtime.clone(),
-                    chain_id: self.chain_id.clone(),
-                    epoch_id: dump.epoch_id.clone(),
-                    epoch_height: dump.epoch_height,
-                    sync_prev_prev_hash: dump.sync_prev_prev_hash,
-                    shard_id: *shard_id,
-                    state_root: shard_dump.state_root,
-                    num_parts: shard_dump.num_parts,
-                    parts_dumped: shard_dump.parts_dumped.clone(),
-                    parts_missing: shard_dump.parts_missing.clone(),
-                    obtain_parts: self.obtain_parts.clone(),
-                    canceled: dump.canceled.clone(),
-                })
+                if shard_dump.num_parts > 0 {
+                    Some(Arc::new(PartUploader {
+                        clock: self.clock.clone(),
+                        external: self.external.clone(),
+                        runtime: self.runtime.clone(),
+                        chain_id: self.chain_id.clone(),
+                        epoch_id: dump.epoch_id.clone(),
+                        epoch_height: dump.epoch_height,
+                        sync_prev_prev_hash: dump.sync_prev_prev_hash,
+                        shard_id: *shard_id,
+                        state_root: shard_dump.state_root,
+                        num_parts: shard_dump.num_parts,
+                        parts_dumped: shard_dump.parts_dumped.clone(),
+                        parts_missing: shard_dump.parts_missing.clone(),
+                        obtain_parts: self.obtain_parts.clone(),
+                        canceled: dump.canceled.clone(),
+                    }))
+                } else {
+                    empty_shards.insert(shard_id);
+                    None
+                }
             })
             .collect::<Vec<_>>();
+        for shard_id in empty_shards {
+            let (sender, _) = senders.remove(shard_id).unwrap();
+            let _ = sender.send(Ok(()));
+        }
+        assert_eq!(senders.len(), uploaders.len());
         let mut tasks = uploaders
             .clone()
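
A few of the patterns in this series are worth a standalone illustration. First, the respawn_for_parallelism pattern that PATCH 3/8 makes pub: the returned future only awaits a oneshot receiver, so whatever drives it (for example a buffer_unordered stream) stays free to poll other futures while the real work runs on the spawner. The sketch below shows the same shape outside of nearcore; the Spawner trait and respawn function are illustrative stand-ins for near-async's FutureSpawner and respawn_for_parallelism, assuming a plain Tokio runtime with the futures crate.

use futures::future::BoxFuture;
use futures::stream::{self, StreamExt};

// Stand-in for near-async's FutureSpawner trait (illustrative only).
trait Spawner {
    fn spawn(&self, name: &'static str, f: BoxFuture<'static, ()>);
}

// A Spawner backed by Tokio's multi-threaded executor.
struct TokioSpawner;

impl Spawner for TokioSpawner {
    fn spawn(&self, _name: &'static str, f: BoxFuture<'static, ()>) {
        tokio::spawn(f);
    }
}

// Same shape as the patch's respawn_for_parallelism: run `f` on the spawner
// and hand back a future that only awaits the result over a oneshot channel,
// so the caller's driver is never blocked by `f`'s computation.
fn respawn<T: Send + 'static>(
    spawner: &dyn Spawner,
    name: &'static str,
    f: impl std::future::Future<Output = T> + Send + 'static,
) -> impl std::future::Future<Output = T> + Send + 'static {
    let (sender, receiver) = tokio::sync::oneshot::channel();
    spawner.spawn(
        name,
        Box::pin(async move {
            sender.send(f.await).ok();
        }),
    );
    async move { receiver.await.unwrap() }
}

#[tokio::main]
async fn main() {
    let spawner = TokioSpawner;
    // Ten compute-heavy futures; because each one is respawned, up to four
    // run in parallel on the executor instead of serializing on the single
    // task that drives the stream.
    let results: Vec<u64> = stream::iter(0..10u64)
        .map(|i| {
            respawn(&spawner, "demo_part", async move {
                (0..1_000_000u64).fold(i, |acc, x| acc ^ x)
            })
        })
        .buffer_unordered(4)
        .collect()
        .await;
    println!("{results:?}");
}

Note that dropping the returned future does not stop the already-spawned work, which is presumably one reason the dumper also threads an explicit cancellation flag (the canceled: Arc<AtomicBool> field) through PartUploader rather than relying on drop alone.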
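Second, the zero-delay clamp in PATCH 4/8. A zero period is a hazard for interval timers generally; tokio::time::interval, for instance, panics when given a zero period. This sketch shows the same clamp pattern using std's Duration and tokio's interval rather than near-async's Clock and Interval, whose exact behavior on a zero period is not shown in the patch.

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Zero can show up from test configs with a very small block production delay.
    let mut period = Duration::ZERO;
    // tokio::time::interval panics on a zero period, so clamp it to something
    // tiny instead, the way the patch clamps iteration_delay to 1ms.
    if period == Duration::ZERO {
        period = Duration::from_millis(1);
    }
    let mut ticker = tokio::time::interval(period);
    for _ in 0..3 {
        ticker.tick().await; // the first tick completes immediately
        println!("tick");
    }
}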
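Finally, the shape of the PATCH 8/8 fix is a general pattern: when completion is signaled per key through oneshot channels, keys with no work still need their channel resolved, or the waiting side hangs. A standalone sketch of that pattern follows; the shard numbers, part counts, and simulated upload loop are made up for illustration and are not the patch's code.

use std::collections::HashMap;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Parts to dump per shard; shard 1 has no work at all.
    let parts: HashMap<u64, u64> = HashMap::from([(0, 3), (1, 0), (2, 5)]);

    // One completion channel per shard, created up front.
    let mut senders: HashMap<u64, oneshot::Sender<Result<(), String>>> = HashMap::new();
    let mut receivers = Vec::new();
    for &shard in parts.keys() {
        let (tx, rx) = oneshot::channel();
        senders.insert(shard, tx);
        receivers.push((shard, rx));
    }

    for (&shard, &num_parts) in parts.iter() {
        let tx = senders.remove(&shard).unwrap();
        if num_parts > 0 {
            // Only shards with work get an uploader task.
            tokio::spawn(async move {
                for _part in 0..num_parts { /* obtain and upload the part */ }
                tx.send(Ok(())).ok();
            });
        } else {
            // Mirrors the patch's empty_shards loop: resolve the channel
            // immediately, otherwise the receiver below waits forever.
            tx.send(Ok(())).ok();
        }
    }

    for (shard, rx) in receivers {
        rx.await.expect("sender dropped").unwrap();
        println!("shard {shard} dumped");
    }
}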