Local Cluster Duplicate Switch Test (solana-labs#32614)
* Add test for broken behavior in same batch

* tests

* redo test

* Important fixes: add an extra node so the test does not immediately duplicate-confirm

* Fixup merge

* PR comments

* Redo stakes

* clippy

* fixes

* Resolve conflicts

* add thread logging

* Fixup merge

* Fixup bugs

* Revert "add thread logging"

This reverts commit 9dc2240.

* Hide scope

* Fixes

* Cleanup test_faulty_node

* More fixes

* Fixes

* Error logging

* Fix duplicate confirmed

* done

* PR comments

* Revert "Error logging"

This reverts commit 18953c3.

* PR comments

* nit
carllin authored Aug 8, 2023
1 parent 8e4a9a9 commit d5faa6e
Showing 7 changed files with 641 additions and 64 deletions.
13 changes: 13 additions & 0 deletions ledger/src/blockstore.rs
@@ -3220,6 +3220,10 @@ impl Blockstore {
        self.dead_slots_cf.delete(slot)
    }

    pub fn remove_slot_duplicate_proof(&self, slot: Slot) -> Result<()> {
        self.duplicate_slots_cf.delete(slot)
    }

    pub fn store_duplicate_if_not_existing(
        &self,
        slot: Slot,
@@ -3233,6 +3237,15 @@ impl Blockstore {
        }
    }

    pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> {
        let mut iter = self
            .db
            .iter::<cf::DuplicateSlots>(IteratorMode::From(0, IteratorDirection::Forward))
            .unwrap();
        iter.next()
            .map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap()))
    }

    pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
        let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
        self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
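
For context on how these new helpers are meant to be used: a local-cluster test can poll a node's blockstore until a duplicate-slot proof shows up, then clear the proof before restarting the node. The sketch below is illustrative rather than code from this commit; the helper names wait_for_duplicate_proof / clear_duplicate_proof and the 100 ms polling interval are assumptions.

use {
    solana_ledger::blockstore::Blockstore,
    solana_sdk::clock::Slot,
    std::{thread::sleep, time::Duration},
};

// Hypothetical test helper: block until the blockstore records any duplicate-slot proof.
fn wait_for_duplicate_proof(blockstore: &Blockstore) -> Slot {
    loop {
        if let Some((slot, _proof)) = blockstore.get_first_duplicate_proof() {
            return slot;
        }
        sleep(Duration::from_millis(100));
    }
}

// Hypothetical test helper: drop the proof so a restarted node no longer sees the slot as duplicate.
fn clear_duplicate_proof(blockstore: &Blockstore, slot: Slot) {
    blockstore.remove_slot_duplicate_proof(slot).unwrap();
}
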
3 changes: 3 additions & 0 deletions local-cluster/src/cluster.rs
@@ -2,6 +2,7 @@ use {
    solana_client::thin_client::ThinClient,
    solana_core::validator::{Validator, ValidatorConfig},
    solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
    solana_ledger::shred::Shred,
    solana_sdk::{pubkey::Pubkey, signature::Keypair},
    solana_streamer::socket::SocketAddrSpace,
    std::{path::PathBuf, sync::Arc},
@@ -62,4 +63,6 @@ pub trait Cluster {
        config: ValidatorConfig,
        socket_addr_space: SocketAddrSpace,
    );
    fn set_entry_point(&mut self, entry_point_info: ContactInfo);
    fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey);
}
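
To sketch how a test might drive the two new trait methods (this is not code from the commit; the helper name redirect_and_inject and its shape are assumptions, and it presumes the duplicate shreds were obtained elsewhere):

use {
    solana_gossip::contact_info::ContactInfo,
    solana_ledger::shred::Shred,
    solana_local_cluster::cluster::Cluster,
    solana_sdk::pubkey::Pubkey,
};

// Hypothetical helper, generic over any Cluster implementation.
fn redirect_and_inject<C: Cluster>(
    cluster: &mut C,
    new_entry_point: ContactInfo,
    dup_shreds: Vec<&Shred>,
    target: &Pubkey,
) {
    // Future restarts gossip with the chosen entrypoint instead of the original bootstrap node.
    cluster.set_entry_point(new_entry_point);
    // Deliver both halves of a duplicate proof straight to the target validator's TVU port.
    cluster.send_shreds_to_validator(dup_shreds, target);
}
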
23 changes: 21 additions & 2 deletions local-cluster/src/local_cluster.rs
@@ -13,10 +13,10 @@ use {
    },
    solana_gossip::{
        cluster_info::Node,
        contact_info::{ContactInfo, LegacyContactInfo},
        contact_info::{ContactInfo, LegacyContactInfo, Protocol},
        gossip_service::discover_cluster,
    },
    solana_ledger::create_new_tmp_ledger,
    solana_ledger::{create_new_tmp_ledger, shred::Shred},
    solana_runtime::{
        genesis_utils::{
            create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
@@ -57,6 +57,7 @@ use {
        collections::HashMap,
        io::{Error, ErrorKind, Result},
        iter,
        net::UdpSocket,
        path::{Path, PathBuf},
        sync::{Arc, RwLock},
    },
@@ -852,6 +853,10 @@ impl Cluster for LocalCluster {
        (node, entry_point_info)
    }

    fn set_entry_point(&mut self, entry_point_info: ContactInfo) {
        self.entry_point_info = entry_point_info;
    }

    fn restart_node(
        &mut self,
        pubkey: &Pubkey,
@@ -922,6 +927,20 @@ impl Cluster for LocalCluster {
    fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
        self.validators.get(pubkey).map(|v| &v.info.contact_info)
    }

    fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) {
        let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
        let validator_tvu = self
            .get_contact_info(validator_key)
            .unwrap()
            .tvu(Protocol::UDP)
            .unwrap();
        for shred in dup_shreds {
            send_socket
                .send_to(shred.payload().as_ref(), validator_tvu)
                .unwrap();
        }
    }
}

impl Drop for LocalCluster {
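
A follow-on sketch of how the LocalCluster implementation above might be exercised: read a duplicate proof out of a faulty node's blockstore and replay both shreds at another validator. Assumptions here: DuplicateSlotProof lives in solana_ledger::blockstore_meta and exposes public shred1/shred2 payloads, and the replay_duplicate_proof name is invented.

use {
    solana_ledger::{blockstore_meta::DuplicateSlotProof, shred::Shred},
    solana_local_cluster::{cluster::Cluster, local_cluster::LocalCluster},
    solana_sdk::pubkey::Pubkey,
};

// Hypothetical: forward the two conflicting shreds of a recorded proof to `target` over UDP.
fn replay_duplicate_proof(cluster: &LocalCluster, proof: DuplicateSlotProof, target: &Pubkey) {
    let shred1 = Shred::new_from_serialized_shred(proof.shred1).unwrap();
    let shred2 = Shred::new_from_serialized_shred(proof.shred2).unwrap();
    cluster.send_shreds_to_validator(vec![&shred1, &shred2], target);
}

Sending over a plain UdpSocket keeps the injection path close to what the node would see from a real peer's retransmit traffic, which is presumably why the implementation avoids any in-process shortcut.
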
99 changes: 67 additions & 32 deletions local-cluster/tests/common/mod.rs
@@ -77,6 +77,14 @@ pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
    Tower::restore(&file_tower_storage, node_pubkey).ok()
}

pub fn remove_tower_if_exists(tower_path: &Path, node_pubkey: &Pubkey) {
    let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
    let filename = file_tower_storage.filename(node_pubkey);
    if filename.exists() {
        fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
    }
}

pub fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) {
    let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
    fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
@@ -120,17 +128,18 @@ pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_co
pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
    ledger_path: &Path,
    node_pubkey: &Pubkey,
) -> Slot {
    let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
    loop {
        // We reopen in a loop to make sure we get updates
        let blockstore = open_blockstore(ledger_path);
        if blockstore.is_full(last_vote) {
            break;
) -> Option<Slot> {
    last_vote_in_tower(ledger_path, node_pubkey).map(|(last_vote, _)| {
        loop {
            // We reopen in a loop to make sure we get updates
            let blockstore = open_blockstore(ledger_path);
            if blockstore.is_full(last_vote) {
                break;
            }
            sleep(Duration::from_millis(100));
        }
        sleep(Duration::from_millis(100));
    }
    last_vote
        last_vote
    })
}

pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
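
The reworked helper now returns None for a node that has no tower yet (for example, one that has not voted) instead of panicking on unwrap(). A caller that still requires a vote can keep the old behavior with an explicit expect, as in this small sketch; the wrapper name and message are illustrative, and it assumes it is called from the same tests/common module:

use {
    solana_sdk::{clock::Slot, pubkey::Pubkey},
    std::path::Path,
};

// Hypothetical wrapper that preserves the previous panicking behavior.
fn require_last_vote_landed(ledger_path: &Path, node_pubkey: &Pubkey) -> Slot {
    wait_for_last_vote_in_tower_to_land_in_ledger(ledger_path, node_pubkey)
        .expect("validator was expected to have voted before this point")
}
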
@@ -390,40 +399,66 @@ pub fn run_cluster_partition<C>(
    on_partition_resolved(&mut cluster, &mut context);
}

pub struct ValidatorTestConfig {
    pub validator_keypair: Arc<Keypair>,
    pub validator_config: ValidatorConfig,
    pub in_genesis: bool,
}

pub fn test_faulty_node(
    faulty_node_type: BroadcastStageType,
    node_stakes: Vec<u64>,
    validator_test_configs: Option<Vec<ValidatorTestConfig>>,
    custom_leader_schedule: Option<FixedSchedule>,
) -> (LocalCluster, Vec<Arc<Keypair>>) {
    solana_logger::setup_with_default("solana_local_cluster=info");
    let num_nodes = node_stakes.len();
    let mut validator_keys = Vec::with_capacity(num_nodes);
    validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
    let validator_keys = validator_test_configs
        .as_ref()
        .map(|configs| {
            configs
                .iter()
                .map(|config| (config.validator_keypair.clone(), config.in_genesis))
                .collect()
        })
        .unwrap_or_else(|| {
            let mut validator_keys = Vec::with_capacity(num_nodes);
            validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
            validator_keys
        });

    assert_eq!(node_stakes.len(), num_nodes);
    assert_eq!(validator_keys.len(), num_nodes);

    // Use a fixed leader schedule so that only the faulty node gets leader slots.
    let validator_to_slots = vec![(
        validator_keys[0].0.as_ref().pubkey(),
        solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
    )];
    let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
    let fixed_leader_schedule = Some(FixedSchedule {
        leader_schedule: Arc::new(leader_schedule),
    let fixed_leader_schedule = custom_leader_schedule.unwrap_or_else(|| {
        // Use a fixed leader schedule so that only the faulty node gets leader slots.
        let validator_to_slots = vec![(
            validator_keys[0].0.as_ref().pubkey(),
            solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
        )];
        let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
        FixedSchedule {
            leader_schedule: Arc::new(leader_schedule),
        }
    });

    let error_validator_config = ValidatorConfig {
        broadcast_stage_type: faulty_node_type,
        fixed_leader_schedule: fixed_leader_schedule.clone(),
        ..ValidatorConfig::default_for_test()
    };
    let mut validator_configs = Vec::with_capacity(num_nodes);
    let mut validator_configs = validator_test_configs
        .map(|configs| {
            configs
                .into_iter()
                .map(|config| config.validator_config)
                .collect()
        })
        .unwrap_or_else(|| {
            let mut configs = Vec::with_capacity(num_nodes);
            configs.resize_with(num_nodes, ValidatorConfig::default_for_test);
            configs
        });

    // First validator is the bootstrap leader with the malicious broadcast logic.
    validator_configs.push(error_validator_config);
    validator_configs.resize_with(num_nodes, || ValidatorConfig {
        fixed_leader_schedule: fixed_leader_schedule.clone(),
        ..ValidatorConfig::default_for_test()
    });
    validator_configs[0].broadcast_stage_type = faulty_node_type;
    for config in &mut validator_configs {
        config.fixed_leader_schedule = Some(fixed_leader_schedule.clone());
    }

    let mut cluster_config = ClusterConfig {
        cluster_lamports: 10_000,
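
To illustrate the widened test_faulty_node signature, here is a hedged sketch of a caller that supplies explicit per-validator configs and pins every leader slot to the first (faulty) node. The two-node layout, stake values, and the use of BroadcastStageType::FailEntryVerification are illustrative only (the duplicate-switch test itself drives a duplicate-block broadcaster); create_custom_leader_schedule, ValidatorTestConfig, and test_faulty_node are assumed to be in scope from this module.

use {
    solana_core::{broadcast_stage::BroadcastStageType, validator::ValidatorConfig},
    solana_ledger::leader_schedule::FixedSchedule,
    solana_sdk::{
        clock::DEFAULT_DEV_SLOTS_PER_EPOCH,
        signature::{Keypair, Signer},
    },
    std::sync::Arc,
};

fn faulty_leader_cluster_sketch() {
    // Node 0 misbehaves as leader; node 1 holds most of the stake and votes honestly.
    let node_stakes = vec![100, 10_000];
    let validator_keys: Vec<Arc<Keypair>> = (0..2).map(|_| Arc::new(Keypair::new())).collect();

    let test_configs: Vec<ValidatorTestConfig> = validator_keys
        .iter()
        .map(|keypair| ValidatorTestConfig {
            validator_keypair: keypair.clone(),
            validator_config: ValidatorConfig::default_for_test(),
            in_genesis: true,
        })
        .collect();

    // Give every leader slot to the faulty node so its blocks are the only ones produced.
    let leader_schedule = create_custom_leader_schedule(
        [(validator_keys[0].pubkey(), DEFAULT_DEV_SLOTS_PER_EPOCH as usize)].into_iter(),
    );
    let fixed_schedule = FixedSchedule {
        leader_schedule: Arc::new(leader_schedule),
    };

    let (_cluster, _keys) = test_faulty_node(
        BroadcastStageType::FailEntryVerification,
        node_stakes,
        Some(test_configs),
        Some(fixed_schedule),
    );
}
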
