From 645452f52a8103dd283f36b855977e07025112db Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Fri, 9 Feb 2024 16:24:52 -0800 Subject: [PATCH] Use individual functions when testing for failures. --- wen-restart/src/wen_restart.rs | 175 +++++++++++++-------------------- 1 file changed, 71 insertions(+), 104 deletions(-) diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index ea744016ebdc95..14d00bc5a19b7b 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -62,7 +62,7 @@ pub enum WenRestartError { UnexpectedState(wen_restart_proto::State), } -fn send_restart_last_voted_fork_slots( +pub(crate) fn send_restart_last_voted_fork_slots( last_vote: VoteTransaction, blockstore: Arc, cluster_info: Arc, @@ -106,7 +106,7 @@ fn send_restart_last_voted_fork_slots( Ok(()) } -fn aggregate_restart_last_voted_fork_slots( +pub(crate) fn aggregate_restart_last_voted_fork_slots( wen_restart_path: &PathBuf, wait_for_supermajority_threshold_percent: u64, cluster_info: Arc, @@ -120,6 +120,9 @@ fn aggregate_restart_last_voted_fork_slots( root_bank = bank_forks.read().unwrap().root_bank().clone(); } let root_slot = root_bank.slot(); + if progress.my_last_voted_fork_slots.is_none() { + return Err(WenRestartError::MissingLastVotedForkSlots); + } let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new( root_slot, REPAIR_THRESHOLD, @@ -467,23 +470,33 @@ mod tests { expected_progress: WenRestartProgress, ) { let start = timestamp(); - let mut progress; + let mut progress = WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: None, + last_voted_fork_slots_aggregate: None, + }; loop { - progress = read_wen_restart_records(&wen_restart_proto_path).unwrap(); - if let Some(my_last_voted_fork_slots) = &expected_progress.my_last_voted_fork_slots { - if let Some(record) = progress.my_last_voted_fork_slots.as_mut() { - record.wallclock = my_last_voted_fork_slots.wallclock; + if let Ok(new_progress) = read_wen_restart_records(&wen_restart_proto_path) { + progress = new_progress; + if let Some(my_last_voted_fork_slots) = &expected_progress.my_last_voted_fork_slots + { + if let Some(record) = progress.my_last_voted_fork_slots.as_mut() { + record.wallclock = my_last_voted_fork_slots.wallclock; + } + } + if progress == expected_progress { + return; } - } - if progress == expected_progress { - return; } if timestamp().saturating_sub(start) > WAIT_FOR_THREAD_TIMEOUT { - break; + panic!( + "wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}", + &progress, + expected_progress + ); } sleep(Duration::from_millis(10)); } - assert_eq!(progress, expected_progress); } fn wen_restart_test_succeed_after_failure( @@ -629,35 +642,35 @@ mod tests { } #[test] - fn test_wen_restart_init_failures() { + fn test_wen_restart_send_last_voted_fork_failures() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let test_state = wen_restart_test_init(&ledger_path); + let mut progress = wen_restart_proto::WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: None, + last_voted_fork_slots_aggregate: None, + }; + let original_progress = progress.clone(); assert_matches!( - wait_for_wen_restart( - &test_state.wen_restart_proto_path, + send_restart_last_voted_fork_slots( VoteTransaction::from(VoteStateUpdate::from(vec![(0, 8), (1, 1)])), test_state.blockstore.clone(), test_state.cluster_info.clone(), - test_state.bank_forks.clone(), - Some(Arc::new(RwLock::new(Vec::new()))), - 80, - Arc::new(AtomicBool::new(false)), + &mut progress ), Err(WenRestartError::InvalidLastVoteType(_)) ); + assert_eq!(progress, original_progress); assert_matches!( - wait_for_wen_restart( - &test_state.wen_restart_proto_path, + send_restart_last_voted_fork_slots( VoteTransaction::from(Vote::new(vec![], Hash::new_unique())), test_state.blockstore.clone(), test_state.cluster_info.clone(), - test_state.bank_forks.clone(), - Some(Arc::new(RwLock::new(Vec::new()))), - 80, - Arc::new(AtomicBool::new(false)), + &mut progress ), Err(WenRestartError::MissingLastVotedForkSlots) ); + assert_eq!(progress, original_progress); let last_vote_bankhash = Hash::new_unique(); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); wen_restart_test_succeed_after_failure( @@ -679,33 +692,25 @@ mod tests { } #[test] - fn test_wen_restart_send_last_voted_fork_failures() { + fn test_write_wen_restart_records_failure() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let test_state = wen_restart_test_init(&ledger_path); - assert!(write_wen_restart_records( - &test_state.wen_restart_proto_path, - &WenRestartProgress { - state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, - } - ) - .is_ok()); - let last_vote_slot: Slot = test_state.last_voted_fork_slots[0]; - let last_vote_bankhash = Hash::new_unique(); + let progress = wen_restart_proto::WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: None, + last_voted_fork_slots_aggregate: None, + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok()); change_proto_file_readonly(&test_state.wen_restart_proto_path, true); - assert_matches!(wait_for_wen_restart( - &test_state.wen_restart_proto_path, - VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), - test_state.blockstore.clone(), - test_state.cluster_info.clone(), - test_state.bank_forks.clone(), - Some(Arc::new(RwLock::new(Vec::new()))), - 80, - Arc::new(AtomicBool::new(false)), - ), Err(WenRestartError::IoError(e)) if e.kind() == std::io::ErrorKind::PermissionDenied); + assert_matches!(write_wen_restart_records( + &test_state.wen_restart_proto_path, + &progress + ), + Err(WenRestartError::IoError(e)) if e.kind() == std::io::ErrorKind::PermissionDenied); change_proto_file_readonly(&test_state.wen_restart_proto_path, false); + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok()); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); + let last_vote_bankhash = Hash::new_unique(); wen_restart_test_succeed_after_failure( test_state, last_vote_bankhash, @@ -754,29 +759,35 @@ mod tests { (last_vote_slot + 1..last_vote_slot + 3).collect(); // Skip the first 2 validators, because 0 is myself, we need 8 so it hits 80%. assert_eq!(test_state.validator_voting_keypairs.len(), 10); + let progress = WenRestartProgress { + state: RestartState::LastVotedForkSlots.into(), + my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: start_time, + }), + last_voted_fork_slots_aggregate: None, + }; for keypairs in test_state.validator_voting_keypairs.iter().skip(2) { let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); let cluster_info_clone = test_state.cluster_info.clone(); - let blockstore_clone = test_state.blockstore.clone(); let bank_forks_clone = test_state.bank_forks.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); + let mut progress_clone = progress.clone(); let wen_restart_thread_handle = Builder::new() .name("solana-wen-restart".to_string()) .spawn(move || { assert_matches!( - wait_for_wen_restart( + aggregate_restart_last_voted_fork_slots( &wen_restart_proto_path_clone, - VoteTransaction::from(Vote::new( - vec![last_vote_slot], - last_vote_bankhash - )), - blockstore_clone, + 80, cluster_info_clone, bank_forks_clone, - Some(Arc::new(RwLock::new(Vec::new()))), - 80, + Arc::new(RwLock::new(Vec::new())), exit_clone, + &mut progress_clone, ), Err(WenRestartError::Exiting) ); @@ -822,54 +833,10 @@ mod tests { exit.store(true, Ordering::Relaxed); let _ = wen_restart_thread_handle.join(); } - // Corrupt proto file, should fail. - let mut record = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap(); - for slots_record in record - .last_voted_fork_slots_aggregate - .as_mut() - .unwrap() - .received - .values_mut() - { - // hash is 32 bytes, appending to it will cause record to be corrupt. - slots_record.last_vote_bankhash += "1"; - } - assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &record).is_ok()); - assert_matches!( - wait_for_wen_restart( - &test_state.wen_restart_proto_path, - VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), - test_state.blockstore.clone(), - test_state.cluster_info.clone(), - test_state.bank_forks.clone(), - Some(Arc::new(RwLock::new(Vec::new()))), - 80, - Arc::new(AtomicBool::new(false)), - ), - Err(WenRestartError::ParseHashError(_)) - ); - for slots_record in record - .last_voted_fork_slots_aggregate - .as_mut() - .unwrap() - .received - .values_mut() - { - // Make the hash valid again. - slots_record.last_vote_bankhash.pop(); - } - assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &record).is_ok()); - let mut parent_bank = test_state.bank_forks.read().unwrap().root_bank(); - for slot in expected_slots_to_repair { - let mut my_bank_forks = test_state.bank_forks.write().unwrap(); - my_bank_forks.insert(Bank::new_from_parent( - parent_bank.clone(), - &Pubkey::default(), - slot, - )); - parent_bank = my_bank_forks.get(slot).unwrap(); - parent_bank.freeze(); - } + + // Simulating successful repair of missing blocks. + insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); wen_restart_test_succeed_after_failure( test_state,