Add a more performant client to bench-tps #544

Closed
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions bench-tps/Cargo.toml
@@ -9,6 +9,7 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
bincode = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
@@ -44,6 +45,7 @@ solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
spl-instruction-padding = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
serial_test = { workspace = true }
1 change: 1 addition & 0 deletions bench-tps/src/bench_tps_client.rs
@@ -112,5 +112,6 @@ pub trait BenchTpsClient {
}

mod bank_client;
pub mod high_tps_client;
mod rpc_client;
mod tpu_client;
233 changes: 233 additions & 0 deletions bench-tps/src/bench_tps_client/high_tps_client.rs
@@ -0,0 +1,233 @@
//! Client optimized for bench-tps: high mean TPS and moderate standard deviation.
//! This is achieved by:
//! * optimizing the size of the send batch
//! * sending batches through the connection cache in a nonblocking fashion
//! * using the leader cache from TpuClient to minimize RPC calls
//! * using several staked connection caches (not implemented yet)
//!
use {
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
solana_client::{
connection_cache::Protocol, nonblocking::tpu_client::LeaderTpuService,

Review comment: can you import this directly from solana_tpu_client::nonblocking::tpu_client::LeaderTpuService? It sounds like we're eventually going to get rid of solana_client, according to Tyera.

tpu_connection::TpuConnection,
},
solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::config::RpcBlockConfig,
solana_sdk::{
account::Account, commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash,
message::Message, pubkey::Pubkey, signature::Signature, slot_history::Slot,
transaction::Transaction,
},
solana_transaction_status::UiConfirmedBlock,
std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
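A sketch of the reviewer's suggestion above (assuming bench-tps takes solana-tpu-client as a direct dependency; solana_client currently re-exports the type):

```rust
// Hypothetical direct import, bypassing the solana_client re-export.
use solana_tpu_client::nonblocking::tpu_client::LeaderTpuService;
```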

#[derive(Debug, Clone, Copy)]
pub struct HighTpsClientConfig {
pub send_batch_size: usize,
pub fanout_slots: u64,
}

type ConnectionCache = BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
pub struct HighTpsClient {
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<ConnectionCache>,
config: HighTpsClientConfig,
}

impl Drop for HighTpsClient {
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
}
}

impl HighTpsClient {
pub fn new(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: HighTpsClientConfig,
connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let create_leader_tpu_service = LeaderTpuService::new(
rpc_client.get_inner_client().clone(),
websocket_url,
Protocol::QUIC,
exit.clone(),
);
// LeaderTpuService::new is async; block_in_place lets us block on it from
// within the RpcClient's tokio runtime without starving other tasks.
let leader_tpu_service = tokio::task::block_in_place(|| {
rpc_client.runtime().block_on(create_leader_tpu_service)
})?;
Ok(Self {
leader_tpu_service,
exit,
rpc_client,
connection_cache,
config,
})
}
}
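For reference, a minimal construction sketch mirroring the main.rs wiring later in this diff; the endpoint URLs and the pre-built connection_cache are placeholders, not values from the PR:

```rust
use std::sync::Arc;
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;

// Placeholder endpoints: point these at a real validator.
let rpc_client = Arc::new(RpcClient::new_with_commitment(
    "http://127.0.0.1:8899".to_string(),
    CommitmentConfig::confirmed(),
));
let client = HighTpsClient::new(
    rpc_client,
    "ws://127.0.0.1:8900",
    HighTpsClientConfig {
        send_batch_size: 64,
        fanout_slots: 1,
    },
    connection_cache, // assumed: a QUIC-backed Arc<ConnectionCache>
)?;
```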

impl BenchTpsClient for HighTpsClient {
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
self.rpc_client
.send_transaction(&transaction)
.map_err(|err| err.into())
}

fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
let wire_transactions = transactions
.into_iter() // .into_par_iter(): any effect of this?

Review comment: in response to the comment here: probably, if the Vec<Transaction> is long. tpu_client uses .into_par_iter(), so it may make sense to use it here too, but I haven't tested it to see the performance difference.

.map(|tx| bincode::serialize(&tx).expect("transaction should be valid."))
.collect::<Vec<_>>();
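Regarding the into_par_iter question above, a hedged sketch of the parallel variant; it assumes a rayon dependency, which tpu_client already uses for this same serialization step:

```rust
use rayon::prelude::*;

// Serialize in parallel; likely only pays off when `transactions` is long.
let wire_transactions = transactions
    .into_par_iter()
    .map(|tx| bincode::serialize(&tx).expect("transaction should be valid"))
    .collect::<Vec<_>>();
```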
for c in wire_transactions.chunks(self.config.send_batch_size) {
let tpu_addresses = self

Review comment: possibly rename tpu_addresses to leader_tpu_addresses?

Follow-up: actually, the context may be clear enough.

.leader_tpu_service
.leader_tpu_sockets(self.config.fanout_slots);
for tpu_address in &tpu_addresses {

Review comment: only sending to one leader here, right?

Author reply: it should be to fanout_slots leaders. In the configuration I specify 1 for this parameter because I think we don't care how many transactions have timed out; we're only interested in maximizing TPS. Given that, it makes sense to utilize the NIC bandwidth to send as many datagrams to one leader as possible (although for a particular cluster configuration we might not be able to fully utilize it due to staked congestion control).

let conn = self.connection_cache.get_connection(tpu_address);
let _ = conn.send_data_batch_async(c.to_vec());
}
}
Ok(())
}
fn get_latest_blockhash(&self) -> Result<Hash> {
self.rpc_client
.get_latest_blockhash()
.map_err(|err| err.into())
}

fn get_latest_blockhash_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<(Hash, u64)> {
self.rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.map_err(|err| err.into())
}

fn get_transaction_count(&self) -> Result<u64> {
self.rpc_client
.get_transaction_count()
.map_err(|err| err.into())
}

fn get_transaction_count_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<u64> {
self.rpc_client
.get_transaction_count_with_commitment(commitment_config)
.map_err(|err| err.into())
}

fn get_epoch_info(&self) -> Result<EpochInfo> {
self.rpc_client.get_epoch_info().map_err(|err| err.into())
}

fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> {
self.rpc_client
.get_balance(pubkey)
.map_err(|err| err.into())
}

fn get_balance_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<u64> {
self.rpc_client
.get_balance_with_commitment(pubkey, commitment_config)
.map(|res| res.value)
.map_err(|err| err.into())
}

fn get_fee_for_message(&self, message: &Message) -> Result<u64> {
self.rpc_client
.get_fee_for_message(message)
.map_err(|err| err.into())
}

fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result<u64> {
self.rpc_client
.get_minimum_balance_for_rent_exemption(data_len)
.map_err(|err| err.into())
}

fn addr(&self) -> String {
self.rpc_client.url()
}

fn request_airdrop_with_blockhash(
&self,
pubkey: &Pubkey,
lamports: u64,
recent_blockhash: &Hash,
) -> Result<Signature> {
self.rpc_client
.request_airdrop_with_blockhash(pubkey, lamports, recent_blockhash)
.map_err(|err| err.into())
}

fn get_account(&self, pubkey: &Pubkey) -> Result<Account> {
self.rpc_client
.get_account(pubkey)
.map_err(|err| err.into())
}

fn get_account_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<Account> {
self.rpc_client
.get_account_with_commitment(pubkey, commitment_config)
.map(|res| res.value)
.map_err(|err| err.into())
.and_then(|account| {
account.ok_or_else(|| {
BenchTpsError::Custom(format!("AccountNotFound: pubkey={pubkey}"))
})
})
}

fn get_multiple_accounts(&self, pubkeys: &[Pubkey]) -> Result<Vec<Option<Account>>> {
self.rpc_client
.get_multiple_accounts(pubkeys)
.map_err(|err| err.into())
}

fn get_slot_with_commitment(&self, commitment_config: CommitmentConfig) -> Result<Slot> {
self.rpc_client
.get_slot_with_commitment(commitment_config)
.map_err(|err| err.into())
}

fn get_blocks_with_commitment(
&self,
start_slot: Slot,
end_slot: Option<Slot>,
commitment_config: CommitmentConfig,
) -> Result<Vec<Slot>> {
self.rpc_client
.get_blocks_with_commitment(start_slot, end_slot, commitment_config)
.map_err(|err| err.into())
}

fn get_block_with_config(
&self,
slot: Slot,
rpc_block_config: RpcBlockConfig,
) -> Result<UiConfirmedBlock> {
self.rpc_client
.get_block_with_config(slot, rpc_block_config)
.map_err(|err| err.into())
}
}
13 changes: 13 additions & 0 deletions bench-tps/src/cli.rs
@@ -27,6 +27,8 @@ pub enum ExternalClientType {
// Submits transactions directly to leaders using a TpuClient, broadcasting to upcoming leaders
// via TpuClient default configuration
TpuClient,

Review comment: it would be nice to add a comment here detailing where we send transactions (directly to leaders) and a short summary of the differences with respect to TpuClient.

HighTpsClient,
}
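A possible comment along the lines the reviewer suggests above, based on the module docs in high_tps_client.rs (wording is illustrative, not taken from the PR):

```rust
// Submits transactions directly to upcoming leaders (like TpuClient), but sends
// batches through the connection cache in a nonblocking fashion, optimizing for
// high mean TPS rather than per-transaction confirmation.
HighTpsClient,
```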

impl Default for ExternalClientType {
@@ -338,6 +340,13 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
.takes_value(false)
.help("Submit transactions with a TpuClient")
)
.arg(
Arg::with_name("high_tps_client")
.long("use-high-tps-client")
.conflicts_with("rpc_client")

Review comment: need a .conflicts_with("tpu_client") here?

.takes_value(false)
.help("Submit transactions with a HighTpsClient")
)
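A sketch of the arg with the reviewer's suggested extra conflict added (hypothetical; the diff as shown only declares the rpc_client conflict):

```rust
.arg(
    Arg::with_name("high_tps_client")
        .long("use-high-tps-client")
        .conflicts_with("rpc_client")
        .conflicts_with("tpu_client") // suggested addition
        .takes_value(false)
        .help("Submit transactions with a HighTpsClient")
)
```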
.arg(
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
@@ -479,6 +488,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
args.external_client_type = ExternalClientType::RpcClient;
}

if matches.is_present("high_tps_client") {
args.external_client_type = ExternalClientType::HighTpsClient;
}

if matches.is_present("tpu_disable_quic") {
args.use_quic = false;
}
31 changes: 30 additions & 1 deletion bench-tps/src/main.rs
@@ -3,7 +3,10 @@ use {
log::*,
solana_bench_tps::{
bench::{do_bench_tps, max_lamports_for_prioritization},
bench_tps_client::BenchTpsClient,
bench_tps_client::{
high_tps_client::{HighTpsClient, HighTpsClientConfig},
BenchTpsClient,
},
cli::{self, ExternalClientType},
keypairs::get_keypairs,
send_batch::{generate_durable_nonce_accounts, generate_keypairs},
@@ -162,6 +165,32 @@ fn create_client(
),
}
}
ExternalClientType::HighTpsClient => {
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
commitment_config,
));
match connection_cache {
ConnectionCache::Udp(_) => {
unimplemented!("UDP protocol is not supported.");
}
ConnectionCache::Quic(cache) => Arc::new(
HighTpsClient::new(
rpc_client,
websocket_url,
HighTpsClientConfig {
fanout_slots: 1,

Review comment: were your experiments run with fanout_slots: 1, and is there any reason you landed on 1? Does it make sense to make it configurable via the CLI? The code in high_tps_client.rs makes it seem like it should be configurable.

Author reply: I wrote my thoughts about that in the other comment. Regarding configuration in general, I would try to decrease the number of configurable parameters to simplify the UX of the tool. If we know some optimal parameters, I would prefer to hardcode them to reduce the mental load of understanding how to configure them. Currently we have: number of threads, tx count, size of the connection cache, sustained/not, timeout, ... I think we should just find one optimal set of parameters, because most of these are never touched. And if they are changed, it might mean that the defaults are not the most performant.

send_batch_size: 64,
},
cache,
)
.unwrap_or_else(|err| {
eprintln!("Could not create HighTpsClient {err:?}");
exit(1);
}),
),
}
}
}
}

2 changes: 1 addition & 1 deletion tpu-client/src/nonblocking/tpu_client.rs
@@ -752,7 +752,7 @@ impl LeaderTpuService {
self.recent_slots.estimated_current_slot()
}

- fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
+ pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
let current_slot = self.recent_slots.estimated_current_slot();
self.leader_tpu_cache
.read()
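For context, a minimal sketch of how the newly public method is consumed from bench-tps, mirroring send_batch in high_tps_client.rs above (the local variable names are assumptions):

```rust
// `leader_tpu_service` here is the LeaderTpuService owned by HighTpsClient.
// Resolve the TPU addresses of the next `fanout_slots` leaders and fan out.
let tpu_addresses = leader_tpu_service.leader_tpu_sockets(config.fanout_slots);
for tpu_address in &tpu_addresses {
    let conn = connection_cache.get_connection(tpu_address);
    let _ = conn.send_data_batch_async(wire_batch.to_vec());
}
```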