Skip to content

Commit

Permalink
Add multi CC
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Apr 3, 2024
1 parent b62e9fa commit 148619a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 64 deletions.
13 changes: 8 additions & 5 deletions bench-tps/src/bench_tps_client/high_tps_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
//!
use {
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
rand::prelude::*,
solana_client::{
connection_cache::Protocol, nonblocking::tpu_client::LeaderTpuService,
tpu_connection::TpuConnection,
},
solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
solana_connection_cache::connection_cache::{self, ConnectionCache as BackendConnectionCache},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::config::RpcBlockConfig,
Expand All @@ -38,7 +39,7 @@ pub struct HighTpsClient {
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<ConnectionCache>,
connection_caches: Vec<Arc<ConnectionCache>>,
config: HighTpsClientConfig,
}

Expand All @@ -53,7 +54,7 @@ impl HighTpsClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: HighTpsClientConfig,
connection_cache: Arc<ConnectionCache>,
connection_caches: Vec<Arc<ConnectionCache>>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let create_leader_tpu_service = LeaderTpuService::new(
Expand All @@ -69,7 +70,7 @@ impl HighTpsClient {
leader_tpu_service,
exit,
rpc_client,
connection_cache,
connection_caches,
config,
})
}
Expand All @@ -87,12 +88,14 @@ impl BenchTpsClient for HighTpsClient {
.into_iter() //.into_par_iter() any effect of this?
.map(|tx| bincode::serialize(&tx).expect("transaction should be valid."))
.collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let cc_index: usize = rng.gen_range(0..self.connection_caches.len());
for c in wire_transactions.chunks(self.config.send_batch_size) {
let tpu_addresses = self
.leader_tpu_service
.leader_tpu_sockets(self.config.fanout_slots);
for tpu_address in &tpu_addresses {
let conn = self.connection_cache.get_connection(tpu_address);
let conn = self.connection_caches[cc_index].get_connection(tpu_address);
let _ = conn.send_data_batch_async(c.to_vec());
}
}
Expand Down
15 changes: 10 additions & 5 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub struct Config {
pub instruction_padding_config: Option<InstructionPaddingConfig>,
pub num_conflict_groups: Option<usize>,
pub bind_address: IpAddr,
pub client_node_id: Option<Keypair>,
pub client_node_ids: Option<Vec<Keypair>>,
pub commitment_config: CommitmentConfig,
pub block_data_file: Option<String>,
pub transaction_data_file: Option<String>,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Default for Config {
instruction_padding_config: None,
num_conflict_groups: None,
bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
client_node_id: None,
client_node_ids: None,
commitment_config: CommitmentConfig::confirmed(),
block_data_file: None,
transaction_data_file: None,
Expand Down Expand Up @@ -422,6 +422,7 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
.takes_value(true)
.requires("json_rpc_url")
.validator(is_keypair)
.multiple(true)
.help("File containing the node identity (keypair) of a validator with active stake. This allows communicating with network using staked connection"),
)
.arg(
Expand Down Expand Up @@ -614,10 +615,14 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
solana_net_utils::parse_host(addr).map_err(|_| "Failed to parse bind-address")?;
}

if let Some(client_node_id_filename) = matches.value_of("client_node_id") {
if let Some(client_node_id_filenames) = matches.values_of("client_node_id") {
// error is checked by arg validator
let client_node_id = read_keypair_file(client_node_id_filename).map_err(|_| "")?;
args.client_node_id = Some(client_node_id);
let client_node_ids = client_node_id_filenames
.map(|filename| {
read_keypair_file(filename).expect("File has been checked by cla validator.")
})
.collect();
args.client_node_ids = Some(client_node_ids);
}

args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig);
Expand Down
120 changes: 66 additions & 54 deletions bench-tps/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
send_batch::{generate_durable_nonce_accounts, generate_keypairs},
},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::connection_cache,
solana_genesis::Base64Account,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
Expand Down Expand Up @@ -78,54 +79,61 @@ fn create_connection_cache(
tpu_connection_pool_size: usize,
use_quic: bool,
bind_address: IpAddr,
client_node_id: Option<&Keypair>,
client_node_ids: Option<&Vec<Keypair>>,
commitment_config: CommitmentConfig,
) -> ConnectionCache {
) -> Vec<ConnectionCache> {
if !use_quic {
return ConnectionCache::with_udp(
return vec![ConnectionCache::with_udp(
"bench-tps-connection_cache_udp",
tpu_connection_pool_size,
);
)];
}
if client_node_id.is_none() {
return ConnectionCache::new_quic(
if client_node_ids.is_none() {
return vec![ConnectionCache::new_quic(
"bench-tps-connection_cache_quic",
tpu_connection_pool_size,
);
)];
}

let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
commitment_config,
));

let client_node_id = client_node_id.unwrap();
let (stake, total_stake) =
find_node_activated_stake(rpc_client, client_node_id.pubkey()).unwrap_or_default();
info!("Stake for specified client_node_id: {stake}, total stake: {total_stake}");
let stakes = HashMap::from([
(client_node_id.pubkey(), stake),
(Pubkey::new_unique(), total_stake - stake),
]);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));
ConnectionCache::new_with_client_options(
"bench-tps-connection_cache_quic",
tpu_connection_pool_size,
None,
Some((client_node_id, bind_address)),
Some((&staked_nodes, &client_node_id.pubkey())),
)
//TODO(klykov): can we use one HashMap for all the connections?
let client_node_ids = client_node_ids.unwrap();
let mut connection_caches = Vec::with_capacity(client_node_ids.len());
for client_node_id in client_node_ids {
let (stake, total_stake) =
find_node_activated_stake(rpc_client.clone(), client_node_id.pubkey())
.unwrap_or_default();
info!("Stake for specified client_node_id: {stake}, total stake: {total_stake}");
let stakes = HashMap::from([
(client_node_id.pubkey(), stake),
(Pubkey::new_unique(), total_stake - stake),
]);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));
let cc = ConnectionCache::new_with_client_options(
"bench-tps-connection_cache_quic",
tpu_connection_pool_size,
None,
Some((client_node_id, bind_address)),
Some((&staked_nodes, &client_node_id.pubkey())),
);
connection_caches.push(cc);
}
connection_caches
}

#[allow(clippy::too_many_arguments)]
fn create_client(
external_client_type: &ExternalClientType,
json_rpc_url: &str,
websocket_url: &str,
connection_cache: ConnectionCache,
connection_caches: Vec<ConnectionCache>,
commitment_config: CommitmentConfig,
) -> Arc<dyn BenchTpsClient + Send + Sync> {
match external_client_type {
Expand All @@ -138,13 +146,13 @@ fn create_client(
json_rpc_url.to_string(),
commitment_config,
));
match connection_cache {
match &connection_caches[0] {
ConnectionCache::Udp(cache) => Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
cache,
cache.clone(),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}");
Expand All @@ -156,7 +164,7 @@ fn create_client(
rpc_client,
websocket_url,
TpuClientConfig::default(),
cache,
cache.clone(),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}");
Expand All @@ -170,26 +178,30 @@ fn create_client(
json_rpc_url.to_string(),
commitment_config,
));
match connection_cache {
ConnectionCache::Udp(_) => {
unimplemented!("UDP protocol is not supported.");
}
ConnectionCache::Quic(cache) => Arc::new(
HighTpsClient::new(
rpc_client,
websocket_url,
HighTpsClientConfig {
fanout_slots: 1,
send_batch_size: 64,
},
cache,
)
.unwrap_or_else(|err| {
eprintln!("Could not create HighTpsClient {err:?}");
exit(1);
}),
),
}
let caches = connection_caches
.iter()
.map(|cache| match cache {
ConnectionCache::Udp(_) => {
unimplemented!("UDP protocol is not supported.");
}
ConnectionCache::Quic(cache) => cache.clone(),
})
.collect();
Arc::new(
HighTpsClient::new(
rpc_client,
websocket_url,
HighTpsClientConfig {
fanout_slots: 1,
send_batch_size: 64,
},
caches,
)
.unwrap_or_else(|err| {
eprintln!("Could not create HighTpsClient {err:?}");
exit(1);
}),
)
}
}
}
Expand Down Expand Up @@ -226,7 +238,7 @@ fn main() {
use_durable_nonce,
instruction_padding_config,
bind_address,
client_node_id,
client_node_ids,
commitment_config,
..
} = &cli_config;
Expand Down Expand Up @@ -264,19 +276,19 @@ fn main() {
return;
}

let connection_cache = create_connection_cache(
let connection_caches = create_connection_cache(
json_rpc_url,
*tpu_connection_pool_size,
*use_quic,
*bind_address,
client_node_id.as_ref(),
client_node_ids.as_ref(),
*commitment_config,
);
let client = create_client(
external_client_type,
json_rpc_url,
websocket_url,
connection_cache,
connection_caches,
*commitment_config,
);
if let Some(instruction_padding_config) = instruction_padding_config {
Expand Down

0 comments on commit 148619a

Please sign in to comment.