From d5faa6e8aaa87166f11e9ad8bda7339fb2f5bd36 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 8 Aug 2023 19:29:39 -0400 Subject: [PATCH] Local Cluster Duplicate Switch Test (#32614) * Add test for broken behavior in same batch * tests * redo test * Important fixes to not immediately duplicate confirm by adding extra node * Fixup merge * PR comments * Redo stakes * clippy * fixes * Resolve conflicts * add thread logging * Fixup merge * Fixup bugs * Revert "add thread logging" This reverts commit 9dc22401054b8f91f2b2aa3033e482996913febb. * Hide scope * Fixes * Cleanup test_faulty_node * More fixes * Fixes * Error logging * Fix duplicate confirmed * done * PR comments * Revert "Error logging" This reverts commit 18953c36a5e865ecdd38bbf49b8d0502448087d2. * PR comments * nit --- ledger/src/blockstore.rs | 13 + local-cluster/src/cluster.rs | 3 + local-cluster/src/local_cluster.rs | 23 +- local-cluster/tests/common/mod.rs | 99 ++-- local-cluster/tests/local_cluster.rs | 502 +++++++++++++++++- turbine/src/broadcast_stage.rs | 2 +- .../broadcast_duplicates_run.rs | 63 ++- 7 files changed, 641 insertions(+), 64 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 6e0bdf46b0a603..5c794e0e9df173 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3220,6 +3220,10 @@ impl Blockstore { self.dead_slots_cf.delete(slot) } + pub fn remove_slot_duplicate_proof(&self, slot: Slot) -> Result<()> { + self.duplicate_slots_cf.delete(slot) + } + pub fn store_duplicate_if_not_existing( &self, slot: Slot, @@ -3233,6 +3237,15 @@ impl Blockstore { } } + pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> { + let mut iter = self + .db + .iter::(IteratorMode::From(0, IteratorDirection::Forward)) + .unwrap(); + iter.next() + .map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap())) + } + pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec, shred2: Vec) -> Result<()> { let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2); self.duplicate_slots_cf.put(slot, &duplicate_slot_proof) diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index b5bcf658fb38e9..03ec1b7abe13f2 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -2,6 +2,7 @@ use { solana_client::thin_client::ThinClient, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, + solana_ledger::shred::Shred, solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::socket::SocketAddrSpace, std::{path::PathBuf, sync::Arc}, @@ -62,4 +63,6 @@ pub trait Cluster { config: ValidatorConfig, socket_addr_space: SocketAddrSpace, ); + fn set_entry_point(&mut self, entry_point_info: ContactInfo); + fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey); } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index c066a52165da6f..0ff56d44199fa5 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -13,10 +13,10 @@ use { }, solana_gossip::{ cluster_info::Node, - contact_info::{ContactInfo, LegacyContactInfo}, + contact_info::{ContactInfo, LegacyContactInfo, Protocol}, gossip_service::discover_cluster, }, - solana_ledger::create_new_tmp_ledger, + solana_ledger::{create_new_tmp_ledger, shred::Shred}, solana_runtime::{ genesis_utils::{ create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, @@ -57,6 +57,7 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, + net::UdpSocket, path::{Path, PathBuf}, sync::{Arc, RwLock}, }, @@ -852,6 +853,10 @@ impl Cluster for LocalCluster { (node, entry_point_info) } + fn set_entry_point(&mut self, entry_point_info: ContactInfo) { + self.entry_point_info = entry_point_info; + } + fn restart_node( &mut self, pubkey: &Pubkey, @@ -922,6 +927,20 @@ impl Cluster for LocalCluster { fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> { self.validators.get(pubkey).map(|v| &v.info.contact_info) } + + fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) { + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let validator_tvu = self + .get_contact_info(validator_key) + .unwrap() + .tvu(Protocol::UDP) + .unwrap(); + for shred in dup_shreds { + send_socket + .send_to(shred.payload().as_ref(), validator_tvu) + .unwrap(); + } + } } impl Drop for LocalCluster { diff --git a/local-cluster/tests/common/mod.rs b/local-cluster/tests/common/mod.rs index 15bc6fcc50207a..0926156277a3ff 100644 --- a/local-cluster/tests/common/mod.rs +++ b/local-cluster/tests/common/mod.rs @@ -77,6 +77,14 @@ pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { Tower::restore(&file_tower_storage, node_pubkey).ok() } +pub fn remove_tower_if_exists(tower_path: &Path, node_pubkey: &Pubkey) { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + let filename = file_tower_storage.filename(node_pubkey); + if filename.exists() { + fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap(); + } +} + pub fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) { let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap(); @@ -120,17 +128,18 @@ pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_co pub fn wait_for_last_vote_in_tower_to_land_in_ledger( ledger_path: &Path, node_pubkey: &Pubkey, -) -> Slot { - let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap(); - loop { - // We reopen in a loop to make sure we get updates - let blockstore = open_blockstore(ledger_path); - if blockstore.is_full(last_vote) { - break; +) -> Option { + last_vote_in_tower(ledger_path, node_pubkey).map(|(last_vote, _)| { + loop { + // We reopen in a loop to make sure we get updates + let blockstore = open_blockstore(ledger_path); + if blockstore.is_full(last_vote) { + break; + } + sleep(Duration::from_millis(100)); } - sleep(Duration::from_millis(100)); - } - last_vote + last_vote + }) } pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) { @@ -390,40 +399,66 @@ pub fn run_cluster_partition( on_partition_resolved(&mut cluster, &mut context); } +pub struct ValidatorTestConfig { + pub validator_keypair: Arc, + pub validator_config: ValidatorConfig, + pub in_genesis: bool, +} + pub fn test_faulty_node( faulty_node_type: BroadcastStageType, node_stakes: Vec, + validator_test_configs: Option>, + custom_leader_schedule: Option, ) -> (LocalCluster, Vec>) { - solana_logger::setup_with_default("solana_local_cluster=info"); let num_nodes = node_stakes.len(); - let mut validator_keys = Vec::with_capacity(num_nodes); - validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); + let validator_keys = validator_test_configs + .as_ref() + .map(|configs| { + configs + .iter() + .map(|config| (config.validator_keypair.clone(), config.in_genesis)) + .collect() + }) + .unwrap_or_else(|| { + let mut validator_keys = Vec::with_capacity(num_nodes); + validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); + validator_keys + }); + assert_eq!(node_stakes.len(), num_nodes); assert_eq!(validator_keys.len(), num_nodes); - // Use a fixed leader schedule so that only the faulty node gets leader slots. - let validator_to_slots = vec![( - validator_keys[0].0.as_ref().pubkey(), - solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize, - )]; - let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter()); - let fixed_leader_schedule = Some(FixedSchedule { - leader_schedule: Arc::new(leader_schedule), + let fixed_leader_schedule = custom_leader_schedule.unwrap_or_else(|| { + // Use a fixed leader schedule so that only the faulty node gets leader slots. + let validator_to_slots = vec![( + validator_keys[0].0.as_ref().pubkey(), + solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize, + )]; + let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter()); + FixedSchedule { + leader_schedule: Arc::new(leader_schedule), + } }); - let error_validator_config = ValidatorConfig { - broadcast_stage_type: faulty_node_type, - fixed_leader_schedule: fixed_leader_schedule.clone(), - ..ValidatorConfig::default_for_test() - }; - let mut validator_configs = Vec::with_capacity(num_nodes); + let mut validator_configs = validator_test_configs + .map(|configs| { + configs + .into_iter() + .map(|config| config.validator_config) + .collect() + }) + .unwrap_or_else(|| { + let mut configs = Vec::with_capacity(num_nodes); + configs.resize_with(num_nodes, ValidatorConfig::default_for_test); + configs + }); // First validator is the bootstrap leader with the malicious broadcast logic. - validator_configs.push(error_validator_config); - validator_configs.resize_with(num_nodes, || ValidatorConfig { - fixed_leader_schedule: fixed_leader_schedule.clone(), - ..ValidatorConfig::default_for_test() - }); + validator_configs[0].broadcast_stage_type = faulty_node_type; + for config in &mut validator_configs { + config.fixed_leader_schedule = Some(fixed_leader_schedule.clone()); + } let mut cluster_config = ClusterConfig { cluster_lamports: 10_000, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 2dbf6dbc398334..506c75f541a7e0 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -22,8 +22,10 @@ use { ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::{entries_to_test_shreds, Blockstore}, + blockstore_meta::DuplicateSlotProof, blockstore_processor::ProcessOptions, leader_schedule::FixedSchedule, + shred::Shred, use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup, }, solana_local_cluster::{ @@ -68,7 +70,8 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_turbine::broadcast_stage::{ - broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType, + broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition}, + BroadcastStageType, }, solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, std::{ @@ -1503,12 +1506,15 @@ fn test_snapshots_restart_validity() { #[allow(unused_attributes)] #[ignore] fn test_fail_entry_verification_leader() { + solana_logger::setup_with_default(RUST_LOG_FILTER); let leader_stake = (DUPLICATE_THRESHOLD * 100.0) as u64 + 1; let validator_stake1 = (100 - leader_stake) / 2; let validator_stake2 = 100 - leader_stake - validator_stake1; let (cluster, _) = test_faulty_node( BroadcastStageType::FailEntryVerification, vec![leader_stake, validator_stake1, validator_stake2], + None, + None, ); cluster.check_for_new_roots( 16, @@ -1522,8 +1528,14 @@ fn test_fail_entry_verification_leader() { #[ignore] #[allow(unused_attributes)] fn test_fake_shreds_broadcast_leader() { + solana_logger::setup_with_default(RUST_LOG_FILTER); let node_stakes = vec![300, 100]; - let (cluster, _) = test_faulty_node(BroadcastStageType::BroadcastFakeShreds, node_stakes); + let (cluster, _) = test_faulty_node( + BroadcastStageType::BroadcastFakeShreds, + node_stakes, + None, + None, + ); cluster.check_for_new_roots( 16, "test_fake_shreds_broadcast_leader", @@ -3255,7 +3267,8 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b { // Find latest vote in B, and wait for it to reach blockstore let b_last_vote = - wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey); + wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey) + .unwrap(); // Now we copy these blocks to A let b_blockstore = open_blockstore(&val_b_ledger_path); @@ -3465,11 +3478,13 @@ fn test_fork_choice_refresh_old_votes() { let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( &lighter_fork_ledger_path, &context.lighter_fork_validator_key, - ); + ) + .unwrap(); let heaviest_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( &heaviest_ledger_path, &context.heaviest_validator_key, - ); + ) + .unwrap(); // Open ledgers let smallest_blockstore = open_blockstore(&smallest_ledger_path); @@ -3758,9 +3773,12 @@ fn test_duplicate_shreds_broadcast_leader() { // 1) Set up the cluster let (mut cluster, validator_keys) = test_faulty_node( BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig { - stake_partition: partition_node_stake, + partition: ClusterPartition::Stake(partition_node_stake), + duplicate_slot_sender: None, }), node_stakes, + None, + None, ); // This is why it's important our node was last in `node_stakes` @@ -4422,7 +4440,8 @@ fn test_slot_hash_expiry() { let mut last_vote_on_a; // Keep A running for a while longer so the majority fork has some decent size loop { - last_vote_on_a = wait_for_last_vote_in_tower_to_land_in_ledger(&a_ledger_path, &a_pubkey); + last_vote_on_a = + wait_for_last_vote_in_tower_to_land_in_ledger(&a_ledger_path, &a_pubkey).unwrap(); if last_vote_on_a >= common_ancestor_slot + 2 * (solana_sdk::slot_hashes::get_entries() as u64) { @@ -4448,7 +4467,8 @@ fn test_slot_hash_expiry() { info!("Allowing B to fork"); loop { let blockstore = open_blockstore(&b_ledger_path); - let last_vote = wait_for_last_vote_in_tower_to_land_in_ledger(&b_ledger_path, &b_pubkey); + let last_vote = + wait_for_last_vote_in_tower_to_land_in_ledger(&b_ledger_path, &b_pubkey).unwrap(); let mut ancestors = AncestorIterator::new(last_vote, &blockstore); if let Some(index) = ancestors.position(|x| x == common_ancestor_slot) { if index > 7 { @@ -4662,7 +4682,8 @@ fn test_duplicate_with_pruned_ancestor() { last_minority_vote ); let last_minority_vote = - wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey); + wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey) + .unwrap(); let minority_validator_info = cluster.exit_node(&minority_pubkey); info!("Truncating majority validator ledger to {fork_slot}"); @@ -4708,7 +4729,8 @@ fn test_duplicate_with_pruned_ancestor() { } let last_majority_vote = - wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey); + wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey) + .unwrap(); info!( "Creating duplicate block built off of pruned branch for our node. Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}" @@ -5015,3 +5037,463 @@ fn test_boot_from_local_state() { info!("Checking if validator{i} has the same snapshots as validator3... DONE"); } } + +// We want to simulate the following: +// /--- 1 --- 3 (duplicate block) +// 0 +// \--- 2 +// +// 1. > DUPLICATE_THRESHOLD of the nodes vote on some version of the the duplicate block 3, +// but don't immediately duplicate confirm so they remove 3 from fork choice and reset PoH back to 1. +// 2. All the votes on 3 don't land because there are no further blocks building off 3. +// 3. Some < SWITCHING_THRESHOLD of nodes vote on 2, making it the heaviest fork because no votes on 3 landed +// 4. Nodes then see duplicate confirmation on 3. +// 5. Unless somebody builds off of 3 to include the duplicate confirmed votes, 2 will still be the heaviest. +// However, because 2 has < SWITCHING_THRESHOLD of the votes, people who voted on 3 can't switch, leading to a +// stall +#[test] +#[serial] +#[allow(unused_attributes)] +fn test_duplicate_shreds_switch_failure() { + fn wait_for_duplicate_fork_frozen(ledger_path: &Path, dup_slot: Slot) -> Hash { + // Ensure all the slots <= dup_slot are also full so we know we can replay up to dup_slot + // on restart + info!( + "Waiting to receive and replay entire duplicate fork with tip {}", + dup_slot + ); + loop { + let duplicate_fork_validator_blockstore = open_blockstore(ledger_path); + if let Some(frozen_hash) = duplicate_fork_validator_blockstore.get_bank_hash(dup_slot) { + return frozen_hash; + } + sleep(Duration::from_millis(1000)); + } + } + + fn clear_ledger_and_tower(ledger_path: &Path, pubkey: &Pubkey, start_slot: Slot) { + remove_tower_if_exists(ledger_path, pubkey); + let blockstore = open_blockstore(ledger_path); + purge_slots_with_count(&blockstore, start_slot, 1000); + { + // Remove all duplicate proofs so that this dup_slot will vote on the `dup_slot`. + while let Some((proof_slot, _)) = blockstore.get_first_duplicate_proof() { + blockstore.remove_slot_duplicate_proof(proof_slot).unwrap(); + } + } + } + + fn restart_dup_validator( + cluster: &mut LocalCluster, + mut duplicate_fork_validator_info: ClusterValidatorInfo, + pubkey: &Pubkey, + dup_slot: Slot, + dup_shred1: &Shred, + dup_shred2: &Shred, + ) { + let disable_turbine = Arc::new(AtomicBool::new(true)); + duplicate_fork_validator_info.config.voting_disabled = false; + duplicate_fork_validator_info.config.turbine_disabled = disable_turbine.clone(); + info!("Restarting node: {}", pubkey); + cluster.restart_node( + pubkey, + duplicate_fork_validator_info, + SocketAddrSpace::Unspecified, + ); + let ledger_path = cluster.ledger_path(pubkey); + + // Lift the partition after `pubkey` votes on the `dup_slot` + info!( + "Waiting on duplicate fork to vote on duplicate slot: {}", + dup_slot + ); + loop { + let last_vote = last_vote_in_tower(&ledger_path, pubkey); + if let Some((latest_vote_slot, _hash)) = last_vote { + info!("latest vote: {}", latest_vote_slot); + if latest_vote_slot == dup_slot { + break; + } + } + sleep(Duration::from_millis(1000)); + } + disable_turbine.store(false, Ordering::Relaxed); + + // Send the validator the other version of the shred so they realize it's duplicate + info!("Resending duplicate shreds to duplicate fork validator"); + cluster.send_shreds_to_validator(vec![dup_shred1, dup_shred2], pubkey); + + // Check the validator detected a duplicate proof + info!("Waiting on duplicate fork validator to see duplicate shreds and make a proof",); + loop { + let duplicate_fork_validator_blockstore = open_blockstore(&ledger_path); + if let Some(dup_proof) = duplicate_fork_validator_blockstore.get_first_duplicate_proof() + { + assert_eq!(dup_proof.0, dup_slot); + break; + } + sleep(Duration::from_millis(1000)); + } + } + + fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option { + for _ in 0..10 { + let duplicate_fork_validator_blockstore = open_blockstore(ledger_path); + if let Some((found_dup_slot, found_duplicate_proof)) = + duplicate_fork_validator_blockstore.get_first_duplicate_proof() + { + if found_dup_slot == dup_slot { + return Some(found_duplicate_proof); + }; + } + + sleep(Duration::from_millis(1000)); + } + None + } + + solana_logger::setup_with_default(RUST_LOG_FILTER); + let validator_keypairs = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye", + "2XFPyuzPuXMsPnkH98UNcQpfA7M4b2TUhRxcWEoWjy4M6ojQ7HGJSvotktEVbaq49Qxt16wUjdqvSJc6ecbFfZwj", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .collect::>(); + + let validators = validator_keypairs + .iter() + .map(|(kp, _)| kp.pubkey()) + .collect::>(); + + // Create 4 nodes: + // 1) Two nodes that sum to > DUPLICATE_THRESHOLD but < 2/3+ supermajority. It's important both + // of them individually have <= DUPLICATE_THRESHOLD to avoid duplicate confirming their own blocks + // immediately upon voting + // 2) One with <= SWITCHING_THRESHOLD so that validator from 1) can't switch to it + // 3) One bad leader to make duplicate slots + let total_stake = 100 * DEFAULT_NODE_STAKE; + let target_switch_fork_stake = (total_stake as f64 * SWITCH_FORK_THRESHOLD) as u64; + // duplicate_fork_node1_stake + duplicate_fork_node2_stake > DUPLICATE_THRESHOLD. Don't want + // one node with > DUPLICATE_THRESHOLD, otherwise they will automatically duplicate confirm a + // slot when they vote, which will prevent them from resetting to an earlier ancestor when they + // later discover that slot as duplicate. + let duplicate_fork_node1_stake = (total_stake as f64 * DUPLICATE_THRESHOLD) as u64; + let duplicate_fork_node2_stake = 1; + let duplicate_leader_stake = total_stake + - target_switch_fork_stake + - duplicate_fork_node1_stake + - duplicate_fork_node2_stake; + assert!( + duplicate_fork_node1_stake + duplicate_fork_node2_stake + > (total_stake as f64 * DUPLICATE_THRESHOLD) as u64 + ); + assert!(duplicate_fork_node1_stake <= (total_stake as f64 * DUPLICATE_THRESHOLD) as u64); + assert!(duplicate_fork_node2_stake <= (total_stake as f64 * DUPLICATE_THRESHOLD) as u64); + + let node_stakes = vec![ + duplicate_leader_stake, + target_switch_fork_stake, + duplicate_fork_node1_stake, + duplicate_fork_node2_stake, + ]; + + let ( + // Has to be first in order to be picked as the duplicate leader + duplicate_leader_validator_pubkey, + target_switch_fork_validator_pubkey, + duplicate_fork_validator1_pubkey, + duplicate_fork_validator2_pubkey, + ) = (validators[0], validators[1], validators[2], validators[3]); + + info!( + "duplicate_fork_validator1_pubkey: {}, + duplicate_fork_validator2_pubkey: {}, + target_switch_fork_validator_pubkey: {}, + duplicate_leader_validator_pubkey: {}", + duplicate_fork_validator1_pubkey, + duplicate_fork_validator2_pubkey, + target_switch_fork_validator_pubkey, + duplicate_leader_validator_pubkey + ); + + let validator_to_slots = vec![ + (duplicate_leader_validator_pubkey, 50), + (target_switch_fork_validator_pubkey, 5), + // The ideal sequence of events for the `duplicate_fork_validator1_pubkey` validator would go: + // 1. Vote for duplicate block `D` + // 2. See `D` is duplicate, remove from fork choice and reset to ancestor `A`, potentially generating a fork off that ancestor + // 3. See `D` is duplicate confirmed, but because of the bug fixed by https://github.com/solana-labs/solana/pull/28172 + // where we disallow resetting to a slot which matches the last vote slot, we still don't build off `D`, + // and continue building on `A`. + // + // The `target_switch_fork_validator_pubkey` fork is necessary in 2. to force the validator stall trying to switch + // vote on that other fork and prevent the validator from making a freebie vote from `A` and allowing consensus to continue. + + // It's important we don't give the `duplicate_fork_validator1_pubkey` leader slots until a certain number + // of slots have elapsed to ensure: + // 1. We have ample time to ensure he doesn't have a chance to make a block until after 2 when they see the block is duplicate. + // Otherwise, they'll build the block on top of the duplicate block, which will possibly include a vote for the duplicate block. + // We want to avoid this because this will make fork choice pick the duplicate block. + // 2. Ensure the `duplicate_fork_validator1_pubkey` sees the target switch fork before it can make another vote + // on any forks he himself generates from A. Otherwise, he will make a freebie vote on his own fork from `A` and + // consensus will continue on that fork. + + // Give the duplicate fork validator plenty of leader slots after the initial delay to prevent + // 1. Switch fork from getting locked out for too long + // 2. A lot of consecutive slots in which to build up lockout in tower and make new roots + // to resolve the partition + (duplicate_fork_validator1_pubkey, 500), + ]; + + let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter()); + + // 1) Set up the cluster + let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded(); + let validator_configs = validator_keypairs + .into_iter() + .map(|(validator_keypair, in_genesis)| { + let pubkey = validator_keypair.pubkey(); + // Only allow the leader to vote so that no version gets duplicate confirmed. + // This is to avoid the leader dumping his own block. + let voting_disabled = { pubkey != duplicate_leader_validator_pubkey }; + ValidatorTestConfig { + validator_keypair, + validator_config: ValidatorConfig { + voting_disabled, + ..ValidatorConfig::default() + }, + in_genesis, + } + }) + .collect(); + let (mut cluster, _validator_keypairs) = test_faulty_node( + BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig { + partition: ClusterPartition::Pubkey(vec![ + // Don't include the other dup validator here, otherwise + // this dup version will have enough to be duplicate confirmed and + // will cause the dup leader to try and dump its own slot, + // crashing before it can signal the duplicate slot via the + // `duplicate_slot_receiver` below + duplicate_fork_validator1_pubkey, + ]), + duplicate_slot_sender: Some(duplicate_slot_sender), + }), + node_stakes, + Some(validator_configs), + Some(FixedSchedule { + leader_schedule: Arc::new(leader_schedule), + }), + ); + + // Kill two validators that might duplicate confirm the duplicate block + info!("Killing unnecessary validators"); + let duplicate_fork_validator2_ledger_path = + cluster.ledger_path(&duplicate_fork_validator2_pubkey); + let duplicate_fork_validator2_info = cluster.exit_node(&duplicate_fork_validator2_pubkey); + let target_switch_fork_validator_ledger_path = + cluster.ledger_path(&target_switch_fork_validator_pubkey); + let mut target_switch_fork_validator_info = + cluster.exit_node(&target_switch_fork_validator_pubkey); + + // 2) Wait for a duplicate slot to land on both validators and for the target switch + // fork validator to get another version of the slot. Also ensure all versions of + // the block are playable + let dup_slot; + loop { + dup_slot = duplicate_slot_receiver + .recv_timeout(Duration::from_millis(30_000)) + .expect("Duplicate leader failed to make a duplicate slot in allotted time"); + + // Make sure both validators received and replay the complete blocks + let dup_frozen_hash = wait_for_duplicate_fork_frozen( + &cluster.ledger_path(&duplicate_fork_validator1_pubkey), + dup_slot, + ); + let original_frozen_hash = wait_for_duplicate_fork_frozen( + &cluster.ledger_path(&duplicate_leader_validator_pubkey), + dup_slot, + ); + if original_frozen_hash != dup_frozen_hash { + break; + } else { + panic!( + "Duplicate leader and partition target got same hash: {}", + original_frozen_hash + ); + } + } + + // 3) Force `duplicate_fork_validator1_pubkey` to see a duplicate proof + info!("Waiting for duplicate proof for slot: {}", dup_slot); + let duplicate_proof = { + // Grab the other version of the slot from the `duplicate_leader_validator_pubkey` + // which we confirmed to have a different version of the frozen hash in the loop + // above + let ledger_path = cluster.ledger_path(&duplicate_leader_validator_pubkey); + let blockstore = open_blockstore(&ledger_path); + let dup_shred = blockstore + .get_data_shreds_for_slot(dup_slot, 0) + .unwrap() + .pop() + .unwrap(); + info!( + "Sending duplicate shred: {:?} to {:?}", + dup_shred.signature(), + duplicate_fork_validator1_pubkey + ); + cluster.send_shreds_to_validator(vec![&dup_shred], &duplicate_fork_validator1_pubkey); + wait_for_duplicate_proof( + &cluster.ledger_path(&duplicate_fork_validator1_pubkey), + dup_slot, + ) + .unwrap_or_else(|| panic!("Duplicate proof for slot {} not found", dup_slot)) + }; + + // 3) Kill all the validators + info!("Killing remaining validators"); + let duplicate_fork_validator1_ledger_path = + cluster.ledger_path(&duplicate_fork_validator1_pubkey); + let duplicate_fork_validator1_info = cluster.exit_node(&duplicate_fork_validator1_pubkey); + let duplicate_leader_ledger_path = cluster.ledger_path(&duplicate_leader_validator_pubkey); + cluster.exit_node(&duplicate_leader_validator_pubkey); + + let dup_shred1 = Shred::new_from_serialized_shred(duplicate_proof.shred1.clone()).unwrap(); + let dup_shred2 = Shred::new_from_serialized_shred(duplicate_proof.shred2).unwrap(); + assert_eq!(dup_shred1.slot(), dup_shred2.slot()); + assert_eq!(dup_shred1.slot(), dup_slot); + + // Purge everything including the `dup_slot` from the `target_switch_fork_validator_pubkey` + info!( + "Purging towers and ledgers for: {:?}", + duplicate_leader_validator_pubkey + ); + Blockstore::destroy(&target_switch_fork_validator_ledger_path).unwrap(); + { + let blockstore1 = open_blockstore(&duplicate_leader_ledger_path); + let blockstore2 = open_blockstore(&target_switch_fork_validator_ledger_path); + copy_blocks(dup_slot, &blockstore1, &blockstore2); + } + clear_ledger_and_tower( + &target_switch_fork_validator_ledger_path, + &target_switch_fork_validator_pubkey, + dup_slot, + ); + + info!( + "Purging towers and ledgers for: {:?}", + duplicate_fork_validator1_pubkey + ); + clear_ledger_and_tower( + &duplicate_fork_validator1_ledger_path, + &duplicate_fork_validator1_pubkey, + dup_slot + 1, + ); + + info!( + "Purging towers and ledgers for: {:?}", + duplicate_fork_validator2_pubkey + ); + // Copy validator 1's ledger to validator 2 so that they have the same version + // of the duplicate slot + clear_ledger_and_tower( + &duplicate_fork_validator2_ledger_path, + &duplicate_fork_validator2_pubkey, + dup_slot, + ); + Blockstore::destroy(&duplicate_fork_validator2_ledger_path).unwrap(); + { + let blockstore1 = open_blockstore(&duplicate_fork_validator1_ledger_path); + let blockstore2 = open_blockstore(&duplicate_fork_validator2_ledger_path); + copy_blocks(dup_slot, &blockstore1, &blockstore2); + } + + // Set entrypoint to `target_switch_fork_validator_pubkey` so we can run discovery in gossip even without the + // bad leader + cluster.set_entry_point(target_switch_fork_validator_info.info.contact_info.clone()); + + // 4) Restart `target_switch_fork_validator_pubkey`, and ensure they vote on their own leader slot + // that's not descended from the duplicate slot + info!("Restarting switch fork node"); + target_switch_fork_validator_info.config.voting_disabled = false; + cluster.restart_node( + &target_switch_fork_validator_pubkey, + target_switch_fork_validator_info, + SocketAddrSpace::Unspecified, + ); + let target_switch_fork_validator_ledger_path = + cluster.ledger_path(&target_switch_fork_validator_pubkey); + + info!("Waiting for switch fork to make block past duplicate fork"); + loop { + let last_vote = wait_for_last_vote_in_tower_to_land_in_ledger( + &target_switch_fork_validator_ledger_path, + &target_switch_fork_validator_pubkey, + ); + if let Some(latest_vote_slot) = last_vote { + if latest_vote_slot > dup_slot { + let blockstore = open_blockstore(&target_switch_fork_validator_ledger_path); + let ancestor_slots: HashSet = + AncestorIterator::new_inclusive(latest_vote_slot, &blockstore).collect(); + assert!(ancestor_slots.contains(&latest_vote_slot)); + assert!(ancestor_slots.contains(&0)); + assert!(!ancestor_slots.contains(&dup_slot)); + break; + } + } + sleep(Duration::from_millis(1000)); + } + + // Now restart the duplicate validators + // Start the node with partition enabled so they don't see the `target_switch_fork_validator_pubkey` + // before voting on the duplicate block + info!("Restarting duplicate fork node"); + // Ensure `duplicate_fork_validator1_pubkey` votes before starting up `duplicate_fork_validator2_pubkey` + // to prevent them seeing `dup_slot` as duplicate confirmed before voting. + restart_dup_validator( + &mut cluster, + duplicate_fork_validator1_info, + &duplicate_fork_validator1_pubkey, + dup_slot, + &dup_shred1, + &dup_shred2, + ); + restart_dup_validator( + &mut cluster, + duplicate_fork_validator2_info, + &duplicate_fork_validator2_pubkey, + dup_slot, + &dup_shred1, + &dup_shred2, + ); + + // Wait for the `duplicate_fork_validator1_pubkey` to make another leader block on top + // of the duplicate fork which includes their own vote for `dup_block`. This + // should make the duplicate fork the heaviest + info!("Waiting on duplicate fork validator to generate block on top of duplicate fork",); + loop { + let duplicate_fork_validator_blockstore = + open_blockstore(&cluster.ledger_path(&duplicate_fork_validator1_pubkey)); + let meta = duplicate_fork_validator_blockstore + .meta(dup_slot) + .unwrap() + .unwrap(); + if !meta.next_slots.is_empty() { + info!( + "duplicate fork validator saw new slots: {:?} on top of duplicate slot", + meta.next_slots + ); + break; + } + sleep(Duration::from_millis(1000)); + } + + // Check that the cluster is making progress + cluster.check_for_new_roots( + 16, + "test_duplicate_shreds_switch_failure", + SocketAddrSpace::Unspecified, + ); +} diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index 90a112c24ea011..07be0d0bfd6daa 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -87,7 +87,7 @@ pub enum BroadcastStageReturnType { ChannelDisconnected, } -#[derive(PartialEq, Eq, Clone, Debug)] +#[derive(Clone, Debug)] pub enum BroadcastStageType { Standard, FailEntryVerification, diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 4ced9739c83f86..0db4003a079ce8 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -1,6 +1,7 @@ use { super::*, crate::cluster_nodes::ClusterNodesCache, + crossbeam_channel::Sender, itertools::Itertools, solana_entry::entry::Entry, solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, @@ -16,10 +17,20 @@ pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; pub const DUPLICATE_RATE: usize = 10; #[derive(PartialEq, Eq, Clone, Debug)] +pub enum ClusterPartition { + Stake(u64), + Pubkey(Vec), +} + +#[derive(Clone, Debug)] pub struct BroadcastDuplicatesConfig { - /// Amount of stake (excluding the leader) to send different version of slots to. + /// Amount of stake (excluding the leader) or a set of validator pubkeys + /// to send a duplicate version of some slots to. /// Note this is sampled from a list of stakes sorted least to greatest. - pub stake_partition: u64, + pub partition: ClusterPartition, + /// If passed `Some(receiver)`, will signal all the duplicate slots via the given + /// `receiver` + pub duplicate_slot_sender: Option>, } #[derive(Clone)] @@ -253,6 +264,9 @@ impl BroadcastRun for BroadcastDuplicatesRun { .iter() .all(|shred| shred.slot() == bank.slot())); + if let Some(duplicate_slot_sender) = &self.config.duplicate_slot_sender { + let _ = duplicate_slot_sender.send(bank.slot()); + } socket_sender.send((original_last_data_shred, None))?; socket_sender.send((partition_last_data_shred, None))?; } @@ -280,20 +294,25 @@ impl BroadcastRun for BroadcastDuplicatesRun { let self_pubkey = cluster_info.id(); // Create cluster partition. let cluster_partition: HashSet = { - let mut cumilative_stake = 0; - let epoch = root_bank.get_leader_schedule_epoch(slot); - root_bank - .epoch_staked_nodes(epoch) - .unwrap() - .iter() - .filter(|(pubkey, _)| **pubkey != self_pubkey) - .sorted_by_key(|(pubkey, stake)| (**stake, **pubkey)) - .take_while(|(_, stake)| { - cumilative_stake += *stake; - cumilative_stake <= self.config.stake_partition - }) - .map(|(pubkey, _)| *pubkey) - .collect() + match &self.config.partition { + ClusterPartition::Stake(partition_total_stake) => { + let mut cumulative_stake = 0; + let epoch = root_bank.get_leader_schedule_epoch(slot); + root_bank + .epoch_staked_nodes(epoch) + .unwrap() + .iter() + .filter(|(pubkey, _)| **pubkey != self_pubkey) + .sorted_by_key(|(pubkey, stake)| (**stake, **pubkey)) + .take_while(|(_, stake)| { + cumulative_stake += *stake; + cumulative_stake <= *partition_total_stake + }) + .map(|(pubkey, _)| *pubkey) + .collect() + } + ClusterPartition::Pubkey(pubkeys) => pubkeys.iter().cloned().collect(), + } }; // Broadcast data @@ -316,10 +335,10 @@ impl BroadcastRun for BroadcastDuplicatesRun { { if cluster_partition.contains(node.pubkey()) { info!( - "skipping node {} for original shred index {}, slot {}", - node.pubkey(), + "Not broadcasting original shred index {}, slot {} to partition node {}", shred.index(), - shred.slot() + shred.slot(), + node.pubkey(), ); return None; } @@ -337,6 +356,12 @@ impl BroadcastRun for BroadcastDuplicatesRun { cluster_partition .iter() .filter_map(|pubkey| { + info!( + "Broadcasting partition shred index {}, slot {} to partition node {}", + shred.index(), + shred.slot(), + pubkey, + ); let tvu = cluster_info .lookup_contact_info(pubkey, |node| node.tvu(Protocol::UDP))? .ok()?;