From fbdf74216345b041458b4d232a5ece8eaee9b919 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Sat, 24 Aug 2024 17:38:34 -0700 Subject: [PATCH] =?UTF-8?q?wen=5Frestart:=20Make=20validator=20restart=20a?= =?UTF-8?q?nd=20wait=20for=20supermajority=20after=20=E2=80=A6=20(#1335)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wen_restart: Make validator restart and wait for supermajority after Phase One completes. * Fix cargo sort error. * Adding two unittests. * Change function name and return type. * Don't change wait_for_supermajority, try to re-initialzie validator after wen_restart phase one. * Fix a bad merge and fix test. * Make validator exit if wen_restart is finished. * Remove unnecessary dependencies from Cargo.toml * Remove unused function. * Fix tests to check OK() is returned when in DONE state. * Add wen_restart_proto_path if specified in command line. * Fix command line name for wen_restart. * Log how much stake is required to exit. * Don't double count my own RestartLastVotedForkSlots. * Ignore HeaviestFork from myself. * Also send out HeaviestForkAggregate if I saw supermajority for the first time. * Forbid replay if wen_restart is in progress. * We still need the new bank part. * Try not grabbing a big write lock on bankforks. * Should read GenerateSnapshot in initialize as well. * Skip all replay stages while in wen_restart. * Root banks if necessary to send EAH request. * Root banks every 100 slot. * Do not start replay thread if in wen_restart. * Do not need to check in_wen_restart inside replay_stage any more. * Fix failed merge. * Make linter happy. * Fix the bad merge after switching to anyhow. * Remove unused code. * Fix bad merge. * Remove unnecessary clone. * Remove unused map_error. * No need to specify snapshot_path in restart commandline. * Small fixes. * Split the enabling of wen_restart into another PR. --- validator/src/main.rs | 22 +++-- wen-restart/src/wen_restart.rs | 154 ++++++++++++++++++++++++++++++--- 2 files changed, 156 insertions(+), 20 deletions(-) diff --git a/validator/src/main.rs b/validator/src/main.rs index 349f01ecace8d7..87b6a44cd9666e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -34,7 +34,7 @@ use { tpu::DEFAULT_TPU_COALESCE, validator::{ is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator, - ValidatorConfig, ValidatorStartProgress, + ValidatorConfig, ValidatorError, ValidatorStartProgress, }, }, solana_gossip::{ @@ -2045,7 +2045,7 @@ pub fn main() { // the one pushed by bootstrap. node.info.hot_swap_pubkey(identity_keypair.pubkey()); - let validator = Validator::new( + let validator = match Validator::new( node, identity_keypair, &ledger_path, @@ -2062,11 +2062,19 @@ pub fn main() { tpu_enable_udp, tpu_max_connections_per_ipaddr_per_minute, admin_service_post_init, - ) - .unwrap_or_else(|e| { - error!("Failed to start validator: {:?}", e); - exit(1); - }); + ) { + Ok(validator) => validator, + Err(err) => match err.downcast_ref() { + Some(ValidatorError::WenRestartFinished) => { + error!("Please remove --wen_restart and use --wait_for_supermajority as instructed above"); + exit(200); + } + _ => { + error!("Failed to start validator: {:?}", err); + exit(1); + } + }, + }; if let Some(filename) = init_complete_file { File::create(filename).unwrap_or_else(|_| { diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 0fa7ec1cb65f3e..85f54779a854f2 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -88,6 +88,7 @@ pub enum WenRestartError { MalformedProgress(RestartState, String), MissingLastVotedForkSlots, MissingFullSnapshot(String), + MissingSnapshotInProtobuf, NotEnoughStakeAgreeingWithUs(Slot, Hash, HashMap<(Slot, Hash), u64>), UnexpectedState(wen_restart_proto::State), } @@ -148,6 +149,9 @@ impl std::fmt::Display for WenRestartError { WenRestartError::MissingFullSnapshot(directory) => { write!(f, "Missing full snapshot, please check whether correct directory is supplied {directory}") } + WenRestartError::MissingSnapshotInProtobuf => { + write!(f, "Missing snapshot in protobuf") + } WenRestartError::NotEnoughStakeAgreeingWithUs(slot, hash, block_stake_map) => { write!( f, @@ -188,7 +192,11 @@ pub(crate) enum WenRestartProgressInternalState { new_root_slot: Slot, my_snapshot: Option, }, - Done, + Done { + slot: Slot, + hash: Hash, + shred_version: u16, + }, } pub(crate) fn send_restart_last_voted_fork_slots( @@ -719,9 +727,10 @@ pub(crate) fn aggregate_restart_heaviest_fork( let total_active_stake_seen_supermajority = heaviest_fork_aggregate.total_active_stake_seen_supermajority(); info!( - "Total active stake seeing supermajority: {} Total active stake: {} Total stake {}", + "Total active stake seeing supermajority: {} Total active stake: {} Required to exit {} Total stake {}", total_active_stake_seen_supermajority, heaviest_fork_aggregate.total_active_stake(), + majority_stake_required, total_stake ); let can_exit = total_active_stake_seen_supermajority >= majority_stake_required; @@ -833,6 +842,7 @@ pub(crate) fn aggregate_restart_heaviest_fork( } } +#[derive(Clone)] pub struct WenRestartConfig { pub wen_restart_path: PathBuf, pub last_vote: VoteTransaction, @@ -953,7 +963,20 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> { my_snapshot: Some(snapshot_record), } } - WenRestartProgressInternalState::Done => return Ok(()), + // Proceed to restart if we are ready to wait for supermajority. + WenRestartProgressInternalState::Done { + slot, + hash, + shred_version, + } => { + error!( + "Wen start finished, please remove --wen_restart and restart with \ + --wait-for-supermajority {} --expected-bank-hash {} --shred-version {}\ + --hard-fork {} --no-snapshot-fetchsnapshot", + slot, hash, shred_version, slot + ); + return Ok(()); + } }; state = increment_and_write_wen_restart_records( &config.wen_restart_path, @@ -1038,12 +1061,16 @@ pub(crate) fn increment_and_write_wen_restart_records( if let Some(my_snapshot) = my_snapshot { progress.set_state(RestartState::Done); progress.my_snapshot = Some(my_snapshot.clone()); - WenRestartProgressInternalState::Done + WenRestartProgressInternalState::Done { + slot: my_snapshot.slot, + hash: Hash::from_str(&my_snapshot.bankhash).unwrap(), + shred_version: my_snapshot.shred_version as u16, + } } else { - return Err(WenRestartError::UnexpectedState(RestartState::Done).into()); + return Err(WenRestartError::MissingSnapshotInProtobuf.into()); } } - WenRestartProgressInternalState::Done => { + WenRestartProgressInternalState::Done { .. } => { return Err(WenRestartError::UnexpectedState(RestartState::Done).into()) } }; @@ -1077,7 +1104,20 @@ pub(crate) fn initialize( } }; match progress.state() { - RestartState::Done => Ok((WenRestartProgressInternalState::Done, progress)), + RestartState::Done => { + if let Some(my_snapshot) = progress.my_snapshot.as_ref() { + Ok(( + WenRestartProgressInternalState::Done { + slot: my_snapshot.slot, + hash: Hash::from_str(&my_snapshot.bankhash).unwrap(), + shred_version: my_snapshot.shred_version as u16, + }, + progress, + )) + } else { + Err(WenRestartError::MissingSnapshotInProtobuf.into()) + } + } RestartState::Init => { let last_voted_fork_slots; let last_vote_bankhash; @@ -1515,7 +1555,6 @@ mod tests { #[test] fn test_wen_restart_normal_flow() { - solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let wen_restart_repair_slots = Some(Arc::new(RwLock::new(Vec::new()))); let test_state = wen_restart_test_init(&ledger_path); @@ -2219,8 +2258,29 @@ mod tests { progress, ) ); + let last_vote_slot = test_state.last_voted_fork_slots[0]; + let snapshot_slot_hash = Hash::new_unique(); let progress = WenRestartProgress { state: RestartState::Done.into(), + my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + my_heaviest_fork: Some(HeaviestForkRecord { + slot: last_vote_slot, + bankhash: snapshot_slot_hash.to_string(), + total_active_stake: 0, + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + my_snapshot: Some(GenerateSnapshotRecord { + slot: last_vote_slot, + bankhash: snapshot_slot_hash.to_string(), + shred_version: SHRED_VERSION as u32, + path: "/path/to/snapshot".to_string(), + }), ..Default::default() }; assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); @@ -2231,7 +2291,14 @@ mod tests { test_state.blockstore.clone() ) .unwrap(), - (WenRestartProgressInternalState::Done, progress) + ( + WenRestartProgressInternalState::Done { + slot: last_vote_slot, + hash: snapshot_slot_hash, + shred_version: SHRED_VERSION, + }, + progress + ) ); } @@ -2500,11 +2567,13 @@ mod tests { shred_version: SHRED_VERSION as u32, wallclock: 0, }); + let my_bankhash = Hash::new_unique(); + let new_shred_version = SHRED_VERSION + 57; let my_snapshot = Some(GenerateSnapshotRecord { slot: 1, - bankhash: Hash::new_unique().to_string(), + bankhash: my_bankhash.to_string(), path: "snapshot_1".to_string(), - shred_version: SHRED_VERSION as u32, + shred_version: new_shred_version as u32, }); let heaviest_fork_aggregate = Some(HeaviestForkAggregateRecord { received: HashMap::new(), @@ -2637,7 +2706,11 @@ mod tests { new_root_slot: 1, my_snapshot: my_snapshot.clone(), }, - WenRestartProgressInternalState::Done, + WenRestartProgressInternalState::Done { + slot: 1, + hash: my_bankhash, + shred_version: new_shred_version, + }, WenRestartProgress { state: RestartState::HeaviestFork.into(), my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), @@ -2675,7 +2748,11 @@ mod tests { assert_eq!( increment_and_write_wen_restart_records( &wen_restart_proto_path, - WenRestartProgressInternalState::Done, + WenRestartProgressInternalState::Done { + slot: 1, + hash: my_bankhash, + shred_version: new_shred_version, + }, &mut progress ) .unwrap_err() @@ -3242,4 +3319,55 @@ mod tests { WenRestartError::BlockNotFound(empty_slot), ); } + + #[test] + fn test_return_ok_after_wait_is_done() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let test_state = wen_restart_test_init(&ledger_path); + let last_vote_slot = test_state.last_voted_fork_slots[0]; + let last_vote_bankhash = Hash::new_unique(); + let config = WenRestartConfig { + wen_restart_path: test_state.wen_restart_proto_path.clone(), + last_vote: VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), + blockstore: test_state.blockstore.clone(), + cluster_info: test_state.cluster_info.clone(), + bank_forks: test_state.bank_forks.clone(), + wen_restart_repair_slots: Some(Arc::new(RwLock::new(Vec::new()))), + wait_for_supermajority_threshold_percent: 80, + snapshot_config: SnapshotConfig::default(), + accounts_background_request_sender: AbsRequestSender::default(), + genesis_config_hash: test_state.genesis_config_hash, + exit: Arc::new(AtomicBool::new(false)), + }; + assert!(write_wen_restart_records( + &test_state.wen_restart_proto_path, + &WenRestartProgress { + state: RestartState::Done.into(), + ..Default::default() + } + ) + .is_ok()); + assert_eq!( + wait_for_wen_restart(config.clone()) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::MissingSnapshotInProtobuf + ); + assert!(write_wen_restart_records( + &test_state.wen_restart_proto_path, + &WenRestartProgress { + state: RestartState::Done.into(), + my_snapshot: Some(GenerateSnapshotRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + shred_version: SHRED_VERSION as u32, + path: "snapshot".to_string(), + }), + ..Default::default() + } + ) + .is_ok()); + assert!(wait_for_wen_restart(config).is_ok()); + } }