Skip to content

Commit

Permalink
make backoff after download failure configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
saketh-are committed Nov 23, 2024
1 parent 36f11be commit d11323c
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 48 deletions.
6 changes: 4 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ impl Client {
network_adapter.clone().into_sender(),
config.state_sync_external_timeout,
config.state_sync_p2p_timeout,
config.state_sync_retry_timeout,
config.state_sync_retry_backoff,
config.state_sync_external_backoff,
&config.chain_id,
&config.state_sync.sync,
chain_sender_for_state_sync.clone(),
Expand Down Expand Up @@ -2566,7 +2567,8 @@ impl Client {
self.network_adapter.clone().into_sender(),
self.config.state_sync_external_timeout,
self.config.state_sync_p2p_timeout,
self.config.state_sync_retry_timeout,
self.config.state_sync_retry_backoff,
self.config.state_sync_external_backoff,
&self.config.chain_id,
&self.config.state_sync.sync,
self.chain_sender_for_state_sync.clone(),
Expand Down
17 changes: 9 additions & 8 deletions chain/client/src/sync/state/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub(super) struct StateSyncDownloader {
pub header_validation_sender:
AsyncSender<StateHeaderValidationRequest, Result<(), near_chain::Error>>,
pub runtime: Arc<dyn RuntimeAdapter>,
pub retry_timeout: Duration,
pub retry_backoff: Duration,
pub task_tracker: TaskTracker,
}

Expand All @@ -56,7 +56,7 @@ impl StateSyncDownloader {
let num_attempts_before_fallback = self.num_attempts_before_fallback;
let task_tracker = self.task_tracker.clone();
let clock = self.clock.clone();
let retry_timeout = self.retry_timeout;
let retry_backoff = self.retry_backoff;
async move {
let handle = task_tracker.get_handle(&format!("shard {} header", shard_id)).await;
handle.set_status("Reading existing header");
Expand Down Expand Up @@ -107,9 +107,9 @@ impl StateSyncDownloader {
Err(err) => {
handle.set_status(&format!(
"Error: {}, will retry in {}",
err, retry_timeout
err, retry_backoff
));
let deadline = clock.now() + retry_timeout;
let deadline = clock.now() + retry_backoff;
tokio::select! {
_ = cancel.cancelled() => {
return Err(near_chain::Error::Other("Cancelled".to_owned()));
Expand Down Expand Up @@ -145,7 +145,7 @@ impl StateSyncDownloader {
let num_attempts_before_fallback = self.num_attempts_before_fallback;
let clock = self.clock.clone();
let task_tracker = self.task_tracker.clone();
let retry_timeout = self.retry_timeout;
let retry_backoff = self.retry_backoff;
async move {
let handle =
task_tracker.get_handle(&format!("shard {} part {}", shard_id, part_id)).await;
Expand Down Expand Up @@ -190,11 +190,12 @@ impl StateSyncDownloader {
};

let res = attempt().await;
if res.is_err() {
let deadline = clock.now() + retry_timeout;
if let Err(ref err) = res {
handle.set_status(&format!("Error: {}, will retry in {}", err, retry_backoff));
let deadline = clock.now() + retry_backoff;
tokio::select! {
_ = clock.sleep_until(deadline) => {}
_ = cancel.cancelled() => {}
_ = clock.sleep_until(deadline) => {}
}
}
res
Expand Down
12 changes: 8 additions & 4 deletions chain/client/src/sync/state/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

const EXTERNAL_REQUEST_COOLDOWN: time::Duration = Duration::seconds(60);

/// Logic for downloading state sync headers and parts from an external source.
///
/// Each download future works on copies/clones of these fields rather than
/// borrowing `self`.
pub(super) struct StateSyncDownloadSourceExternal {
/// Clock used to compute the backoff deadline (`clock.now() + backoff`) and to
/// sleep until it.
pub clock: Clock,
/// Store handle, cloned into each download future.
// NOTE(review): how the store is used is not visible in this view — confirm.
pub store: Store,
/// Chain identifier, cloned into each download future.
pub chain_id: String,
/// Connection to the external storage, handed to `get_file_with_timeout`.
pub conn: ExternalConnection,
/// Timeout value handed to `get_file_with_timeout` for each external request.
pub timeout: Duration,
/// Backoff period slept (unless cancelled) after a request to external storage
/// fails, so that requests are not spammed while state files are unavailable.
pub backoff: Duration,
}

impl StateSyncDownloadSourceExternal {
async fn get_file_with_timeout(
clock: Clock,
timeout: Duration,
backoff: Duration,
cancellation: CancellationToken,
conn: ExternalConnection,
shard_id: ShardId,
Expand All @@ -54,8 +54,8 @@ impl StateSyncDownloadSourceExternal {
result = fut => {
// A download error typically indicates that the file is not available yet. At the
// start of the epoch it takes a while for dumpers to populate the external storage
// with state files. This cooldown prevents spamming requests during that time.
let deadline = clock.now() + EXTERNAL_REQUEST_COOLDOWN;
// with state files. This backoff period prevents spamming requests during that time.
let deadline = clock.now() + backoff;
tokio::select! {
_ = clock.sleep_until(deadline) => {}
_ = cancellation.cancelled() => {}
Expand All @@ -81,6 +81,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
) -> BoxFuture<Result<ShardStateSyncResponseHeader, near_chain::Error>> {
let clock = self.clock.clone();
let timeout = self.timeout;
let backoff = self.backoff;
let chain_id = self.chain_id.clone();
let conn = self.conn.clone();
let store = self.store.clone();
Expand All @@ -98,6 +99,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
let data = Self::get_file_with_timeout(
clock,
timeout,
backoff,
cancel,
conn,
shard_id,
Expand Down Expand Up @@ -127,6 +129,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
) -> BoxFuture<Result<Vec<u8>, near_chain::Error>> {
let clock = self.clock.clone();
let timeout = self.timeout;
let backoff = self.backoff;
let chain_id = self.chain_id.clone();
let conn = self.conn.clone();
let store = self.store.clone();
Expand All @@ -149,6 +152,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
let data = Self::get_file_with_timeout(
clock,
timeout,
backoff,
cancel,
conn,
shard_id,
Expand Down
6 changes: 4 additions & 2 deletions chain/client/src/sync/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl StateSync {
network_adapter: AsyncSender<PeerManagerMessageRequest, PeerManagerMessageResponse>,
external_timeout: Duration,
p2p_timeout: Duration,
retry_timeout: Duration,
retry_backoff: Duration,
external_backoff: Duration,
chain_id: &str,
sync_config: &SyncConfig,
chain_requests_sender: ChainSenderForStateSync,
Expand Down Expand Up @@ -147,6 +148,7 @@ impl StateSync {
chain_id: chain_id.to_string(),
conn: external,
timeout: external_timeout,
backoff: external_backoff,
}) as Arc<dyn StateSyncDownloadSource>;
(
Some(fallback_source),
Expand All @@ -166,7 +168,7 @@ impl StateSync {
num_attempts_before_fallback,
header_validation_sender: chain_requests_sender.clone().into_sender(),
runtime: runtime.clone(),
retry_timeout,
retry_backoff,
task_tracker: downloading_task_tracker.clone(),
});

Expand Down
15 changes: 11 additions & 4 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,14 @@ pub fn default_state_sync_p2p_timeout() -> Duration {
Duration::seconds(10)
}

pub fn default_state_sync_retry_timeout() -> Duration {
/// Default for the `state_sync_retry_backoff` setting: wait one second after a
/// failed state sync request.
pub fn default_state_sync_retry_backoff() -> Duration {
    let backoff_secs = 1;
    Duration::seconds(backoff_secs)
}

/// Default for the `state_sync_external_backoff` setting: wait 60 seconds after
/// a failed request to external storage.
pub fn default_state_sync_external_backoff() -> Duration {
    let backoff_secs = 60;
    Duration::seconds(backoff_secs)
}

/// Default for the `header_sync_expected_height_per_second` setting: the
/// expected increase of header head weight per second during header sync.
pub fn default_header_sync_expected_height_per_second() -> u64 {
    const EXPECTED_HEIGHT_PER_SECOND: u64 = 10;
    EXPECTED_HEIGHT_PER_SECOND
}
Expand Down Expand Up @@ -453,8 +457,10 @@ pub struct ClientConfig {
pub state_sync_external_timeout: Duration,
/// How long to wait for a response from p2p state sync
pub state_sync_p2p_timeout: Duration,
/// How long to wait between attempts to obtain a state part
pub state_sync_retry_timeout: Duration,
/// How long to wait after a failed state sync request
pub state_sync_retry_backoff: Duration,
/// Additional waiting period after a failed request to external storage
pub state_sync_external_backoff: Duration,
/// Minimum number of peers to start syncing.
pub min_num_peers: usize,
/// Period between logging summary information.
Expand Down Expand Up @@ -598,7 +604,8 @@ impl ClientConfig {
header_sync_stall_ban_timeout: Duration::seconds(30),
state_sync_external_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_p2p_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_retry_timeout: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_retry_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
state_sync_external_backoff: Duration::seconds(TEST_STATE_SYNC_TIMEOUT),
header_sync_expected_height_per_second: 1,
min_num_peers: 1,
log_summary_period: Duration::seconds(10),
Expand Down
19 changes: 10 additions & 9 deletions core/chain-configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ pub use client_config::{
default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout,
default_log_summary_period, default_orphan_state_witness_max_size,
default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit,
default_state_sync_enabled, default_state_sync_external_timeout,
default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period,
default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period,
default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
default_tx_routing_height_horizon, default_view_client_threads,
default_view_client_throttle_period, ChunkDistributionNetworkConfig, ChunkDistributionUris,
ClientConfig, DumpConfig, EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation,
GCConfig, LogSummaryStyle, ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig,
DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL,
default_state_sync_enabled, default_state_sync_external_backoff,
default_state_sync_external_timeout, default_state_sync_p2p_timeout,
default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold,
default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit,
default_trie_viewer_state_size_limit, default_tx_routing_height_horizon,
default_view_client_threads, default_view_client_throttle_period,
ChunkDistributionNetworkConfig, ChunkDistributionUris, ClientConfig, DumpConfig,
EpochSyncConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, LogSummaryStyle,
ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL,
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP,
TEST_STATE_SYNC_TIMEOUT,
};
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ impl TestLoopBuilder {
client_config.state_sync_enabled = true;
client_config.state_sync_external_timeout = Duration::milliseconds(100);
client_config.state_sync_p2p_timeout = Duration::milliseconds(100);
client_config.state_sync_retry_timeout = Duration::milliseconds(100);
client_config.state_sync_retry_backoff = Duration::milliseconds(100);
client_config.state_sync_external_backoff = Duration::milliseconds(100);
if let Some(num_epochs) = self.gc_num_epochs_to_keep {
client_config.gc.gc_num_epochs_to_keep = num_epochs;
}
Expand Down
41 changes: 23 additions & 18 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ use near_chain_configs::{
default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout,
default_log_summary_period, default_orphan_state_witness_max_size,
default_orphan_state_witness_pool_size, default_produce_chunk_add_transactions_time_limit,
default_state_sync_enabled, default_state_sync_external_timeout,
default_state_sync_p2p_timeout, default_state_sync_retry_timeout, default_sync_check_period,
default_sync_height_threshold, default_sync_max_block_requests, default_sync_step_period,
default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
default_tx_routing_height_horizon, default_view_client_threads,
default_view_client_throttle_period, get_initial_supply, ChunkDistributionNetworkConfig,
ClientConfig, EpochSyncConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode,
LogSummaryStyle, MutableConfigValue, MutableValidatorSigner, ReshardingConfig, StateSyncConfig,
BLOCK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_PRODUCER_KICKOUT_THRESHOLD,
CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD, EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH,
FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE, GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT,
MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY, MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR,
NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE, PROTOCOL_UPGRADE_STAKE_THRESHOLD,
TRANSACTION_VALIDITY_PERIOD,
default_state_sync_enabled, default_state_sync_external_backoff,
default_state_sync_external_timeout, default_state_sync_p2p_timeout,
default_state_sync_retry_backoff, default_sync_check_period, default_sync_height_threshold,
default_sync_max_block_requests, default_sync_step_period, default_transaction_pool_size_limit,
default_trie_viewer_state_size_limit, default_tx_routing_height_horizon,
default_view_client_threads, default_view_client_throttle_period, get_initial_supply,
ChunkDistributionNetworkConfig, ClientConfig, EpochSyncConfig, GCConfig, Genesis,
GenesisConfig, GenesisValidationMode, LogSummaryStyle, MutableConfigValue,
MutableValidatorSigner, ReshardingConfig, StateSyncConfig, BLOCK_PRODUCER_KICKOUT_THRESHOLD,
CHUNK_PRODUCER_KICKOUT_THRESHOLD, CHUNK_VALIDATOR_ONLY_KICKOUT_THRESHOLD,
EXPECTED_EPOCH_LENGTH, FAST_EPOCH_LENGTH, FISHERMEN_THRESHOLD, GAS_PRICE_ADJUSTMENT_RATE,
GENESIS_CONFIG_FILENAME, INITIAL_GAS_LIMIT, MAX_INFLATION_RATE, MIN_BLOCK_PRODUCTION_DELAY,
MIN_GAS_PRICE, NEAR_BASE, NUM_BLOCKS_PER_YEAR, NUM_BLOCK_PRODUCER_SEATS, PROTOCOL_REWARD_RATE,
PROTOCOL_UPGRADE_STAKE_THRESHOLD, TRANSACTION_VALIDITY_PERIOD,
};
use near_config_utils::{DownloadConfigType, ValidationError, ValidationErrors};
use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey};
Expand Down Expand Up @@ -162,9 +162,12 @@ pub struct Consensus {
#[serde(default = "default_state_sync_p2p_timeout")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_p2p_timeout: Duration,
#[serde(default = "default_state_sync_retry_timeout")]
#[serde(default = "default_state_sync_retry_backoff")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_retry_timeout: Duration,
pub state_sync_retry_backoff: Duration,
#[serde(default = "default_state_sync_external_backoff")]
#[serde(with = "near_async::time::serde_duration_as_std")]
pub state_sync_external_backoff: Duration,
/// Expected increase of header head weight per second during header sync
#[serde(default = "default_header_sync_expected_height_per_second")]
pub header_sync_expected_height_per_second: u64,
Expand Down Expand Up @@ -207,7 +210,8 @@ impl Default for Consensus {
header_sync_stall_ban_timeout: default_header_sync_stall_ban_timeout(),
state_sync_external_timeout: default_state_sync_external_timeout(),
state_sync_p2p_timeout: default_state_sync_p2p_timeout(),
state_sync_retry_timeout: default_state_sync_retry_timeout(),
state_sync_retry_backoff: default_state_sync_retry_backoff(),
state_sync_external_backoff: default_state_sync_external_backoff(),
header_sync_expected_height_per_second: default_header_sync_expected_height_per_second(
),
sync_check_period: default_sync_check_period(),
Expand Down Expand Up @@ -562,7 +566,8 @@ impl NearConfig {
.header_sync_expected_height_per_second,
state_sync_external_timeout: config.consensus.state_sync_external_timeout,
state_sync_p2p_timeout: config.consensus.state_sync_p2p_timeout,
state_sync_retry_timeout: config.consensus.state_sync_retry_timeout,
state_sync_retry_backoff: config.consensus.state_sync_retry_backoff,
state_sync_external_backoff: config.consensus.state_sync_external_backoff,
min_num_peers: config.consensus.min_num_peers,
log_summary_period: config.log_summary_period,
produce_empty_blocks: config.consensus.produce_empty_blocks,
Expand Down

0 comments on commit d11323c

Please sign in to comment.