Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch build_tpu_quic_client to actually connect to the given validator #4151

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ fn test_bench_tps_local_cluster(config: Config) {

cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

let lamports_per_account = 100;

Expand Down
20 changes: 14 additions & 6 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,9 +970,13 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
Expand Down Expand Up @@ -1100,9 +1104,13 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];

let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let client = Arc::new(
cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
);

// creates one transaction and sends it 10 times
// this is done in single thread
Expand Down
6 changes: 3 additions & 3 deletions local-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl ClusterValidatorInfo {

pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result<QuicTpuClient>;
fn build_validator_tpu_quic_client_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient>;
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
Expand Down
48 changes: 25 additions & 23 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::Instant,
Expand Down Expand Up @@ -493,7 +493,9 @@ impl LocalCluster {
mut voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
let client = self.build_tpu_quic_client().expect("tpu_client");
let client = self
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.expect("tpu_client");

// Must have enough tokens to fund vote account and set delegate
let should_create_vote_pubkey = voting_keypair.is_none();
Expand Down Expand Up @@ -592,7 +594,9 @@ impl LocalCluster {
}

pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let client = self.build_tpu_quic_client().expect("new tpu quic client");
let client = self
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.expect("new tpu quic client");
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
}

Expand Down Expand Up @@ -941,12 +945,12 @@ impl LocalCluster {
}
}

fn build_tpu_client<F>(&self, rpc_client_builder: F) -> Result<QuicTpuClient>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());
fn build_tpu_client(
&self,
rpc_client: Arc<RpcClient>,
rpc_pubsub_addr: SocketAddr,
) -> Result<QuicTpuClient> {
let rpc_pubsub_url = format!("ws://{}/", rpc_pubsub_addr);

let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
Expand All @@ -959,7 +963,7 @@ impl LocalCluster {
};

let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_client,
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
Expand All @@ -975,24 +979,22 @@ impl Cluster for LocalCluster {
self.validators.keys().cloned().collect()
}

fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient> {
self.validators.get(pubkey).map(|_| {
self.build_tpu_quic_client()
.expect("should build tpu quic client")
})
fn build_validator_tpu_quic_client(&self, pubkey: &Pubkey) -> Result<QuicTpuClient> {
let contact_info = self.get_contact_info(pubkey).unwrap();
let rpc_url: String = format!("http://{}", contact_info.rpc().unwrap());
let rpc_client = Arc::new(RpcClient::new(rpc_url));
self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap())
}

fn build_tpu_quic_client(&self) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url)))
}

fn build_tpu_quic_client_with_commitment(
fn build_validator_tpu_quic_client_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
let contact_info = self.get_contact_info(pubkey).unwrap();
let rpc_url = format!("http://{}", contact_info.rpc().unwrap());
let rpc_client = Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config));
self.build_tpu_client(rpc_client, contact_info.rpc_pubsub().unwrap())
}

fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {
Expand Down
2 changes: 1 addition & 1 deletion local-cluster/src/local_cluster_snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl LocalCluster {
) -> NextSnapshotResult {
// Get slot after which this was generated
let client = self
.get_validator_client(self.entry_point_info.pubkey())
.build_validator_tpu_quic_client(self.entry_point_info.pubkey())
.unwrap();
let last_slot = client
.rpc_client()
Expand Down
59 changes: 43 additions & 16 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ fn test_local_cluster_signature_subscribe() {
.unwrap();
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();

let tx_client = cluster.build_tpu_quic_client().unwrap();
let tx_client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();

let (blockhash, _) = tx_client
.rpc_client()
Expand Down Expand Up @@ -431,7 +433,9 @@ fn test_mainnet_beta_cluster_type() {
.unwrap();
assert_eq!(cluster_nodes.len(), 1);

let client = cluster.build_tpu_quic_client().unwrap();
let client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();

// Programs that are available at epoch 0
for program_id in [
Expand Down Expand Up @@ -1002,7 +1006,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
let timer = Instant::now();
loop {
let validator_current_slot = cluster
.get_validator_client(&validator_identity.pubkey())
.build_validator_tpu_quic_client(&validator_identity.pubkey())
.unwrap()
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
Expand Down Expand Up @@ -1377,7 +1381,9 @@ fn test_snapshots_blockstore_floor() {
.into_iter()
.find(|x| x != cluster.entry_point_info.pubkey())
.unwrap();
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();
let mut current_slot = 0;

// Let this validator run a while with repair
Expand Down Expand Up @@ -1611,7 +1617,7 @@ fn test_no_voting() {
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
loop {
let last_slot = client
Expand Down Expand Up @@ -1676,13 +1682,16 @@ fn test_optimistic_confirmation_violation_detection() {
// so that the vote on `S-1` is definitely in gossip and optimistic confirmation is
// detected on slot `S-1` for sure, then stop the heavier of the two
// validators
let client = cluster.get_validator_client(&node_to_restart).unwrap();
let client = cluster
.build_validator_tpu_quic_client(&node_to_restart)
.unwrap();
let mut prev_voted_slot = 0;
loop {
let last_voted_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap();
info!("last voted slot: {}", last_voted_slot);
if last_voted_slot > 50 {
if prev_voted_slot == 0 {
prev_voted_slot = last_voted_slot;
Expand All @@ -1693,7 +1702,10 @@ fn test_optimistic_confirmation_violation_detection() {
sleep(Duration::from_millis(100));
}

info!("exiting node");
drop(client);
let exited_validator_info = cluster.exit_node(&node_to_restart);
info!("exiting node success");

// Mark fork as dead on the heavier validator, this should make the fork effectively
// dead, even though it was optimistically confirmed. The smaller validator should
Expand Down Expand Up @@ -1731,8 +1743,11 @@ fn test_optimistic_confirmation_violation_detection() {
// Wait for a root > prev_voted_slot to be set. Because the root is on a
// different fork than `prev_voted_slot`, then optimistic confirmation is
// violated
let client = cluster.get_validator_client(&node_to_restart).unwrap();
let client = cluster
.build_validator_tpu_quic_client(&node_to_restart)
.unwrap();
loop {
info!("Client connecting to: {}", client.rpc_client().url());
let last_root = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
Expand Down Expand Up @@ -1798,7 +1813,9 @@ fn test_validator_saves_tower() {
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

let ledger_path = cluster
.validators
Expand Down Expand Up @@ -1833,7 +1850,9 @@ fn test_validator_saves_tower() {

// Restart the validator and wait for a new root
cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for the first new root
let last_replayed_root = loop {
Expand Down Expand Up @@ -1862,7 +1881,9 @@ fn test_validator_saves_tower() {
.unwrap();

cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
let new_root = loop {
Expand Down Expand Up @@ -1895,7 +1916,9 @@ fn test_validator_saves_tower() {
validator_info.config.require_tower = false;

cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified);
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let validator_client = cluster
.build_validator_tpu_quic_client(&validator_id)
.unwrap();

// Wait for another new root
let new_root = loop {
Expand Down Expand Up @@ -2553,11 +2576,11 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| {
let update_client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
update_client_sender.send(update_client).unwrap();
let scan_client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
scan_client_sender.send(scan_client).unwrap();
};
Expand Down Expand Up @@ -2710,7 +2733,9 @@ fn test_oc_bad_signatures() {
);

// 3) Start up a spy to listen for and push votes to leader TPU
let client = cluster.build_tpu_quic_client().unwrap();
let client = cluster
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -3080,10 +3105,12 @@ fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) {
.find(|x| x != cluster.entry_point_info.pubkey())
.unwrap();
let client = cluster
.get_validator_client(cluster.entry_point_info.pubkey())
.build_validator_tpu_quic_client(cluster.entry_point_info.pubkey())
.unwrap();
update_client_sender.send(client).unwrap();
let scan_client = cluster.get_validator_client(&other_validator_id).unwrap();
let scan_client = cluster
.build_validator_tpu_quic_client(&other_validator_id)
.unwrap();
scan_client_sender.send(scan_client).unwrap();

// Wait for some roots to pass
Expand Down
Loading