state sync: improvements from mocknet testing
saketh-are committed Nov 24, 2024
1 parent af6559a commit 1dd1435
Showing 13 changed files with 167 additions and 96 deletions.
5 changes: 2 additions & 3 deletions chain/chain/src/chain.rs
@@ -2776,8 +2776,10 @@ impl Chain {
         // Check cache
         let key = borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id))?;
         if let Ok(Some(state_part)) = self.chain_store.store().get(DBCol::StateParts, &key) {
+            metrics::STATE_PART_CACHE_HIT.inc();
             return Ok(state_part.into());
         }
+        metrics::STATE_PART_CACHE_MISS.inc();
 
         let block = self
             .get_block(&sync_hash)
@@ -2823,9 +2825,6 @@ impl Chain {
         self.requested_state_parts
             .save_state_part_elapsed(&sync_hash, &shard_id, &part_id, elapsed_ms);
 
-        // Before saving State Part data, we need to make sure we can calculate and save State Header
-        self.get_state_response_header(shard_id, sync_hash)?;
-
         // Saving the part data
         let mut store_update = self.chain_store.store().store_update();
         store_update.set(DBCol::StateParts, &key, &state_part);
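The first hunk makes the state-part cache observable: a part found under DBCol::StateParts counts as a hit, and anything rebuilt on demand counts as a miss (the counters themselves are added in chain/chain/src/metrics.rs below). A minimal self-contained sketch of this read-through pattern, using a hypothetical in-memory PartCache in place of the real store and part builder:

    use std::collections::HashMap;

    // Hypothetical stand-in for the DBCol::StateParts column plus the two
    // counters; the real code uses prometheus IntCounters and the chain store.
    struct PartCache {
        parts: HashMap<Vec<u8>, Vec<u8>>,
        hits: u64,   // analogous to near_state_part_cache_hit
        misses: u64, // analogous to near_state_part_cache_miss
    }

    impl PartCache {
        fn get_or_build(&mut self, key: Vec<u8>, build: impl FnOnce() -> Vec<u8>) -> Vec<u8> {
            if let Some(part) = self.parts.get(&key) {
                self.hits += 1; // served straight from the cache
                return part.clone();
            }
            self.misses += 1; // built on demand, then cached for later requests
            let part = build();
            self.parts.insert(key, part.clone());
            part
        }
    }

The second hunk drops the get_state_response_header call from the part-serving path, so saving a state part no longer forces the state header to be computed first.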
14 changes: 14 additions & 0 deletions chain/chain/src/metrics.rs
@@ -103,6 +103,20 @@ pub static STATE_PART_ELAPSED: LazyLock<HistogramVec> = LazyLock::new(|| {
     )
     .unwrap()
 });
+pub static STATE_PART_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
+    try_create_int_counter(
+        "near_state_part_cache_hit",
+        "Total number of state parts served from db cache",
+    )
+    .unwrap()
+});
+pub static STATE_PART_CACHE_MISS: LazyLock<IntCounter> = LazyLock::new(|| {
+    try_create_int_counter(
+        "near_state_part_cache_miss",
+        "Total number of state parts built on-demand",
+    )
+    .unwrap()
+});
 pub static NUM_INVALID_BLOCKS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
     try_create_int_gauge_vec("near_num_invalid_blocks", "Number of invalid blocks", &["error"])
         .unwrap()
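Both counters are monotonic, so the cache hit rate can be derived from them. A hedged sketch (not part of this commit), assuming the standard prometheus IntCounter::get() accessor:

    // Fraction of state-part requests served from the db cache.
    fn state_part_cache_hit_ratio() -> f64 {
        let hits = STATE_PART_CACHE_HIT.get() as f64;
        let misses = STATE_PART_CACHE_MISS.get() as f64;
        if hits + misses == 0.0 {
            0.0
        } else {
            hits / (hits + misses)
        }
    }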
6 changes: 4 additions & 2 deletions chain/client/src/client.rs
@@ -324,7 +324,8 @@ impl Client {
             network_adapter.clone().into_sender(),
             config.state_sync_external_timeout,
             config.state_sync_p2p_timeout,
-            config.state_sync_retry_timeout,
+            config.state_sync_retry_backoff,
+            config.state_sync_external_backoff,
             &config.chain_id,
             &config.state_sync.sync,
             chain_sender_for_state_sync.clone(),
@@ -2578,7 +2579,8 @@ impl Client {
             self.network_adapter.clone().into_sender(),
             self.config.state_sync_external_timeout,
             self.config.state_sync_p2p_timeout,
-            self.config.state_sync_retry_timeout,
+            self.config.state_sync_retry_backoff,
+            self.config.state_sync_external_backoff,
             &self.config.chain_id,
             &self.config.state_sync.sync,
             self.chain_sender_for_state_sync.clone(),
2 changes: 1 addition & 1 deletion chain/client/src/metrics.rs
@@ -473,7 +473,7 @@ pub(crate) static STATE_SYNC_STAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
 
 pub(crate) static STATE_SYNC_DOWNLOAD_RESULT: LazyLock<IntCounterVec> = LazyLock::new(|| {
     try_create_int_counter_vec(
-        "near_state_sync_header_download_result",
+        "near_state_sync_download_result",
         "Count of number of state sync downloads by type (header, part),
         source (network, external), and result (timeout, error, success)",
         &["shard_id", "type", "source", "result"],
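The rename fixes a metric that was declared with a header-specific name ("near_state_sync_header_download_result") even though it records both header and part downloads. The increment_download_count calls in the files below are assumed to feed this counter vec, one value per label; a plausible sketch of such a helper (the repo's actual signature may differ):

    // Hedged sketch: map one download outcome onto the labeled counter vec
    // declared above ("shard_id", "type", "source", "result").
    fn increment_download_count(shard_id: u64, typ: &str, source: &str, result: &str) {
        STATE_SYNC_DOWNLOAD_RESULT
            .with_label_values(&[&shard_id.to_string(), typ, source, result])
            .inc();
    }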
59 changes: 27 additions & 32 deletions chain/client/src/sync/state/downloader.rs
@@ -33,7 +33,7 @@ pub(super) struct StateSyncDownloader {
     pub header_validation_sender:
         AsyncSender<StateHeaderValidationRequest, Result<(), near_chain::Error>>,
     pub runtime: Arc<dyn RuntimeAdapter>,
-    pub retry_timeout: Duration,
+    pub retry_backoff: Duration,
     pub task_tracker: TaskTracker,
 }
 
@@ -56,7 +56,7 @@ impl StateSyncDownloader {
         let num_attempts_before_fallback = self.num_attempts_before_fallback;
         let task_tracker = self.task_tracker.clone();
         let clock = self.clock.clone();
-        let retry_timeout = self.retry_timeout;
+        let retry_backoff = self.retry_backoff;
         async move {
             let handle = task_tracker.get_handle(&format!("shard {} header", shard_id)).await;
             handle.set_status("Reading existing header");
@@ -104,9 +104,9 @@ impl StateSyncDownloader {
                     Err(err) => {
                         handle.set_status(&format!(
                             "Error: {}, will retry in {}",
-                            err, retry_timeout
+                            err, retry_backoff
                         ));
-                        let deadline = clock.now() + retry_timeout;
+                        let deadline = clock.now() + retry_backoff;
                         tokio::select! {
                             _ = cancel.cancelled() => {
                                 return Err(near_chain::Error::Other("Cancelled".to_owned()));
@@ -122,17 +122,19 @@ impl StateSyncDownloader {
         .boxed()
     }
 
-    /// Ensures that the shard part is downloaded and validated. If the part exists on disk,
-    /// just returns. Otherwise, downloads the part, validates it, and retries if needed.
+    /// Attempts once to ensure that the shard part is downloaded and validated.
+    /// If the part exists on disk, just returns. Otherwise, makes one attempt
+    /// to download the part and validate it.
     ///
-    /// This method will only return an error if the download cannot be completed even
-    /// with retries, or if the download is cancelled.
-    pub fn ensure_shard_part_downloaded(
+    /// This method will return an error if the download fails or is cancelled.
+    pub fn ensure_shard_part_downloaded_single_attempt(
         &self,
         shard_id: ShardId,
         sync_hash: CryptoHash,
+        state_root: CryptoHash,
+        num_state_parts: u64,
         part_id: u64,
-        header: ShardStateSyncResponseHeader,
+        num_prior_attempts: usize,
        cancel: CancellationToken,
     ) -> BoxFuture<'static, Result<(), near_chain::Error>> {
         let store = self.store.clone();
@@ -142,24 +144,27 @@ impl StateSyncDownloader {
         let num_attempts_before_fallback = self.num_attempts_before_fallback;
         let clock = self.clock.clone();
         let task_tracker = self.task_tracker.clone();
-        let retry_timeout = self.retry_timeout;
+        let retry_backoff = self.retry_backoff;
         async move {
+            if cancel.is_cancelled() {
+                return Err(near_chain::Error::Other("Cancelled".to_owned()));
+            }
             let handle =
                 task_tracker.get_handle(&format!("shard {} part {}", shard_id, part_id)).await;
             handle.set_status("Reading existing part");
             if does_state_part_exist_on_disk(&store, sync_hash, shard_id, part_id)? {
                 return Ok(());
             }
 
-            let i = AtomicUsize::new(0); // for easier Rust async capture
             let attempt = || async {
                 let source = if fallback_source.is_some()
-                    && i.load(Ordering::Relaxed) >= num_attempts_before_fallback
+                    && num_prior_attempts >= num_attempts_before_fallback
                 {
                     fallback_source.as_ref().unwrap().as_ref()
                 } else {
                     preferred_source.as_ref()
                 };
+
                 let part = source
                     .download_shard_part(
                         shard_id,
@@ -169,10 +174,9 @@ impl StateSyncDownloader {
                         cancel.clone(),
                     )
                     .await?;
-                let state_root = header.chunk_prev_state_root();
                 if runtime_adapter.validate_state_part(
                     &state_root,
-                    PartId { idx: part_id, total: header.num_state_parts() },
+                    PartId { idx: part_id, total: num_state_parts },
                     &part,
                 ) {
                     let mut store_update = store.store_update();
@@ -187,25 +191,16 @@ impl StateSyncDownloader {
                 Ok(())
             };
 
-            loop {
-                match attempt().await {
-                    Ok(()) => return Ok(()),
-                    Err(err) => {
-                        handle.set_status(&format!(
-                            "Error: {}, will retry in {}",
-                            err, retry_timeout
-                        ));
-                        let deadline = clock.now() + retry_timeout;
-                        tokio::select! {
-                            _ = cancel.cancelled() => {
-                                return Err(near_chain::Error::Other("Cancelled".to_owned()));
-                            }
-                            _ = clock.sleep_until(deadline) => {}
-                        }
-                    }
-                }
-                i.fetch_add(1, Ordering::Relaxed);
-            }
+            let res = attempt().await;
+            if let Err(ref err) = res {
+                handle.set_status(&format!("Error: {}, will retry in {}", err, retry_backoff));
+                let deadline = clock.now() + retry_backoff;
+                tokio::select! {
+                    _ = cancel.cancelled() => {}
+                    _ = clock.sleep_until(deadline) => {}
+                }
+            }
+            res
         }
         .instrument(tracing::debug_span!("StateSyncDownloader::ensure_shard_part_downloaded"))
         .boxed()
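This refactor inverts control of retries: ensure_shard_part_downloaded_single_attempt now makes exactly one attempt, takes num_prior_attempts from the caller to decide when to switch from the preferred source to the fallback, and sleeps one retry_backoff before surfacing an error. A hedged sketch of the retry loop a caller might run on top of it (simplified signatures, not the repo's actual driver):

    use futures::future::BoxFuture;
    use tokio_util::sync::CancellationToken;

    // `download_once(attempt)` stands in for
    // ensure_shard_part_downloaded_single_attempt; it is assumed to pause one
    // backoff internally before returning an error.
    async fn download_part_with_retries(
        max_attempts: usize,
        cancel: &CancellationToken,
        mut download_once: impl FnMut(usize) -> BoxFuture<'static, Result<(), String>>,
    ) -> Result<(), String> {
        let mut attempt = 0;
        loop {
            if cancel.is_cancelled() {
                return Err("Cancelled".to_owned());
            }
            match download_once(attempt).await {
                Ok(()) => return Ok(()),
                // Past num_attempts_before_fallback failures, the downloader
                // itself flips to the fallback source, so retrying stays useful.
                Err(_) if attempt + 1 < max_attempts => attempt += 1,
                Err(e) => return Err(e),
            }
        }
    }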
29 changes: 23 additions & 6 deletions chain/client/src/sync/state/external.rs
@@ -22,12 +22,14 @@ pub(super) struct StateSyncDownloadSourceExternal {
     pub chain_id: String,
     pub conn: ExternalConnection,
     pub timeout: Duration,
+    pub backoff: Duration,
 }
 
 impl StateSyncDownloadSourceExternal {
     async fn get_file_with_timeout(
         clock: Clock,
         timeout: Duration,
+        backoff: Duration,
         cancellation: CancellationToken,
         conn: ExternalConnection,
         shard_id: ShardId,
@@ -46,14 +48,25 @@ impl StateSyncDownloadSourceExternal {
                 Err(near_chain::Error::Other("Timeout".to_owned()))
             }
             _ = cancellation.cancelled() => {
-                increment_download_count(shard_id, typ, "external", "error");
+                increment_download_count(shard_id, typ, "external", "cancelled");
                 Err(near_chain::Error::Other("Cancelled".to_owned()))
             }
             result = fut => {
-                result.map_err(|e| {
-                    increment_download_count(shard_id, typ, "network", "error");
-                    near_chain::Error::Other(format!("Failed to download: {}", e))
-                })
+                match result {
+                    Err(err) => {
+                        // A download error typically indicates that the file is not available yet. At the
+                        // start of the epoch it takes a while for dumpers to populate the external storage
+                        // with state files. This backoff period prevents spamming requests during that time.
+                        let deadline = clock.now() + backoff;
+                        tokio::select! {
+                            _ = clock.sleep_until(deadline) => {}
+                            _ = cancellation.cancelled() => {}
+                        }
+                        increment_download_count(shard_id, typ, "external", "download_error");
+                        Err(near_chain::Error::Other(format!("Failed to download: {}", err)))
+                    }
+                    Ok(res) => Ok(res)
+                }
             }
         }
     }
@@ -69,6 +82,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
     ) -> BoxFuture<Result<ShardStateSyncResponseHeader, near_chain::Error>> {
         let clock = self.clock.clone();
         let timeout = self.timeout;
+        let backoff = self.backoff;
         let chain_id = self.chain_id.clone();
         let conn = self.conn.clone();
         let store = self.store.clone();
@@ -86,6 +100,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
             let data = Self::get_file_with_timeout(
                 clock,
                 timeout,
+                backoff,
                 cancel,
                 conn,
                 shard_id,
@@ -94,7 +109,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
             )
             .await?;
             let header = ShardStateSyncResponseHeader::try_from_slice(&data).map_err(|e| {
-                increment_download_count(shard_id, "header", "external", "error");
+                increment_download_count(shard_id, "header", "external", "parse_error");
                 near_chain::Error::Other(format!("Failed to parse header: {}", e))
             })?;
 
@@ -115,6 +130,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
     ) -> BoxFuture<Result<Vec<u8>, near_chain::Error>> {
         let clock = self.clock.clone();
         let timeout = self.timeout;
+        let backoff = self.backoff;
         let chain_id = self.chain_id.clone();
         let conn = self.conn.clone();
         let store = self.store.clone();
@@ -137,6 +153,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
             let data = Self::get_file_with_timeout(
                 clock,
                 timeout,
+                backoff,
                 cancel,
                 conn,
                 shard_id,
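The inline comment explains the new backoff: an error from external storage at the start of an epoch usually means the dumpers have not uploaded the state files yet, so the task waits one backoff period (still honoring cancellation) before handing the error back, which keeps retry loops from spamming the store. A hedged sketch of that pattern using plain tokio primitives (the real code uses the repo's injectable Clock so tests can fake time):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    // Pause for `backoff` (or until cancelled), then surface the error; a
    // caller's retry loop therefore cannot hammer external storage while the
    // dump is still being written.
    async fn fail_with_backoff<T>(
        backoff: Duration,
        cancellation: &CancellationToken,
        err: String,
    ) -> Result<T, String> {
        tokio::select! {
            _ = tokio::time::sleep(backoff) => {}
            _ = cancellation.cancelled() => {}
        }
        Err(err)
    }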
6 changes: 4 additions & 2 deletions chain/client/src/sync/state/mod.rs
@@ -91,7 +91,8 @@ impl StateSync {
         network_adapter: AsyncSender<PeerManagerMessageRequest, PeerManagerMessageResponse>,
         external_timeout: Duration,
         p2p_timeout: Duration,
-        retry_timeout: Duration,
+        retry_backoff: Duration,
+        external_backoff: Duration,
         chain_id: &str,
         sync_config: &SyncConfig,
         chain_requests_sender: ChainSenderForStateSync,
@@ -147,6 +148,7 @@ impl StateSync {
                 chain_id: chain_id.to_string(),
                 conn: external,
                 timeout: external_timeout,
+                backoff: external_backoff,
             }) as Arc<dyn StateSyncDownloadSource>;
             (
                 Some(fallback_source),
@@ -166,7 +168,7 @@ impl StateSync {
             num_attempts_before_fallback,
             header_validation_sender: chain_requests_sender.clone().into_sender(),
             runtime: runtime.clone(),
-            retry_timeout,
+            retry_backoff,
             task_tracker: downloading_task_tracker.clone(),
         });
 
8 changes: 4 additions & 4 deletions chain/client/src/sync/state/network.rs
@@ -189,12 +189,12 @@ impl StateSyncDownloadSourcePeer {
         match request_sender.send_async(network_request).await {
             Ok(response) => {
                 if let NetworkResponses::RouteNotFound = response.as_network_response() {
-                    increment_download_count(key.shard_id, typ, "network", "error");
+                    increment_download_count(key.shard_id, typ, "network", "route_not_found");
                     return Err(near_chain::Error::Other("Route not found".to_owned()));
                 }
             }
             Err(e) => {
-                increment_download_count(key.shard_id, typ, "network", "error");
+                increment_download_count(key.shard_id, typ, "network", "failed_to_send");
                 return Err(near_chain::Error::Other(format!("Failed to send request: {}", e)));
             }
         }
@@ -206,7 +206,7 @@ impl StateSyncDownloadSourcePeer {
                 Err(near_chain::Error::Other("Timeout".to_owned()))
             }
             _ = cancel.cancelled() => {
-                increment_download_count(key.shard_id, typ, "network", "error");
+                increment_download_count(key.shard_id, typ, "network", "cancelled");
                 Err(near_chain::Error::Other("Cancelled".to_owned()))
             }
             result = receiver => {
@@ -216,7 +216,7 @@ impl StateSyncDownloadSourcePeer {
                         Ok(result)
                     }
                     Err(_) => {
-                        increment_download_count(key.shard_id, typ, "network", "error");
+                        increment_download_count(key.shard_id, typ, "network", "sender_dropped");
                         Err(near_chain::Error::Other("Sender dropped".to_owned()))
                     },
                 }