Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Use individual functions when testing for failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
wen-coding committed Feb 10, 2024
1 parent 056aef7 commit 645452f
Showing 1 changed file with 71 additions and 104 deletions.
175 changes: 71 additions & 104 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub enum WenRestartError {
UnexpectedState(wen_restart_proto::State),
}

fn send_restart_last_voted_fork_slots(
pub(crate) fn send_restart_last_voted_fork_slots(
last_vote: VoteTransaction,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
Expand Down Expand Up @@ -106,7 +106,7 @@ fn send_restart_last_voted_fork_slots(
Ok(())
}

fn aggregate_restart_last_voted_fork_slots(
pub(crate) fn aggregate_restart_last_voted_fork_slots(
wen_restart_path: &PathBuf,
wait_for_supermajority_threshold_percent: u64,
cluster_info: Arc<ClusterInfo>,
Expand All @@ -120,6 +120,9 @@ fn aggregate_restart_last_voted_fork_slots(
root_bank = bank_forks.read().unwrap().root_bank().clone();
}
let root_slot = root_bank.slot();
if progress.my_last_voted_fork_slots.is_none() {
return Err(WenRestartError::MissingLastVotedForkSlots);
}
let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new(
root_slot,
REPAIR_THRESHOLD,
Expand Down Expand Up @@ -467,23 +470,33 @@ mod tests {
expected_progress: WenRestartProgress,
) {
let start = timestamp();
let mut progress;
let mut progress = WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
};
loop {
progress = read_wen_restart_records(&wen_restart_proto_path).unwrap();
if let Some(my_last_voted_fork_slots) = &expected_progress.my_last_voted_fork_slots {
if let Some(record) = progress.my_last_voted_fork_slots.as_mut() {
record.wallclock = my_last_voted_fork_slots.wallclock;
if let Ok(new_progress) = read_wen_restart_records(&wen_restart_proto_path) {
progress = new_progress;
if let Some(my_last_voted_fork_slots) = &expected_progress.my_last_voted_fork_slots
{
if let Some(record) = progress.my_last_voted_fork_slots.as_mut() {
record.wallclock = my_last_voted_fork_slots.wallclock;
}
}
if progress == expected_progress {
return;
}
}
if progress == expected_progress {
return;
}
if timestamp().saturating_sub(start) > WAIT_FOR_THREAD_TIMEOUT {
break;
panic!(
"wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}",
&progress,
expected_progress
);
}
sleep(Duration::from_millis(10));
}
assert_eq!(progress, expected_progress);
}

fn wen_restart_test_succeed_after_failure(
Expand Down Expand Up @@ -629,35 +642,35 @@ mod tests {
}

#[test]
fn test_wen_restart_init_failures() {
fn test_wen_restart_send_last_voted_fork_failures() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let test_state = wen_restart_test_init(&ledger_path);
let mut progress = wen_restart_proto::WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
};
let original_progress = progress.clone();
assert_matches!(
wait_for_wen_restart(
&test_state.wen_restart_proto_path,
send_restart_last_voted_fork_slots(
VoteTransaction::from(VoteStateUpdate::from(vec![(0, 8), (1, 1)])),
test_state.blockstore.clone(),
test_state.cluster_info.clone(),
test_state.bank_forks.clone(),
Some(Arc::new(RwLock::new(Vec::new()))),
80,
Arc::new(AtomicBool::new(false)),
&mut progress
),
Err(WenRestartError::InvalidLastVoteType(_))
);
assert_eq!(progress, original_progress);
assert_matches!(
wait_for_wen_restart(
&test_state.wen_restart_proto_path,
send_restart_last_voted_fork_slots(
VoteTransaction::from(Vote::new(vec![], Hash::new_unique())),
test_state.blockstore.clone(),
test_state.cluster_info.clone(),
test_state.bank_forks.clone(),
Some(Arc::new(RwLock::new(Vec::new()))),
80,
Arc::new(AtomicBool::new(false)),
&mut progress
),
Err(WenRestartError::MissingLastVotedForkSlots)
);
assert_eq!(progress, original_progress);
let last_vote_bankhash = Hash::new_unique();
let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
wen_restart_test_succeed_after_failure(
Expand All @@ -679,33 +692,25 @@ mod tests {
}

#[test]
fn test_wen_restart_send_last_voted_fork_failures() {
fn test_write_wen_restart_records_failure() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let test_state = wen_restart_test_init(&ledger_path);
assert!(write_wen_restart_records(
&test_state.wen_restart_proto_path,
&WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
}
)
.is_ok());
let last_vote_slot: Slot = test_state.last_voted_fork_slots[0];
let last_vote_bankhash = Hash::new_unique();
let progress = wen_restart_proto::WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok());
change_proto_file_readonly(&test_state.wen_restart_proto_path, true);
assert_matches!(wait_for_wen_restart(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)),
test_state.blockstore.clone(),
test_state.cluster_info.clone(),
test_state.bank_forks.clone(),
Some(Arc::new(RwLock::new(Vec::new()))),
80,
Arc::new(AtomicBool::new(false)),
), Err(WenRestartError::IoError(e)) if e.kind() == std::io::ErrorKind::PermissionDenied);
assert_matches!(write_wen_restart_records(
&test_state.wen_restart_proto_path,
&progress
),
Err(WenRestartError::IoError(e)) if e.kind() == std::io::ErrorKind::PermissionDenied);
change_proto_file_readonly(&test_state.wen_restart_proto_path, false);
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok());
let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
let last_vote_bankhash = Hash::new_unique();
wen_restart_test_succeed_after_failure(
test_state,
last_vote_bankhash,
Expand Down Expand Up @@ -754,29 +759,35 @@ mod tests {
(last_vote_slot + 1..last_vote_slot + 3).collect();
// Skip the first 2 validators, because 0 is myself, we need 8 so it hits 80%.
assert_eq!(test_state.validator_voting_keypairs.len(), 10);
let progress = WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: start_time,
}),
last_voted_fork_slots_aggregate: None,
};
for keypairs in test_state.validator_voting_keypairs.iter().skip(2) {
let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone();
let cluster_info_clone = test_state.cluster_info.clone();
let blockstore_clone = test_state.blockstore.clone();
let bank_forks_clone = test_state.bank_forks.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let mut progress_clone = progress.clone();
let wen_restart_thread_handle = Builder::new()
.name("solana-wen-restart".to_string())
.spawn(move || {
assert_matches!(
wait_for_wen_restart(
aggregate_restart_last_voted_fork_slots(
&wen_restart_proto_path_clone,
VoteTransaction::from(Vote::new(
vec![last_vote_slot],
last_vote_bankhash
)),
blockstore_clone,
80,
cluster_info_clone,
bank_forks_clone,
Some(Arc::new(RwLock::new(Vec::new()))),
80,
Arc::new(RwLock::new(Vec::new())),
exit_clone,
&mut progress_clone,
),
Err(WenRestartError::Exiting)
);
Expand Down Expand Up @@ -822,54 +833,10 @@ mod tests {
exit.store(true, Ordering::Relaxed);
let _ = wen_restart_thread_handle.join();
}
// Corrupt proto file, should fail.
let mut record = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap();
for slots_record in record
.last_voted_fork_slots_aggregate
.as_mut()
.unwrap()
.received
.values_mut()
{
// hash is 32 bytes, appending to it will cause record to be corrupt.
slots_record.last_vote_bankhash += "1";
}
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &record).is_ok());
assert_matches!(
wait_for_wen_restart(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)),
test_state.blockstore.clone(),
test_state.cluster_info.clone(),
test_state.bank_forks.clone(),
Some(Arc::new(RwLock::new(Vec::new()))),
80,
Arc::new(AtomicBool::new(false)),
),
Err(WenRestartError::ParseHashError(_))
);
for slots_record in record
.last_voted_fork_slots_aggregate
.as_mut()
.unwrap()
.received
.values_mut()
{
// Make the hash valid again.
slots_record.last_vote_bankhash.pop();
}
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &record).is_ok());
let mut parent_bank = test_state.bank_forks.read().unwrap().root_bank();
for slot in expected_slots_to_repair {
let mut my_bank_forks = test_state.bank_forks.write().unwrap();
my_bank_forks.insert(Bank::new_from_parent(
parent_bank.clone(),
&Pubkey::default(),
slot,
));
parent_bank = my_bank_forks.get(slot).unwrap();
parent_bank.freeze();
}

// Simulating successful repair of missing blocks.
insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair);

let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
wen_restart_test_succeed_after_failure(
test_state,
Expand Down

0 comments on commit 645452f

Please sign in to comment.