diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 857ae4cd2f7e20..da710aed285ce7 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -78,9 +78,13 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); - let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { - panic!("Could not create TpuClient with Quic Cache {err:?}"); - })); + let client = Arc::new( + cluster + .build_entrypoint_tpu_quic_client() + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); let lamports_per_account = 100; diff --git a/dos/src/main.rs b/dos/src/main.rs index 62af3c83af381c..e809fb6d11c1d8 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -970,9 +970,13 @@ pub mod test { let node = cluster.get_contact_info(&nodes[0]).unwrap().clone(); let nodes_slice = [node]; - let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { - panic!("Could not create TpuClient with Quic Cache {err:?}"); - })); + let client = Arc::new( + cluster + .build_entrypoint_tpu_quic_client() + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); // creates one transaction with 8 valid signatures and sends it 10 times run_dos( @@ -1100,9 +1104,13 @@ pub mod test { let node = cluster.get_contact_info(&nodes[0]).unwrap().clone(); let nodes_slice = [node]; - let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { - panic!("Could not create TpuClient with Quic Cache {err:?}"); - })); + let client = Arc::new( + cluster + .build_entrypoint_tpu_quic_client() + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); // creates one transaction and sends it 10 times // this is done in single thread diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index c2e3acc60751a3..e8bda3d91513ec 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -40,9 +40,14 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; - fn get_validator_client(&self, pubkey: &Pubkey) -> Option; - fn build_tpu_quic_client(&self) -> Result; - fn build_tpu_quic_client_with_commitment( + fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result; + fn build_validator_tpu_quic_client_with_commitment( + &self, + pubkey: &Pubkey, + commitment_config: CommitmentConfig, + ) -> Result; + fn build_entrypoint_tpu_quic_client(&self) -> Result; + fn build_entrypoint_tpu_quic_client_with_commitment( &self, commitment_config: CommitmentConfig, ) -> Result; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 35fcc5f2ed9333..bafa89a326b245 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -61,7 +61,7 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::Instant, @@ -493,7 +493,7 @@ impl LocalCluster { mut voting_keypair: Option>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { - let client = self.build_tpu_quic_client().expect("tpu_client"); + let client = self.build_entrypoint_tpu_quic_client().expect("tpu_client"); // Must have enough tokens to fund vote account and set delegate let should_create_vote_pubkey = voting_keypair.is_none(); @@ -592,7 +592,9 @@ impl LocalCluster { } pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { - let client = self.build_tpu_quic_client().expect("new tpu quic client"); + let client = self + .build_entrypoint_tpu_quic_client() + .expect("new tpu quic client"); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) } @@ -941,12 +943,12 @@ impl LocalCluster { } } - fn build_tpu_client(&self, rpc_client_builder: F) -> Result - where - F: FnOnce(String) -> Arc, - { - let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap()); + fn build_tpu_client( + &self, + rpc_client: Arc, + rpc_pubsub_addr: SocketAddr, + ) -> Result { + let rpc_pubsub_url = format!("ws://{}/", rpc_pubsub_addr); let cache = match &*self.connection_cache { ConnectionCache::Quic(cache) => cache, @@ -959,7 +961,7 @@ impl LocalCluster { }; let tpu_client = TpuClient::new_with_connection_cache( - rpc_client_builder(rpc_url), + rpc_client, rpc_pubsub_url.as_str(), TpuClientConfig::default(), cache.clone(), @@ -975,24 +977,36 @@ impl Cluster for LocalCluster { self.validators.keys().cloned().collect() } - fn get_validator_client(&self, pubkey: &Pubkey) -> Option { - self.validators.get(pubkey).map(|_| { - self.build_tpu_quic_client() - .expect("should build tpu quic client") - }) + fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result { + let contact_info = self.get_contact_info(pubkey).unwrap(); + let rpc_url: String = format!("http://{}", contact_info.rpc().unwrap()); + let rpc_client = Arc::new(RpcClient::new(rpc_url)); + self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap()) } - fn build_tpu_quic_client(&self) -> Result { - self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url))) + fn build_validator_tpu_quic_client_with_commitment( + &self, + pubkey: &Pubkey, + commitment_config: CommitmentConfig, + ) -> Result { + let contact_info = self.get_contact_info(pubkey).unwrap(); + let rpc_url = format!("http://{}", contact_info.rpc().unwrap()); + let rpc_client = Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)); + self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap()) + } + + fn build_entrypoint_tpu_quic_client(&self) -> Result { + self.build_validator_tpu_quic_client(self.entry_point_info.pubkey()) } - fn build_tpu_quic_client_with_commitment( + fn build_entrypoint_tpu_quic_client_with_commitment( &self, commitment_config: CommitmentConfig, ) -> Result { - self.build_tpu_client(|rpc_url| { - Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)) - }) + self.build_validator_tpu_quic_client_with_commitment( + self.entry_point_info.pubkey(), + commitment_config, + ) } fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo { diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 3df5b61d3b8359..6240ade176c086 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -73,7 +73,7 @@ impl LocalCluster { ) -> NextSnapshotResult { // Get slot after which this was generated let client = self - .get_validator_client(self.entry_point_info.pubkey()) + .build_validator_tpu_quic_client(self.entry_point_info.pubkey()) .unwrap(); let last_slot = client .rpc_client() diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 4e7444ef9b302c..4c3405489d7dcd 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let tx_client = cluster.build_tpu_quic_client().unwrap(); + let tx_client = cluster.build_entrypoint_tpu_quic_client().unwrap(); let (blockhash, _) = tx_client .rpc_client() @@ -431,7 +431,7 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let client = cluster.build_tpu_quic_client().unwrap(); + let client = cluster.build_entrypoint_tpu_quic_client().unwrap(); // Programs that are available at epoch 0 for program_id in [ @@ -1002,7 +1002,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st let timer = Instant::now(); loop { let validator_current_slot = cluster - .get_validator_client(&validator_identity.pubkey()) + .build_validator_tpu_quic_client(&validator_identity.pubkey()) .unwrap() .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) @@ -1377,7 +1377,9 @@ fn test_snapshots_blockstore_floor() { .into_iter() .find(|x| x != cluster.entry_point_info.pubkey()) .unwrap(); - let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let validator_client = cluster + .build_validator_tpu_quic_client(&validator_id) + .unwrap(); let mut current_slot = 0; // Let this validator run a while with repair @@ -1611,7 +1613,7 @@ fn test_no_voting() { }; let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let client = cluster - .get_validator_client(cluster.entry_point_info.pubkey()) + .build_validator_tpu_quic_client(cluster.entry_point_info.pubkey()) .unwrap(); loop { let last_slot = client @@ -1676,13 +1678,16 @@ fn test_optimistic_confirmation_violation_detection() { // so that the vote on `S-1` is definitely in gossip and optimistic confirmation is // detected on slot `S-1` for sure, then stop the heavier of the two // validators - let client = cluster.get_validator_client(&node_to_restart).unwrap(); + let client = cluster + .build_validator_tpu_quic_client(&node_to_restart) + .unwrap(); let mut prev_voted_slot = 0; loop { let last_voted_slot = client .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap(); + info!("last voted slot: {}", last_voted_slot); if last_voted_slot > 50 { if prev_voted_slot == 0 { prev_voted_slot = last_voted_slot; @@ -1693,7 +1698,10 @@ fn test_optimistic_confirmation_violation_detection() { sleep(Duration::from_millis(100)); } + info!("exiting node"); + drop(client); let exited_validator_info = cluster.exit_node(&node_to_restart); + info!("exiting node success"); // Mark fork as dead on the heavier validator, this should make the fork effectively // dead, even though it was optimistically confirmed. The smaller validator should @@ -1731,8 +1739,11 @@ fn test_optimistic_confirmation_violation_detection() { // Wait for a root > prev_voted_slot to be set. Because the root is on a // different fork than `prev_voted_slot`, then optimistic confirmation is // violated - let client = cluster.get_validator_client(&node_to_restart).unwrap(); + let client = cluster + .build_validator_tpu_quic_client(&node_to_restart) + .unwrap(); loop { + info!("Client connecting to: {}", client.rpc_client().url()); let last_root = client .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) @@ -1798,7 +1809,9 @@ fn test_validator_saves_tower() { }; let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let validator_client = cluster + .build_validator_tpu_quic_client(&validator_id) + .unwrap(); let ledger_path = cluster .validators @@ -1833,7 +1846,9 @@ fn test_validator_saves_tower() { // Restart the validator and wait for a new root cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified); - let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let validator_client = cluster + .build_validator_tpu_quic_client(&validator_id) + .unwrap(); // Wait for the first new root let last_replayed_root = loop { @@ -1862,7 +1877,9 @@ fn test_validator_saves_tower() { .unwrap(); cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified); - let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let validator_client = cluster + .build_validator_tpu_quic_client(&validator_id) + .unwrap(); // Wait for a new root, demonstrating the validator was able to make progress from the older `tower1` let new_root = loop { @@ -1895,7 +1912,9 @@ fn test_validator_saves_tower() { validator_info.config.require_tower = false; cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified); - let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + let validator_client = cluster + .build_validator_tpu_quic_client(&validator_id) + .unwrap(); // Wait for another new root let new_root = loop { @@ -2553,11 +2572,11 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| { let update_client = cluster - .get_validator_client(cluster.entry_point_info.pubkey()) + .build_validator_tpu_quic_client(cluster.entry_point_info.pubkey()) .unwrap(); update_client_sender.send(update_client).unwrap(); let scan_client = cluster - .get_validator_client(cluster.entry_point_info.pubkey()) + .build_validator_tpu_quic_client(cluster.entry_point_info.pubkey()) .unwrap(); scan_client_sender.send(scan_client).unwrap(); }; @@ -2710,7 +2729,7 @@ fn test_oc_bad_signatures() { ); // 3) Start up a spy to listen for and push votes to leader TPU - let client = cluster.build_tpu_quic_client().unwrap(); + let client = cluster.build_entrypoint_tpu_quic_client().unwrap(); let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); @@ -3080,10 +3099,12 @@ fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) { .find(|x| x != cluster.entry_point_info.pubkey()) .unwrap(); let client = cluster - .get_validator_client(cluster.entry_point_info.pubkey()) + .build_validator_tpu_quic_client(cluster.entry_point_info.pubkey()) .unwrap(); update_client_sender.send(client).unwrap(); - let scan_client = cluster.get_validator_client(&other_validator_id).unwrap(); + let scan_client = cluster + .build_validator_tpu_quic_client(&other_validator_id) + .unwrap(); scan_client_sender.send(scan_client).unwrap(); // Wait for some roots to pass