From 38404a6ca0c07d6108ed5c3bb552d6b9468442c2 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Wed, 16 Oct 2024 15:15:52 -0700 Subject: [PATCH] Make non-coordinator listen to HeaviestFork from coordinator. (#3125) * Make non-coordinator listen to HeaviestFork from coordinator. * Add a sleep in the receive loop. --- wen-restart/proto/wen_restart.proto | 5 +- wen-restart/src/wen_restart.rs | 171 ++++++++++++++++++++++++---- 2 files changed, 151 insertions(+), 25 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index b32ca5f6537283..fc392f9882bd72 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -70,6 +70,7 @@ message WenRestartProgress { optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3; optional HeaviestForkRecord my_heaviest_fork = 4; optional HeaviestForkAggregateRecord heaviest_fork_aggregate = 5; - optional GenerateSnapshotRecord my_snapshot = 6; - map conflict_message = 7; + optional HeaviestForkRecord coordinator_heaviest_fork = 6; + optional GenerateSnapshotRecord my_snapshot = 7; + map conflict_message = 8; } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index c2e8cf5191b1c2..fe1a9006817664 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -188,6 +188,7 @@ pub(crate) enum WenRestartProgressInternalState { }, HeaviestFork { new_root_slot: Slot, + new_root_hash: Hash, }, GenerateSnapshot { new_root_slot: Slot, @@ -871,6 +872,39 @@ pub(crate) fn aggregate_restart_heaviest_fork( } } +pub(crate) fn receive_restart_heaviest_fork( + wen_restart_coordinator: Pubkey, + cluster_info: Arc, + exit: Arc, + progress: &mut WenRestartProgress, +) -> Result<(Slot, Hash)> { + let mut cursor = solana_gossip::crds::Cursor::default(); + loop { + if exit.load(Ordering::Relaxed) { + return Err(WenRestartError::Exiting.into()); + } + for new_heaviest_fork in cluster_info.get_restart_heaviest_fork(&mut cursor) { + if new_heaviest_fork.from == wen_restart_coordinator { + info!( + "Received new heaviest fork from coordinator: {} {:?}", + wen_restart_coordinator, new_heaviest_fork + ); + let coordinator_heaviest_slot = new_heaviest_fork.last_slot; + let coordinator_heaviest_hash = new_heaviest_fork.last_slot_hash; + progress.coordinator_heaviest_fork = Some(HeaviestForkRecord { + slot: coordinator_heaviest_slot, + bankhash: coordinator_heaviest_hash.to_string(), + total_active_stake: 0, + wallclock: new_heaviest_fork.wallclock, + shred_version: new_heaviest_fork.shred_version as u32, + }); + return Ok((coordinator_heaviest_slot, coordinator_heaviest_hash)); + } + } + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + } +} + #[derive(Clone)] pub struct WenRestartConfig { pub wen_restart_path: PathBuf, @@ -963,16 +997,40 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> { my_heaviest_fork: Some(heaviest_fork), } } - WenRestartProgressInternalState::HeaviestFork { new_root_slot } => { - aggregate_restart_heaviest_fork( - &config.wen_restart_path, - config.wait_for_supermajority_threshold_percent, - config.cluster_info.clone(), - config.bank_forks.clone(), - config.exit.clone(), - &mut progress, - )?; - WenRestartProgressInternalState::HeaviestFork { new_root_slot } + WenRestartProgressInternalState::HeaviestFork { + new_root_slot, + new_root_hash, + } => { + if config.cluster_info.id() == config.wen_restart_coordinator { + config + .cluster_info + .push_restart_heaviest_fork(new_root_slot, new_root_hash, 0); + // TODO(wen): remove this aggregate. + aggregate_restart_heaviest_fork( + &config.wen_restart_path, + config.wait_for_supermajority_threshold_percent, + config.cluster_info.clone(), + config.bank_forks.clone(), + config.exit.clone(), + &mut progress, + )?; + WenRestartProgressInternalState::HeaviestFork { + new_root_slot, + new_root_hash, + } + } else { + let (coordinator_slot, coordinator_hash) = receive_restart_heaviest_fork( + config.wen_restart_coordinator, + config.cluster_info.clone(), + config.exit.clone(), + &mut progress, + )?; + // TODO(wen): add verification here. + WenRestartProgressInternalState::HeaviestFork { + new_root_slot: coordinator_slot, + new_root_hash: coordinator_hash, + } + } } WenRestartProgressInternalState::GenerateSnapshot { new_root_slot, @@ -1072,12 +1130,13 @@ pub(crate) fn increment_and_write_wen_restart_records( progress.my_heaviest_fork = Some(my_heaviest_fork.clone()); WenRestartProgressInternalState::HeaviestFork { new_root_slot: my_heaviest_fork.slot, + new_root_hash: Hash::from_str(&my_heaviest_fork.bankhash).unwrap(), } } else { return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into()); } } - WenRestartProgressInternalState::HeaviestFork { new_root_slot } => { + WenRestartProgressInternalState::HeaviestFork { new_root_slot, .. } => { progress.set_state(RestartState::GenerateSnapshot); WenRestartProgressInternalState::GenerateSnapshot { new_root_slot, @@ -1812,25 +1871,28 @@ mod tests { .unwrap() .bankhash .to_string(), - total_active_stake: total_active_stake_during_heaviest_fork, + total_active_stake: 0, shred_version: SHRED_VERSION as u32, wallclock: 0, }), - heaviest_fork_aggregate: Some(HeaviestForkAggregateRecord { - received: expected_received_heaviest_fork, - final_result: Some(HeaviestForkAggregateFinal { - total_active_stake: total_active_stake_during_heaviest_fork, - total_active_stake_seen_supermajority: - total_active_stake_during_heaviest_fork, - total_active_stake_agreed_with_me: total_active_stake_during_heaviest_fork, - }), - }), + heaviest_fork_aggregate: None, my_snapshot: Some(GenerateSnapshotRecord { slot: expected_heaviest_fork_slot, bankhash: progress.my_snapshot.as_ref().unwrap().bankhash.clone(), shred_version: progress.my_snapshot.as_ref().unwrap().shred_version, path: progress.my_snapshot.as_ref().unwrap().path.clone(), }), + coordinator_heaviest_fork: Some(HeaviestForkRecord { + slot: expected_heaviest_fork_slot, + bankhash: expected_heaviest_fork_bankhash.to_string(), + total_active_stake: 0, + shred_version: SHRED_VERSION as u32, + wallclock: progress + .coordinator_heaviest_fork + .as_ref() + .unwrap() + .wallclock, + }), ..Default::default() } ); @@ -2703,7 +2765,10 @@ mod tests { wallclock: 0, }), }, - WenRestartProgressInternalState::HeaviestFork { new_root_slot: 1 }, + WenRestartProgressInternalState::HeaviestFork { + new_root_slot: 1, + new_root_hash: Hash::default(), + }, WenRestartProgress { state: RestartState::HeaviestFork.into(), my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), @@ -2719,7 +2784,10 @@ mod tests { }, ), ( - WenRestartProgressInternalState::HeaviestFork { new_root_slot: 1 }, + WenRestartProgressInternalState::HeaviestFork { + new_root_slot: 1, + new_root_hash: Hash::default(), + }, WenRestartProgressInternalState::GenerateSnapshot { new_root_slot: 1, my_snapshot: None, @@ -3428,4 +3496,61 @@ mod tests { .is_ok()); assert!(wait_for_wen_restart(config).is_ok()); } + + #[test] + fn test_receive_restart_heaviest_fork() { + let mut rng = rand::thread_rng(); + let coordinator_keypair = Keypair::new(); + let node_keypair = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + { + let mut contact_info = + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()); + contact_info.set_shred_version(SHRED_VERSION); + contact_info + }, + node_keypair.clone(), + SocketAddrSpace::Unspecified, + )); + let exit = Arc::new(AtomicBool::new(false)); + let random_keypair = Keypair::new(); + let random_node = ContactInfo::new_rand(&mut rng, Some(random_keypair.pubkey())); + let random_slot = 3; + let random_hash = Hash::new_unique(); + push_restart_heaviest_fork( + cluster_info.clone(), + &random_node, + random_slot, + &random_hash, + 0, + &random_keypair, + timestamp(), + ); + let coordinator_node = ContactInfo::new_rand(&mut rng, Some(coordinator_keypair.pubkey())); + let coordinator_slot = 6; + let coordinator_hash = Hash::new_unique(); + push_restart_heaviest_fork( + cluster_info.clone(), + &coordinator_node, + coordinator_slot, + &coordinator_hash, + 0, + &coordinator_keypair, + timestamp(), + ); + let mut progress = WenRestartProgress { + state: RestartState::HeaviestFork.into(), + ..Default::default() + }; + assert_eq!( + receive_restart_heaviest_fork( + coordinator_keypair.pubkey(), + cluster_info, + exit, + &mut progress + ) + .unwrap(), + (coordinator_slot, coordinator_hash) + ); + } }