Skip to content

Commit

Permalink
wen_restart: Make validator restart and wait for supermajority after … (
Browse files Browse the repository at this point in the history
anza-xyz#1335)

* wen_restart: Make validator restart and wait for supermajority after Phase One completes.

* Fix cargo sort error.

* Adding two unit tests.

* Change function name and return type.

* Don't change wait_for_supermajority, try to re-initialize validator after wen_restart phase one.

* Fix a bad merge and fix test.

* Make validator exit if wen_restart is finished.

* Remove unnecessary dependencies from Cargo.toml

* Remove unused function.

* Fix tests to check OK() is returned when in DONE state.

* Add wen_restart_proto_path if specified in command line.

* Fix command line name for wen_restart.

* Log how much stake is required to exit.

* Don't double count my own RestartLastVotedForkSlots.

* Ignore HeaviestFork from myself.

* Also send out HeaviestForkAggregate if I saw supermajority for the first
time.

* Forbid replay if wen_restart is in progress.

* We still need the new bank part.

* Try not grabbing a big write lock on bankforks.

* Should read GenerateSnapshot in initialize as well.

* Skip all replay stages while in wen_restart.

* Root banks if necessary to send EAH request.

* Root banks every 100 slots.

* Do not start replay thread if in wen_restart.

* Do not need to check in_wen_restart inside replay_stage any more.

* Fix failed merge.

* Make linter happy.

* Fix the bad merge after switching to anyhow.

* Remove unused code.

* Fix bad merge.

* Remove unnecessary clone.

* Remove unused map_error.

* No need to specify snapshot_path in restart command line.

* Small fixes.

* Split the enabling of wen_restart into another PR.
  • Loading branch information
wen-coding authored and ray-kast committed Nov 27, 2024
1 parent 66747c4 commit fbdf742
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 20 deletions.
22 changes: 15 additions & 7 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use {
tpu::DEFAULT_TPU_COALESCE,
validator::{
is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator,
ValidatorConfig, ValidatorStartProgress,
ValidatorConfig, ValidatorError, ValidatorStartProgress,
},
},
solana_gossip::{
Expand Down Expand Up @@ -2045,7 +2045,7 @@ pub fn main() {
// the one pushed by bootstrap.
node.info.hot_swap_pubkey(identity_keypair.pubkey());

let validator = Validator::new(
let validator = match Validator::new(
node,
identity_keypair,
&ledger_path,
Expand All @@ -2062,11 +2062,19 @@ pub fn main() {
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
admin_service_post_init,
)
.unwrap_or_else(|e| {
error!("Failed to start validator: {:?}", e);
exit(1);
});
) {
Ok(validator) => validator,
Err(err) => match err.downcast_ref() {
Some(ValidatorError::WenRestartFinished) => {
error!("Please remove --wen_restart and use --wait_for_supermajority as instructed above");
exit(200);
}
_ => {
error!("Failed to start validator: {:?}", err);
exit(1);
}
},
};

if let Some(filename) = init_complete_file {
File::create(filename).unwrap_or_else(|_| {
Expand Down
154 changes: 141 additions & 13 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub enum WenRestartError {
MalformedProgress(RestartState, String),
MissingLastVotedForkSlots,
MissingFullSnapshot(String),
MissingSnapshotInProtobuf,
NotEnoughStakeAgreeingWithUs(Slot, Hash, HashMap<(Slot, Hash), u64>),
UnexpectedState(wen_restart_proto::State),
}
Expand Down Expand Up @@ -148,6 +149,9 @@ impl std::fmt::Display for WenRestartError {
WenRestartError::MissingFullSnapshot(directory) => {
write!(f, "Missing full snapshot, please check whether correct directory is supplied {directory}")
}
WenRestartError::MissingSnapshotInProtobuf => {
write!(f, "Missing snapshot in protobuf")
}
WenRestartError::NotEnoughStakeAgreeingWithUs(slot, hash, block_stake_map) => {
write!(
f,
Expand Down Expand Up @@ -188,7 +192,11 @@ pub(crate) enum WenRestartProgressInternalState {
new_root_slot: Slot,
my_snapshot: Option<GenerateSnapshotRecord>,
},
Done,
Done {
slot: Slot,
hash: Hash,
shred_version: u16,
},
}

pub(crate) fn send_restart_last_voted_fork_slots(
Expand Down Expand Up @@ -719,9 +727,10 @@ pub(crate) fn aggregate_restart_heaviest_fork(
let total_active_stake_seen_supermajority =
heaviest_fork_aggregate.total_active_stake_seen_supermajority();
info!(
"Total active stake seeing supermajority: {} Total active stake: {} Total stake {}",
"Total active stake seeing supermajority: {} Total active stake: {} Required to exit {} Total stake {}",
total_active_stake_seen_supermajority,
heaviest_fork_aggregate.total_active_stake(),
majority_stake_required,
total_stake
);
let can_exit = total_active_stake_seen_supermajority >= majority_stake_required;
Expand Down Expand Up @@ -833,6 +842,7 @@ pub(crate) fn aggregate_restart_heaviest_fork(
}
}

#[derive(Clone)]
pub struct WenRestartConfig {
pub wen_restart_path: PathBuf,
pub last_vote: VoteTransaction,
Expand Down Expand Up @@ -953,7 +963,20 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
my_snapshot: Some(snapshot_record),
}
}
WenRestartProgressInternalState::Done => return Ok(()),
// Proceed to restart if we are ready to wait for supermajority.
WenRestartProgressInternalState::Done {
slot,
hash,
shred_version,
} => {
error!(
"Wen start finished, please remove --wen_restart and restart with \
--wait-for-supermajority {} --expected-bank-hash {} --shred-version {}\
--hard-fork {} --no-snapshot-fetchsnapshot",
slot, hash, shred_version, slot
);
return Ok(());
}
};
state = increment_and_write_wen_restart_records(
&config.wen_restart_path,
Expand Down Expand Up @@ -1038,12 +1061,16 @@ pub(crate) fn increment_and_write_wen_restart_records(
if let Some(my_snapshot) = my_snapshot {
progress.set_state(RestartState::Done);
progress.my_snapshot = Some(my_snapshot.clone());
WenRestartProgressInternalState::Done
WenRestartProgressInternalState::Done {
slot: my_snapshot.slot,
hash: Hash::from_str(&my_snapshot.bankhash).unwrap(),
shred_version: my_snapshot.shred_version as u16,
}
} else {
return Err(WenRestartError::UnexpectedState(RestartState::Done).into());
return Err(WenRestartError::MissingSnapshotInProtobuf.into());
}
}
WenRestartProgressInternalState::Done => {
WenRestartProgressInternalState::Done { .. } => {
return Err(WenRestartError::UnexpectedState(RestartState::Done).into())
}
};
Expand Down Expand Up @@ -1077,7 +1104,20 @@ pub(crate) fn initialize(
}
};
match progress.state() {
RestartState::Done => Ok((WenRestartProgressInternalState::Done, progress)),
RestartState::Done => {
if let Some(my_snapshot) = progress.my_snapshot.as_ref() {
Ok((
WenRestartProgressInternalState::Done {
slot: my_snapshot.slot,
hash: Hash::from_str(&my_snapshot.bankhash).unwrap(),
shred_version: my_snapshot.shred_version as u16,
},
progress,
))
} else {
Err(WenRestartError::MissingSnapshotInProtobuf.into())
}
}
RestartState::Init => {
let last_voted_fork_slots;
let last_vote_bankhash;
Expand Down Expand Up @@ -1515,7 +1555,6 @@ mod tests {

#[test]
fn test_wen_restart_normal_flow() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let wen_restart_repair_slots = Some(Arc::new(RwLock::new(Vec::new())));
let test_state = wen_restart_test_init(&ledger_path);
Expand Down Expand Up @@ -2219,8 +2258,29 @@ mod tests {
progress,
)
);
let last_vote_slot = test_state.last_voted_fork_slots[0];
let snapshot_slot_hash = Hash::new_unique();
let progress = WenRestartProgress {
state: RestartState::Done.into(),
my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
my_heaviest_fork: Some(HeaviestForkRecord {
slot: last_vote_slot,
bankhash: snapshot_slot_hash.to_string(),
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
my_snapshot: Some(GenerateSnapshotRecord {
slot: last_vote_slot,
bankhash: snapshot_slot_hash.to_string(),
shred_version: SHRED_VERSION as u32,
path: "/path/to/snapshot".to_string(),
}),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
Expand All @@ -2231,7 +2291,14 @@ mod tests {
test_state.blockstore.clone()
)
.unwrap(),
(WenRestartProgressInternalState::Done, progress)
(
WenRestartProgressInternalState::Done {
slot: last_vote_slot,
hash: snapshot_slot_hash,
shred_version: SHRED_VERSION,
},
progress
)
);
}

Expand Down Expand Up @@ -2500,11 +2567,13 @@ mod tests {
shred_version: SHRED_VERSION as u32,
wallclock: 0,
});
let my_bankhash = Hash::new_unique();
let new_shred_version = SHRED_VERSION + 57;
let my_snapshot = Some(GenerateSnapshotRecord {
slot: 1,
bankhash: Hash::new_unique().to_string(),
bankhash: my_bankhash.to_string(),
path: "snapshot_1".to_string(),
shred_version: SHRED_VERSION as u32,
shred_version: new_shred_version as u32,
});
let heaviest_fork_aggregate = Some(HeaviestForkAggregateRecord {
received: HashMap::new(),
Expand Down Expand Up @@ -2637,7 +2706,11 @@ mod tests {
new_root_slot: 1,
my_snapshot: my_snapshot.clone(),
},
WenRestartProgressInternalState::Done,
WenRestartProgressInternalState::Done {
slot: 1,
hash: my_bankhash,
shred_version: new_shred_version,
},
WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
Expand Down Expand Up @@ -2675,7 +2748,11 @@ mod tests {
assert_eq!(
increment_and_write_wen_restart_records(
&wen_restart_proto_path,
WenRestartProgressInternalState::Done,
WenRestartProgressInternalState::Done {
slot: 1,
hash: my_bankhash,
shred_version: new_shred_version,
},
&mut progress
)
.unwrap_err()
Expand Down Expand Up @@ -3242,4 +3319,55 @@ mod tests {
WenRestartError::BlockNotFound(empty_slot),
);
}

#[test]
fn test_return_ok_after_wait_is_done() {
    // Checks how `wait_for_wen_restart` reacts to a progress file that is
    // already in the `Done` state: without a snapshot record it must fail with
    // `MissingSnapshotInProtobuf`, with one it must return `Ok`.
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let test_state = wen_restart_test_init(&ledger_path);
    let proto_path = &test_state.wen_restart_proto_path;

    let voted_slot = test_state.last_voted_fork_slots[0];
    let voted_bankhash = Hash::new_unique();
    let last_vote = VoteTransaction::from(Vote::new(vec![voted_slot], voted_bankhash));
    let config = WenRestartConfig {
        wen_restart_path: proto_path.clone(),
        last_vote,
        blockstore: test_state.blockstore.clone(),
        cluster_info: test_state.cluster_info.clone(),
        bank_forks: test_state.bank_forks.clone(),
        wen_restart_repair_slots: Some(Arc::new(RwLock::new(Vec::new()))),
        wait_for_supermajority_threshold_percent: 80,
        snapshot_config: SnapshotConfig::default(),
        accounts_background_request_sender: AbsRequestSender::default(),
        genesis_config_hash: test_state.genesis_config_hash,
        exit: Arc::new(AtomicBool::new(false)),
    };

    // Case 1: state is Done but no snapshot record was persisted.
    let done_without_snapshot = WenRestartProgress {
        state: RestartState::Done.into(),
        ..Default::default()
    };
    assert!(write_wen_restart_records(proto_path, &done_without_snapshot).is_ok());
    // The config is cloned here because it is reused for the second case below.
    let missing_snapshot_err = wait_for_wen_restart(config.clone())
        .unwrap_err()
        .downcast::<WenRestartError>()
        .unwrap();
    assert_eq!(
        missing_snapshot_err,
        WenRestartError::MissingSnapshotInProtobuf
    );

    // Case 2: state is Done and the snapshot record is present.
    let snapshot_record = GenerateSnapshotRecord {
        slot: 0,
        bankhash: Hash::new_unique().to_string(),
        shred_version: SHRED_VERSION as u32,
        path: "snapshot".to_string(),
    };
    let done_with_snapshot = WenRestartProgress {
        state: RestartState::Done.into(),
        my_snapshot: Some(snapshot_record),
        ..Default::default()
    };
    assert!(write_wen_restart_records(proto_path, &done_with_snapshot).is_ok());
    assert!(wait_for_wen_restart(config).is_ok());
}
}

0 comments on commit fbdf742

Please sign in to comment.