fix(state-dumper): dump state for post-resharding shards (#12491)
Currently, the state dumper starts one thread for each ShardId in the
current epoch, and each of those is responsible for dumping headers and
parts just for that ShardId. But after a resharding, no thread is aware
that it should dump state for any of the new ShardIds. Here we fix it
by iterating, when we start the threads, not just over the ShardIds in
the current epoch but also over any future ShardIds that might belong
to post-protocol-upgrade epochs.
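
In essence, the set of threads is now derived from the union of shard IDs
across every shard layout between the head protocol version and the binary's
PROTOCOL_VERSION (see get_all_shard_ids in the diff below). A minimal,
self-contained sketch of that idea, with a toy map standing in for the epoch
manager's per-version shard layouts (all names here are illustrative, not
nearcore APIs):

use std::collections::{HashMap, HashSet};

// Toy stand-in for looking up the shard layout of a protocol version:
// maps a protocol version to the shard IDs of its layout.
fn shard_ids_for_version(layouts: &HashMap<u32, Vec<u64>>, version: u32) -> Vec<u64> {
    layouts.get(&version).cloned().unwrap_or_default()
}

// Union of all shard IDs from the head protocol version up to the latest
// one the binary knows about, so post-resharding shards get threads too.
fn all_shard_ids(layouts: &HashMap<u32, Vec<u64>>, head: u32, latest: u32) -> HashSet<u64> {
    let mut shard_ids = HashSet::new();
    for version in head..=latest {
        shard_ids.extend(shard_ids_for_version(layouts, version));
    }
    shard_ids
}

fn main() {
    // Hypothetical: version 73 has shards 0..=3, version 74 reshards to 0..=4.
    let layouts = HashMap::from([(73, vec![0, 1, 2, 3]), (74, vec![0, 1, 2, 3, 4])]);
    // Before this fix only the four current shards got threads; now shard 4 does too.
    assert_eq!(all_shard_ids(&layouts, 73, 74), HashSet::from([0, 1, 2, 3, 4]));
}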

This is not great, because for quite some time we will be starting
threads that do nothing useful (while still doing work in a loop, which
in tests can be nontrivial since we set the "iteration_delay" config
value to 100ms), and we don't stop old threads once the shard ID they
correspond to is no longer a valid shard ID in the current epoch. But
it's not horrible, and this is an easy first fix.
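
A plausible shape for that later cleanup (purely illustrative, not part of
this commit; the function and its parameters are made up): each per-shard
loop could also exit once its shard ID drops out of the set of shard IDs it
might still need to dump for, rather than only honoring the global stop flag.

use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical per-shard dump loop: exits when asked to stop, or when the
// shard ID is no longer part of any shard layout it could be asked to dump.
fn shard_dump_loop<F>(shard_id: u64, keep_running: Arc<AtomicBool>, valid_shard_ids: F)
where
    F: Fn() -> HashSet<u64>,
{
    while keep_running.load(Ordering::Relaxed) {
        if !valid_shard_ids().contains(&shard_id) {
            // The shard was resharded away; stop instead of spinning forever.
            break;
        }
        // ... check progress and dump headers/parts for `shard_id` here ...
        thread::sleep(Duration::from_millis(100)); // the test "iteration_delay"
    }
}

fn main() {
    let keep_running = Arc::new(AtomicBool::new(true));
    let flag = keep_running.clone();
    // Shard 4 is not in the current layout, so this thread exits immediately.
    let handle = thread::spawn(move || shard_dump_loop(4, flag, || HashSet::from([0, 1, 2, 3])));
    handle.join().unwrap();
    keep_running.store(false, Ordering::Relaxed);
}
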
marcelo-gonzalez authored Nov 21, 2024
1 parent 9e4933b commit 4e87bae
Showing 3 changed files with 62 additions and 34 deletions.
chain/chain/src/test_utils/kv_runtime.rs (7 additions, 0 deletions)
@@ -568,6 +568,13 @@ impl EpochManagerAdapter for MockEpochManager {
         Ok(ShardLayout::v0(self.num_shards, 0))
     }
 
+    fn get_shard_layout_from_protocol_version(
+        &self,
+        _protocol_version: ProtocolVersion,
+    ) -> ShardLayout {
+        self.get_shard_layout(&EpochId::default()).unwrap()
+    }
+
     fn get_shard_config(&self, _epoch_id: &EpochId) -> Result<ShardConfig, EpochError> {
         panic!("get_shard_config not implemented for KeyValueRuntime");
     }
chain/epoch-manager/src/adapter.rs (13 additions, 0 deletions)
@@ -165,6 +165,11 @@ pub trait EpochManagerAdapter: Send + Sync {
         parent_hash: &CryptoHash,
     ) -> Result<ShardLayout, EpochError>;
 
+    fn get_shard_layout_from_protocol_version(
+        &self,
+        protocol_version: ProtocolVersion,
+    ) -> ShardLayout;
+
     /// Get [`EpochId`] from a block belonging to the epoch.
     fn get_epoch_id(&self, block_hash: &CryptoHash) -> Result<EpochId, EpochError>;

@@ -644,6 +649,14 @@ impl EpochManagerAdapter for EpochManagerHandle {
         self.get_shard_layout(&epoch_id)
     }
 
+    fn get_shard_layout_from_protocol_version(
+        &self,
+        protocol_version: ProtocolVersion,
+    ) -> ShardLayout {
+        let epoch_manager = self.read();
+        epoch_manager.get_epoch_config(protocol_version).shard_layout
+    }
+
     fn get_epoch_id(&self, block_hash: &CryptoHash) -> Result<EpochId, EpochError> {
         let epoch_manager = self.read();
         epoch_manager.get_epoch_id(block_hash)
nearcore/src/state_sync.rs (42 additions, 34 deletions)
@@ -1,9 +1,11 @@
 use crate::metrics;
 
 use actix_rt::Arbiter;
+use anyhow::Context;
 use borsh::BorshSerialize;
 use futures::future::BoxFuture;
 use futures::FutureExt;
+use itertools::Itertools;
 use near_async::time::{Clock, Duration, Instant};
 use near_chain::types::RuntimeAdapter;
 use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode, Error};
@@ -21,6 +23,7 @@ use near_primitives::hash::CryptoHash;
 use near_primitives::state_part::PartId;
 use near_primitives::state_sync::StateSyncDumpProgress;
 use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot};
+use near_primitives::version::PROTOCOL_VERSION;
 use rand::{thread_rng, Rng};
 use std::collections::HashSet;
 use std::sync::atomic::AtomicBool;
@@ -46,6 +49,35 @@ pub struct StateSyncDumper {
 }
 
 impl StateSyncDumper {
+    /// Returns all current ShardIds, plus any that may belong to a future epoch after a protocol upgrade.
+    /// For now we start a thread for each shard ID even if it won't be needed for a long time.
+    /// TODO(resharding): fix that, and handle the dynamic resharding case.
+    fn get_all_shard_ids(&self) -> anyhow::Result<HashSet<ShardId>> {
+        let chain = Chain::new_for_view_client(
+            self.clock.clone(),
+            self.epoch_manager.clone(),
+            self.shard_tracker.clone(),
+            self.runtime.clone(),
+            &self.chain_genesis,
+            DoomslugThresholdMode::TwoThirds,
+            false,
+        )
+        .context("failed creating Chain")?;
+        let epoch_id = chain.head().context("failed getting chain head")?.epoch_id;
+        let head_protocol_version = self
+            .epoch_manager
+            .get_epoch_protocol_version(&epoch_id)
+            .context("failed getting epoch protocol version")?;
+
+        let mut shard_ids = HashSet::new();
+        for protocol_version in head_protocol_version..=PROTOCOL_VERSION {
+            let shard_layout =
+                self.epoch_manager.get_shard_layout_from_protocol_version(protocol_version);
+            shard_ids.extend(shard_layout.shard_ids());
+        }
+        Ok(shard_ids)
+    }
+
     /// Starts one thread per tracked shard.
     /// Each started thread will be dumping state parts of a single epoch to external storage.
     pub fn start(&mut self) -> anyhow::Result<()> {
@@ -84,21 +116,7 @@ impl StateSyncDumper {
         };
 
         // Determine how many threads to start.
-        // TODO(resharding): Handle the case of changing the shard layout.
-        let shard_ids = {
-            // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance.
-            let chain = Chain::new_for_view_client(
-                self.clock.clone(),
-                self.epoch_manager.clone(),
-                self.shard_tracker.clone(),
-                self.runtime.clone(),
-                &self.chain_genesis,
-                DoomslugThresholdMode::TwoThirds,
-                false,
-            )?;
-            let epoch_id = chain.head()?.epoch_id;
-            self.epoch_manager.shard_ids(&epoch_id)
-        }?;
+        let shard_ids = self.get_all_shard_ids()?;
 
         let chain_id = self.client_config.chain_id.clone();
         let keep_running = Arc::new(AtomicBool::new(true));
@@ -108,6 +126,7 @@
             .map(|shard_id| {
                 let runtime = self.runtime.clone();
                 let chain_genesis = self.chain_genesis.clone();
+                // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance.
                 let chain = Chain::new_for_view_client(
                     self.clock.clone(),
                     self.epoch_manager.clone(),
@@ -265,7 +284,6 @@ fn get_current_state(
         }
     };
     let Some(LatestEpochInfo {
-        prev_epoch_id,
         epoch_id: new_epoch_id,
         epoch_height: new_epoch_height,
         sync_hash: new_sync_hash,
@@ -275,28 +293,20 @@
     };
 
     if Some(&new_epoch_id) == was_last_epoch_done.as_ref() {
         tracing::debug!(target: "state_sync_dump", ?shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "latest epoch is done. No new epoch to dump. Idle");
-        Ok(StateDumpAction::Wait)
-    } else if epoch_manager.get_shard_layout(&prev_epoch_id)
-        != epoch_manager.get_shard_layout(&new_epoch_id)
+        return Ok(StateDumpAction::Wait);
+    }
+
+    let shard_layout = epoch_manager.get_shard_layout(&new_epoch_id)?;
+
+    if shard_layout.shard_ids().contains(shard_id)
+        && cares_about_shard(chain, shard_id, &new_sync_hash, &shard_tracker, &account_id)?
     {
-        tracing::debug!(target: "state_sync_dump", ?shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Shard layout change detected, will skip dumping for this epoch. Idle");
-        chain.chain_store().set_state_sync_dump_progress(
-            *shard_id,
-            Some(StateSyncDumpProgress::Skipped {
-                epoch_id: new_epoch_id,
-                epoch_height: new_epoch_height,
-            }),
-        )?;
-        Ok(StateDumpAction::Wait)
-    } else if cares_about_shard(chain, shard_id, &new_sync_hash, &shard_tracker, &account_id)? {
         Ok(StateDumpAction::Dump {
             epoch_id: new_epoch_id,
             epoch_height: new_epoch_height,
             sync_hash: new_sync_hash,
         })
     } else {
         tracing::debug!(target: "state_sync_dump", ?shard_id, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Doesn't care about the shard in the current epoch. Idle");
         Ok(StateDumpAction::Wait)
     }
 }
@@ -625,7 +635,6 @@ fn cares_about_shard(
 }
 
 struct LatestEpochInfo {
-    prev_epoch_id: EpochId,
     epoch_id: EpochId,
     epoch_height: EpochHeight,
     sync_hash: CryptoHash,
@@ -651,10 +660,9 @@
     let final_block_header = chain.get_block_header(&final_hash)?;
     let epoch_id = *final_block_header.epoch_id();
     let epoch_info = epoch_manager.get_epoch_info(&epoch_id)?;
-    let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(&head.prev_block_hash)?;
     let epoch_height = epoch_info.epoch_height();
 
     tracing::debug!(target: "state_sync_dump", ?final_hash, ?sync_hash, ?epoch_id, epoch_height, "get_latest_epoch");
 
-    Ok(Some(LatestEpochInfo { prev_epoch_id, epoch_id, epoch_height, sync_hash }))
+    Ok(Some(LatestEpochInfo { epoch_id, epoch_height, sync_hash }))
 }
