Skip to content

Commit

Permalink
Make non-coordinator listen to HeaviestFork from coordinator. (#3125)
Browse files Browse the repository at this point in the history
* Make non-coordinator listen to HeaviestFork from coordinator.

* Add a sleep in the receive loop.
  • Loading branch information
wen-coding authored Oct 16, 2024
1 parent 5ab9e26 commit 38404a6
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 25 deletions.
5 changes: 3 additions & 2 deletions wen-restart/proto/wen_restart.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ message WenRestartProgress {
optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3;
optional HeaviestForkRecord my_heaviest_fork = 4;
optional HeaviestForkAggregateRecord heaviest_fork_aggregate = 5;
optional GenerateSnapshotRecord my_snapshot = 6;
map<string, ConflictMessage> conflict_message = 7;
optional HeaviestForkRecord coordinator_heaviest_fork = 6;
optional GenerateSnapshotRecord my_snapshot = 7;
map<string, ConflictMessage> conflict_message = 8;
}
171 changes: 148 additions & 23 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub(crate) enum WenRestartProgressInternalState {
},
HeaviestFork {
new_root_slot: Slot,
new_root_hash: Hash,
},
GenerateSnapshot {
new_root_slot: Slot,
Expand Down Expand Up @@ -871,6 +872,39 @@ pub(crate) fn aggregate_restart_heaviest_fork(
}
}

/// Waits for the wen-restart coordinator's heaviest fork to show up in gossip.
///
/// Each polling round pulls the restart-heaviest-fork entries that are new
/// since the previous round (tracked by a `solana_gossip::crds::Cursor`) and
/// looks for the first one whose sender is `wen_restart_coordinator`; entries
/// from any other sender are ignored. When no matching entry is found the
/// function sleeps for `GOSSIP_SLEEP_MILLIS` milliseconds and polls again.
///
/// On a match, the entry is stored in `progress.coordinator_heaviest_fork`
/// (with `total_active_stake` set to 0) and its `(last_slot, last_slot_hash)`
/// pair is returned.
///
/// # Errors
///
/// Returns `WenRestartError::Exiting` if `exit` is observed to be set at the
/// start of a polling round.
pub(crate) fn receive_restart_heaviest_fork(
    wen_restart_coordinator: Pubkey,
    cluster_info: Arc<ClusterInfo>,
    exit: Arc<AtomicBool>,
    progress: &mut WenRestartProgress,
) -> Result<(Slot, Hash)> {
    let mut gossip_cursor = solana_gossip::crds::Cursor::default();
    while !exit.load(Ordering::Relaxed) {
        // Only the coordinator's message matters; everything else is skipped.
        let coordinator_fork = cluster_info
            .get_restart_heaviest_fork(&mut gossip_cursor)
            .into_iter()
            .find(|fork| fork.from == wen_restart_coordinator);
        if let Some(fork) = coordinator_fork {
            info!(
                "Received new heaviest fork from coordinator: {} {:?}",
                wen_restart_coordinator, fork
            );
            let record = HeaviestForkRecord {
                slot: fork.last_slot,
                bankhash: fork.last_slot_hash.to_string(),
                total_active_stake: 0,
                wallclock: fork.wallclock,
                shred_version: fork.shred_version as u32,
            };
            progress.coordinator_heaviest_fork = Some(record);
            return Ok((fork.last_slot, fork.last_slot_hash));
        }
        // Nothing from the coordinator yet; back off before polling gossip again.
        sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
    }
    Err(WenRestartError::Exiting.into())
}

#[derive(Clone)]
pub struct WenRestartConfig {
pub wen_restart_path: PathBuf,
Expand Down Expand Up @@ -963,16 +997,40 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
my_heaviest_fork: Some(heaviest_fork),
}
}
WenRestartProgressInternalState::HeaviestFork { new_root_slot } => {
aggregate_restart_heaviest_fork(
&config.wen_restart_path,
config.wait_for_supermajority_threshold_percent,
config.cluster_info.clone(),
config.bank_forks.clone(),
config.exit.clone(),
&mut progress,
)?;
WenRestartProgressInternalState::HeaviestFork { new_root_slot }
WenRestartProgressInternalState::HeaviestFork {
new_root_slot,
new_root_hash,
} => {
if config.cluster_info.id() == config.wen_restart_coordinator {
config
.cluster_info
.push_restart_heaviest_fork(new_root_slot, new_root_hash, 0);
// TODO(wen): remove this aggregate.
aggregate_restart_heaviest_fork(
&config.wen_restart_path,
config.wait_for_supermajority_threshold_percent,
config.cluster_info.clone(),
config.bank_forks.clone(),
config.exit.clone(),
&mut progress,
)?;
WenRestartProgressInternalState::HeaviestFork {
new_root_slot,
new_root_hash,
}
} else {
let (coordinator_slot, coordinator_hash) = receive_restart_heaviest_fork(
config.wen_restart_coordinator,
config.cluster_info.clone(),
config.exit.clone(),
&mut progress,
)?;
// TODO(wen): add verification here.
WenRestartProgressInternalState::HeaviestFork {
new_root_slot: coordinator_slot,
new_root_hash: coordinator_hash,
}
}
}
WenRestartProgressInternalState::GenerateSnapshot {
new_root_slot,
Expand Down Expand Up @@ -1072,12 +1130,13 @@ pub(crate) fn increment_and_write_wen_restart_records(
progress.my_heaviest_fork = Some(my_heaviest_fork.clone());
WenRestartProgressInternalState::HeaviestFork {
new_root_slot: my_heaviest_fork.slot,
new_root_hash: Hash::from_str(&my_heaviest_fork.bankhash).unwrap(),
}
} else {
return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into());
}
}
WenRestartProgressInternalState::HeaviestFork { new_root_slot } => {
WenRestartProgressInternalState::HeaviestFork { new_root_slot, .. } => {
progress.set_state(RestartState::GenerateSnapshot);
WenRestartProgressInternalState::GenerateSnapshot {
new_root_slot,
Expand Down Expand Up @@ -1812,25 +1871,28 @@ mod tests {
.unwrap()
.bankhash
.to_string(),
total_active_stake: total_active_stake_during_heaviest_fork,
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
heaviest_fork_aggregate: Some(HeaviestForkAggregateRecord {
received: expected_received_heaviest_fork,
final_result: Some(HeaviestForkAggregateFinal {
total_active_stake: total_active_stake_during_heaviest_fork,
total_active_stake_seen_supermajority:
total_active_stake_during_heaviest_fork,
total_active_stake_agreed_with_me: total_active_stake_during_heaviest_fork,
}),
}),
heaviest_fork_aggregate: None,
my_snapshot: Some(GenerateSnapshotRecord {
slot: expected_heaviest_fork_slot,
bankhash: progress.my_snapshot.as_ref().unwrap().bankhash.clone(),
shred_version: progress.my_snapshot.as_ref().unwrap().shred_version,
path: progress.my_snapshot.as_ref().unwrap().path.clone(),
}),
coordinator_heaviest_fork: Some(HeaviestForkRecord {
slot: expected_heaviest_fork_slot,
bankhash: expected_heaviest_fork_bankhash.to_string(),
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: progress
.coordinator_heaviest_fork
.as_ref()
.unwrap()
.wallclock,
}),
..Default::default()
}
);
Expand Down Expand Up @@ -2703,7 +2765,10 @@ mod tests {
wallclock: 0,
}),
},
WenRestartProgressInternalState::HeaviestFork { new_root_slot: 1 },
WenRestartProgressInternalState::HeaviestFork {
new_root_slot: 1,
new_root_hash: Hash::default(),
},
WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
Expand All @@ -2719,7 +2784,10 @@ mod tests {
},
),
(
WenRestartProgressInternalState::HeaviestFork { new_root_slot: 1 },
WenRestartProgressInternalState::HeaviestFork {
new_root_slot: 1,
new_root_hash: Hash::default(),
},
WenRestartProgressInternalState::GenerateSnapshot {
new_root_slot: 1,
my_snapshot: None,
Expand Down Expand Up @@ -3428,4 +3496,61 @@ mod tests {
.is_ok());
assert!(wait_for_wen_restart(config).is_ok());
}

/// Checks that `receive_restart_heaviest_fork` skips a heaviest-fork message
/// pushed by a non-coordinator node and returns the slot and hash carried by
/// the message pushed with the coordinator's keypair.
#[test]
fn test_receive_restart_heaviest_fork() {
    let mut rng = rand::thread_rng();
    let coordinator_keypair = Keypair::new();
    let node_keypair = Arc::new(Keypair::new());

    // Local node whose ClusterInfo receives the pushed messages.
    let mut local_contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
    local_contact_info.set_shred_version(SHRED_VERSION);
    let cluster_info = Arc::new(ClusterInfo::new(
        local_contact_info,
        node_keypair.clone(),
        SocketAddrSpace::Unspecified,
    ));
    let exit = Arc::new(AtomicBool::new(false));

    // First, a message from an unrelated node; it must not be returned.
    let other_keypair = Keypair::new();
    let other_node = ContactInfo::new_rand(&mut rng, Some(other_keypair.pubkey()));
    let other_slot = 3;
    let other_hash = Hash::new_unique();
    push_restart_heaviest_fork(
        cluster_info.clone(),
        &other_node,
        other_slot,
        &other_hash,
        0,
        &other_keypair,
        timestamp(),
    );

    // Then the coordinator's message, which is the one expected back.
    let coordinator_node = ContactInfo::new_rand(&mut rng, Some(coordinator_keypair.pubkey()));
    let coordinator_slot = 6;
    let coordinator_hash = Hash::new_unique();
    push_restart_heaviest_fork(
        cluster_info.clone(),
        &coordinator_node,
        coordinator_slot,
        &coordinator_hash,
        0,
        &coordinator_keypair,
        timestamp(),
    );

    let mut progress = WenRestartProgress {
        state: RestartState::HeaviestFork.into(),
        ..Default::default()
    };
    let received = receive_restart_heaviest_fork(
        coordinator_keypair.pubkey(),
        cluster_info,
        exit,
        &mut progress,
    )
    .unwrap();
    assert_eq!(received, (coordinator_slot, coordinator_hash));
}
}

0 comments on commit 38404a6

Please sign in to comment.