Skip to content

Commit

Permalink
Switch build_tpu_quic_client to actually connect to the given validat…
Browse files Browse the repository at this point in the history
…or (#4151)
  • Loading branch information
carllin authored Dec 19, 2024
1 parent 7fc3fbb commit 4bb6c4a
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 52 deletions.
10 changes: 7 additions & 3 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ fn test_bench_tps_local_cluster(config: Config) {

cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

let lamports_per_account = 100;

Expand Down
20 changes: 14 additions & 6 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,9 +970,13 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
Expand Down Expand Up @@ -1100,9 +1104,13 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

// creates one transaction and sends it 10 times
// this is done in single thread
Expand Down
6 changes: 3 additions & 3 deletions local-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl ClusterValidatorInfo {

pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result<QuicTpuClient>;
fn build_validator_tpu_quic_client_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient>;
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
Expand Down
48 changes: 25 additions & 23 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::Instant,
Expand Down Expand Up @@ -493,7 +493,9 @@ impl LocalCluster {
mut voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
let client = self.build_tpu_quic_client().expect("tpu_client");
let client = self
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.expect("tpu_client");

// Must have enough tokens to fund vote account and set delegate
let should_create_vote_pubkey = voting_keypair.is_none();
Expand Down Expand Up @@ -592,7 +594,9 @@ impl LocalCluster {
}

pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let client = self.build_tpu_quic_client().expect("new tpu quic client");
let client = self
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.expect("new tpu quic client");
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
}

Expand Down Expand Up @@ -941,12 +945,12 @@ impl LocalCluster {
}
}

fn build_tpu_client<F>(&self, rpc_client_builder: F) -> Result<QuicTpuClient>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());
fn build_tpu_client(
&self,
rpc_client: Arc<RpcClient>,
rpc_pubsub_addr: SocketAddr,
) -> Result<QuicTpuClient> {
let rpc_pubsub_url = format!("ws://{}/", rpc_pubsub_addr);

let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
Expand All @@ -959,7 +963,7 @@ impl LocalCluster {
};

let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_client,
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
Expand All @@ -975,24 +979,22 @@ impl Cluster for LocalCluster {
self.validators.keys().cloned().collect()
}

fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient> {
self.validators.get(pubkey).map(|_| {
self.build_tpu_quic_client()
.expect("should build tpu quic client")
})
fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result<QuicTpuClient> {
let contact_info = self.get_contact_info(pubkey).unwrap();
let rpc_url: String = format!("http://{}", contact_info.rpc().unwrap());
let rpc_client = Arc::new(RpcClient::new(rpc_url));
self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap())
}

fn build_tpu_quic_client(&self) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url)))
}

fn build_tpu_quic_client_with_commitment(
fn build_validator_tpu_quic_client_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
let contact_info = self.get_contact_info(pubkey).unwrap();
let rpc_url = format!("http://{}", contact_info.rpc().unwrap());
let rpc_client = Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config));
self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap())
}

fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {
Expand Down
2 changes: 1 addition & 1 deletion local-cluster/src/local_cluster_snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl LocalCluster {
) -> NextSnapshotResult {
// Get slot after which this was generated
let client = self
.get_validator_client(self.entry_point_info.pubkey())
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.unwrap();
let last_slot = client
.rpc_client()
Expand Down
59 changes: 43 additions & 16 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ fn test_local_cluster_signature_subscribe() {
.unwrap();
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();

let tx_client = cluster.build_tpu_quic_client().unwrap();
let tx_client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();

let (blockhash, _) = tx_client
.rpc_client()
Expand Down Expand Up @@ -431,7 +433,9 @@ fn test_mainnet_beta_cluster_type() {
.unwrap();
assert_eq!(cluster_nodes.len(), 1);

let client = cluster.build_tpu_quic_client().unwrap();
let client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();

// Programs that are available at epoch 0
for program_id in [
Expand Down Expand Up @@ -1002,7 +1006,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
let timer = Instant::now();
loop {
let validator_current_slot = cluster
.get_validator_client(&validator_identity.pubkey())
.build_validator_tpu_quic_client(&validator_identity.pubkey())
.unwrap()
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
Expand Down Expand Up @@ -1377,7 +1381,9 @@ fn test_snapshots_blockstore_floor() {
.into_iter()
.find(|x| x != cluster.entry_point_info.pubkey())
.unwrap();
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();
let mut current_slot = 0;

// Let this validator run a while with repair
Expand Down Expand Up @@ -1611,7 +1617,7 @@ fn test_no_voting() {
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
loop {
let last_slot = client
Expand Down Expand Up @@ -1676,13 +1682,16 @@ fn test_optimistic_confirmation_violation_detection() {
// so that the vote on `S-1` is definitely in gossip and optimistic confirmation is
// detected on slot `S-1` for sure, then stop the heavier of the two
// validators
let client = cluster.get_validator_client(&node_to_restart).unwrap();
let client = cluster
.build_validator_tpu_quic_client(&node_to_restart)
.unwrap();
let mut prev_voted_slot = 0;
loop {
let last_voted_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap();
info!("last voted slot: {}", last_voted_slot);
if last_voted_slot > 50 {
if prev_voted_slot == 0 {
prev_voted_slot = last_voted_slot;
Expand All @@ -1693,7 +1702,10 @@ fn test_optimistic_confirmation_violation_detection() {
sleep(Duration::from_millis(100));
}

info!("exiting node");
drop(client);
let exited_validator_info = cluster.exit_node(&node_to_restart);
info!("exiting node success");

// Mark fork as dead on the heavier validator, this should make the fork effectively
// dead, even though it was optimistically confirmed. The smaller validator should
Expand Down Expand Up @@ -1731,8 +1743,11 @@ fn test_optimistic_confirmation_violation_detection() {
// Wait for a root > prev_voted_slot to be set. Because the root is on a
// different fork than `prev_voted_slot`, then optimistic confirmation is
// violated
let client = cluster.get_validator_client(&node_to_restart).unwrap();
let client = cluster
.build_validator_tpu_quic_client(&node_to_restart)
.unwrap();
loop {
info!("Client connecting to: {}", client.rpc_client().url());
let last_root = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
Expand Down Expand Up @@ -1798,7 +1813,9 @@ fn test_validator_saves_tower() {
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

let ledger_path = cluster
.validators
Expand Down Expand Up @@ -1833,7 +1850,9 @@ fn test_validator_saves_tower() {

// Restart the validator and wait for a new root
cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for the first new root
let last_replayed_root = loop {
Expand Down Expand Up @@ -1862,7 +1881,9 @@ fn test_validator_saves_tower() {
.unwrap();

cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
let new_root = loop {
Expand Down Expand Up @@ -1895,7 +1916,9 @@ fn test_validator_saves_tower() {
validator_info.config.require_tower = false;

cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for another new root
let new_root = loop {
Expand Down Expand Up @@ -2553,11 +2576,11 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| {
let update_client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
update_client_sender.send(update_client).unwrap();
let scan_client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
scan_client_sender.send(scan_client).unwrap();
};
Expand Down Expand Up @@ -2710,7 +2733,9 @@ fn test_oc_bad_signatures() {
);

// 3) Start up a spy to listen for and push votes to leader TPU
let client = cluster.build_tpu_quic_client().unwrap();
let client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -3080,10 +3105,12 @@ fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) {
.find(|x| x != cluster.entry_point_info.pubkey())
.unwrap();
let client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
update_client_sender.send(client).unwrap();
let scan_client = cluster.get_validator_client(&other_validator_id).unwrap();
let scan_client = cluster
.build_validator_tpu_quic_client(&other_validator_id)
.unwrap();
scan_client_sender.send(scan_client).unwrap();

// Wait for some roots to pass
Expand Down

0 comments on commit 4bb6c4a

Please sign in to comment.