diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 44af862ec38cde..5e794ce2f43560 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -68,8 +68,8 @@ const REPAIR_THRESHOLD: f64 = 0.42; const HEAVIEST_FORK_THRESHOLD_DELTA: f64 = 0.38; // We allow at most 5% of the stake to disagree with us. const HEAVIEST_FORK_DISAGREE_THRESHOLD_PERCENT: f64 = 5.0; -// We update HeaviestFork every 30 minutes or when we can exit. -const HEAVIEST_REFRESH_INTERVAL_IN_SECONDS: u64 = 1800; +// We update HeaviestFork every 5 minutes at least. +const HEAVIEST_REFRESH_INTERVAL_IN_SECONDS: u64 = 300; #[derive(Debug, PartialEq)] pub enum WenRestartError { @@ -639,17 +639,14 @@ pub(crate) fn aggregate_restart_heaviest_fork( .as_mut() .unwrap() .total_active_stake = total_active_stake; - cluster_info.push_restart_heaviest_fork( - heaviest_fork_slot, - heaviest_fork_hash, - total_active_stake, - ); let mut progress_last_sent = Instant::now(); let mut cursor = solana_gossip::crds::Cursor::default(); let mut progress_changed = false; let majority_stake_required = (total_stake as f64 / 100.0 * adjusted_threshold_percent as f64).round() as u64; + let mut total_active_stake_higher_than_supermajority = false; + let mut first_time_entering_loop = true; loop { if exit.load(Ordering::Relaxed) { return Err(WenRestartError::Exiting.into()); @@ -690,10 +687,21 @@ pub(crate) fn aggregate_restart_heaviest_fork( total_stake ); let can_exit = total_active_stake_seen_supermajority >= majority_stake_required; - // Only send out updates every 30 minutes or when we can exit. + let saw_supermajority_first_time = current_total_active_stake + >= majority_stake_required + && !total_active_stake_higher_than_supermajority + && { + total_active_stake_higher_than_supermajority = true; + true + }; + // Only send out updates every 5 minutes or when we can exit or active stake passes supermajority + // the first time. if progress_last_sent.elapsed().as_secs() >= HEAVIEST_REFRESH_INTERVAL_IN_SECONDS || can_exit + || first_time_entering_loop + || saw_supermajority_first_time { + first_time_entering_loop = false; cluster_info.push_restart_heaviest_fork( heaviest_fork_slot, heaviest_fork_hash, @@ -1183,8 +1191,10 @@ mod tests { const EXPECTED_SLOTS: Slot = 90; const TICKS_PER_SLOT: u64 = 2; const TOTAL_VALIDATOR_COUNT: u16 = 20; - const MY_INDEX: usize = 0; + const MY_INDEX: usize = TOTAL_VALIDATOR_COUNT as usize - 1; const WAIT_FOR_THREAD_TIMEOUT: u64 = 10_000; + const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; + const NON_CONFORMING_VALIDATOR_PERCENT: u64 = 5; fn push_restart_last_voted_fork_slots( cluster_info: Arc, @@ -1492,11 +1502,18 @@ mod tests { .unwrap(); let mut rng = rand::thread_rng(); let mut expected_received_last_voted_fork_slots = HashMap::new(); - // Skip the first 5 validators, because 0 is myself, we only need 15 more to reach 80%. + let validators_to_take: usize = + (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT * TOTAL_VALIDATOR_COUNT as u64 / 100 - 1) + .try_into() + .unwrap(); let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone(); last_voted_fork_slots_from_others.reverse(); last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone()); - for keypairs in test_state.validator_voting_keypairs.iter().skip(5) { + for keypairs in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { let node_pubkey = keypairs.node_keypair.pubkey(); let node = ContactInfo::new_rand(&mut rng, Some(node_pubkey)); let last_vote_hash = Hash::new_unique(); @@ -1545,8 +1562,20 @@ mod tests { } // Now simulate receiving HeaviestFork messages. let mut expected_received_heaviest_fork = HashMap::new(); + let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + - NON_CONFORMING_VALIDATOR_PERCENT) + * TOTAL_VALIDATOR_COUNT as u64 + / 100 + - 1) + .try_into() + .unwrap(); // HeaviestFork only requires 75% vs 80% required for LastVotedForkSlots. We have 5% stake, so we need 70%. - for keypairs in test_state.validator_voting_keypairs.iter().skip(6) { + let total_active_stake_during_heaviest_fork = (validators_to_take + 1) as u64 * 100; + for keypairs in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { let node_pubkey = keypairs.node_keypair.pubkey(); let node = ContactInfo::new_rand(&mut rng, Some(node_pubkey)); let now = timestamp(); @@ -1555,7 +1584,7 @@ mod tests { &node, expected_heaviest_fork_slot, &expected_heaviest_fork_bankhash, - 1500, + total_active_stake_during_heaviest_fork, &keypairs.node_keypair, now, ); @@ -1564,7 +1593,7 @@ mod tests { HeaviestForkRecord { slot: expected_heaviest_fork_slot, bankhash: expected_heaviest_fork_bankhash.to_string(), - total_active_stake: 1500, + total_active_stake: total_active_stake_during_heaviest_fork, shred_version: SHRED_VERSION as u32, wallclock: now, }, @@ -1582,9 +1611,18 @@ mod tests { let mut expected_slots_stake_map: HashMap = test_state .last_voted_fork_slots .iter() - .map(|slot| (*slot, 1600)) + .map(|slot| { + ( + *slot, + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT * TOTAL_VALIDATOR_COUNT as u64, + ) + }) .collect(); - expected_slots_stake_map.extend(expected_slots_to_repair.iter().map(|slot| (*slot, 1500))); + expected_slots_stake_map.extend( + expected_slots_to_repair + .iter() + .map(|slot| (*slot, total_active_stake_during_heaviest_fork)), + ); assert_eq!( progress, WenRestartProgress { @@ -1599,7 +1637,8 @@ mod tests { received: expected_received_last_voted_fork_slots, final_result: Some(LastVotedForkSlotsAggregateFinal { slots_stake_map: expected_slots_stake_map, - total_active_stake: 1600, + // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork. + total_active_stake: total_active_stake_during_heaviest_fork + 100, }), }), my_heaviest_fork: Some(HeaviestForkRecord { @@ -1610,16 +1649,17 @@ mod tests { .unwrap() .bankhash .to_string(), - total_active_stake: 1500, + total_active_stake: total_active_stake_during_heaviest_fork, shred_version: SHRED_VERSION as u32, wallclock: 0, }), heaviest_fork_aggregate: Some(HeaviestForkAggregateRecord { received: expected_received_heaviest_fork, final_result: Some(HeaviestForkAggregateFinal { - total_active_stake: 1500, - total_active_stake_seen_supermajority: 1500, - total_active_stake_agreed_with_me: 1500, + total_active_stake: total_active_stake_during_heaviest_fork, + total_active_stake_seen_supermajority: + total_active_stake_during_heaviest_fork, + total_active_stake_agreed_with_me: total_active_stake_during_heaviest_fork, }), }), my_snapshot: Some(GenerateSnapshotRecord { @@ -2026,7 +2066,6 @@ mod tests { let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone(); last_voted_fork_slots_from_others.reverse(); last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone()); - // Skip the first 5 validators, because 0 is myself, we need 15 so it hits 80%. let progress = WenRestartProgress { state: RestartState::LastVotedForkSlots.into(), my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { @@ -2037,7 +2076,15 @@ mod tests { }), ..Default::default() }; - for keypairs in test_state.validator_voting_keypairs.iter().skip(5) { + let validators_to_take: usize = + (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT * TOTAL_VALIDATOR_COUNT as u64 / 100 - 1) + .try_into() + .unwrap(); + for keypairs in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); let cluster_info_clone = test_state.cluster_info.clone(); let bank_forks_clone = test_state.bank_forks.clone(); @@ -2051,7 +2098,7 @@ mod tests { .spawn(move || { assert!(aggregate_restart_last_voted_fork_slots( &wen_restart_proto_path_clone, - 80, + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT, cluster_info_clone, &last_voted_fork_slots, bank_forks_clone, @@ -2485,25 +2532,157 @@ mod tests { ); } + fn start_aggregate_heaviest_fork_thread( + test_state: &WenRestartTestInitResult, + heaviest_fork_slot: Slot, + heaviest_fork_bankhash: Hash, + exit: Arc, + expected_error: Option, + ) -> std::thread::JoinHandle<()> { + let progress = wen_restart_proto::WenRestartProgress { + state: RestartState::HeaviestFork.into(), + my_heaviest_fork: Some(HeaviestForkRecord { + slot: heaviest_fork_slot, + bankhash: heaviest_fork_bankhash.to_string(), + total_active_stake: WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + .saturating_mul(TOTAL_VALIDATOR_COUNT as u64), + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + ..Default::default() + }; + let wen_restart_path = test_state.wen_restart_proto_path.clone(); + let cluster_info = test_state.cluster_info.clone(); + let bank_forks = test_state.bank_forks.clone(); + Builder::new() + .name("solana-wen-restart-aggregate-heaviest-fork".to_string()) + .spawn(move || { + let result = aggregate_restart_heaviest_fork( + &wen_restart_path, + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT, + cluster_info, + bank_forks, + exit, + &mut progress.clone(), + ); + if let Some(expected_error) = expected_error { + assert_eq!( + result.unwrap_err().downcast::().unwrap(), + expected_error + ); + } else { + assert!(result.is_ok()); + } + }) + .unwrap() + } + + #[test] + fn test_aggregate_heaviest_fork_send_gossip_early() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let test_state = wen_restart_test_init(&ledger_path); + let heaviest_fork_slot = test_state.last_voted_fork_slots[0] + 3; + let heaviest_fork_bankhash = Hash::new_unique(); + + let mut cursor = solana_gossip::crds::Cursor::default(); + // clear the heaviest fork queue so we make sure a new HeaviestFork is sent out later. + let _ = test_state + .cluster_info + .get_restart_heaviest_fork(&mut cursor); + + let exit = Arc::new(AtomicBool::new(false)); + let thread = start_aggregate_heaviest_fork_thread( + &test_state, + heaviest_fork_slot, + heaviest_fork_bankhash, + exit.clone(), + Some(WenRestartError::Exiting), + ); + // Simulating everyone sending out the first RestartHeaviestFork message, Gossip propagation takes + // time, so the observed_stake is probably smaller than actual active stake. We should send out + // heaviest fork indicating we have active stake exceeding supermajority. + let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + - NON_CONFORMING_VALIDATOR_PERCENT) + * TOTAL_VALIDATOR_COUNT as u64 + / 100 + - 1) + .try_into() + .unwrap(); + for keypair in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { + let node_pubkey = keypair.node_keypair.pubkey(); + let node = ContactInfo::new_rand(&mut rand::thread_rng(), Some(node_pubkey)); + let now = timestamp(); + push_restart_heaviest_fork( + test_state.cluster_info.clone(), + &node, + heaviest_fork_slot, + &heaviest_fork_bankhash, + 100, + &keypair.node_keypair, + now, + ); + } + let my_pubkey = test_state.cluster_info.id(); + let mut found_myself = false; + let expected_active_stake = (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + - NON_CONFORMING_VALIDATOR_PERCENT) + * TOTAL_VALIDATOR_COUNT as u64; + while !found_myself { + sleep(Duration::from_millis(100)); + test_state.cluster_info.flush_push_queue(); + for gossip_record in test_state + .cluster_info + .get_restart_heaviest_fork(&mut cursor) + { + if gossip_record.from == my_pubkey + && gossip_record.observed_stake == expected_active_stake + { + found_myself = true; + break; + } + } + } + exit.store(true, Ordering::Relaxed); + assert!(thread.join().is_ok()); + } + #[test] fn test_aggregate_heaviest_fork() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let test_state = wen_restart_test_init(&ledger_path); let heaviest_fork_slot = test_state.last_voted_fork_slots[0] + 3; let heaviest_fork_bankhash = Hash::new_unique(); + let expected_active_stake = (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + - NON_CONFORMING_VALIDATOR_PERCENT) + * TOTAL_VALIDATOR_COUNT as u64; let progress = wen_restart_proto::WenRestartProgress { state: RestartState::HeaviestFork.into(), my_heaviest_fork: Some(HeaviestForkRecord { slot: heaviest_fork_slot, bankhash: heaviest_fork_bankhash.to_string(), - total_active_stake: 1500, + total_active_stake: expected_active_stake, shred_version: SHRED_VERSION as u32, wallclock: 0, }), ..Default::default() }; let different_bankhash = Hash::new_unique(); - for keypair in test_state.validator_voting_keypairs.iter().skip(6) { + let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT + - NON_CONFORMING_VALIDATOR_PERCENT) + * TOTAL_VALIDATOR_COUNT as u64 + / 100 + - 1) + .try_into() + .unwrap(); + for keypair in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { let node_pubkey = keypair.node_keypair.pubkey(); let node = ContactInfo::new_rand(&mut rand::thread_rng(), Some(node_pubkey)); let now = timestamp(); @@ -2512,7 +2691,7 @@ mod tests { &node, heaviest_fork_slot, &different_bankhash, - 1500, + expected_active_stake, &keypair.node_keypair, now, ); @@ -2523,7 +2702,7 @@ mod tests { assert_eq!( aggregate_restart_heaviest_fork( &test_state.wen_restart_proto_path, - 80, + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT, test_state.cluster_info.clone(), test_state.bank_forks.clone(), Arc::new(AtomicBool::new(false)), @@ -2539,7 +2718,15 @@ mod tests { ), ); // If we have enough stake agreeing with us, we should be able to aggregate the heaviest fork. - for keypair in test_state.validator_voting_keypairs.iter().skip(6) { + let validators_to_take: usize = + (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT * TOTAL_VALIDATOR_COUNT as u64 / 100 - 1) + .try_into() + .unwrap(); + for keypair in test_state + .validator_voting_keypairs + .iter() + .take(validators_to_take) + { let node_pubkey = keypair.node_keypair.pubkey(); let node = ContactInfo::new_rand(&mut rand::thread_rng(), Some(node_pubkey)); let now = timestamp(); @@ -2548,14 +2735,14 @@ mod tests { &node, heaviest_fork_slot, &heaviest_fork_bankhash, - 1500, + expected_active_stake, &keypair.node_keypair, now, ); } assert!(aggregate_restart_heaviest_fork( &test_state.wen_restart_proto_path, - 80, + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT, test_state.cluster_info.clone(), test_state.bank_forks.clone(), Arc::new(AtomicBool::new(false)),