From d8b7f26a9930a21039bba7336c0c521f02dd9c08 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:19:45 -0700 Subject: [PATCH 1/3] bench-vote testing --- Cargo.lock | 16 ++ Cargo.toml | 1 + bench-vote/.gitignore | 2 + bench-vote/Cargo.toml | 24 +++ bench-vote/src/main.rs | 331 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 374 insertions(+) create mode 100644 bench-vote/.gitignore create mode 100644 bench-vote/Cargo.toml create mode 100644 bench-vote/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 042ea8f0e60bca..efeca9031a9c27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6155,6 +6155,22 @@ dependencies = [ "thiserror 2.0.6", ] +[[package]] +name = "solana-bench-vote" +version = "2.2.0" +dependencies = [ + "bincode", + "clap 3.2.23", + "crossbeam-channel", + "solana-client", + "solana-connection-cache", + "solana-net-utils", + "solana-sdk", + "solana-streamer", + "solana-version", + "solana-vote-program", +] + [[package]] name = "solana-bincode" version = "2.2.0" diff --git a/Cargo.toml b/Cargo.toml index 21e2002ee5b3af..df4a5ff9161bfb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "banks-server", "bench-streamer", "bench-tps", + "bench-vote", "bloom", "bucket_map", "builtins", diff --git a/bench-vote/.gitignore b/bench-vote/.gitignore new file mode 100644 index 00000000000000..5404b132dba6e1 --- /dev/null +++ b/bench-vote/.gitignore @@ -0,0 +1,2 @@ +/target/ +/farf/ diff --git a/bench-vote/Cargo.toml b/bench-vote/Cargo.toml new file mode 100644 index 00000000000000..54fa6ec87f0014 --- /dev/null +++ b/bench-vote/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "solana-bench-vote" +publish = false +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +bincode = { workspace = true } +clap = { version = "3.1.5", features = ["cargo"] } +crossbeam-channel = { workspace = true } +solana-client = { workspace = true } +solana-connection-cache = { workspace = true } +solana-net-utils = { workspace = true } +solana-sdk = { workspace = true } +solana-streamer = { workspace = true } +solana-version = { workspace = true } +solana-vote-program = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/bench-vote/src/main.rs b/bench-vote/src/main.rs new file mode 100644 index 00000000000000..399c0b33431dbc --- /dev/null +++ b/bench-vote/src/main.rs @@ -0,0 +1,331 @@ +#![allow(clippy::arithmetic_side_effects)] + +use { + clap::{crate_description, crate_name, Arg, Command}, + crossbeam_channel::unbounded, + solana_client::connection_cache::ConnectionCache, + solana_connection_cache::client_connection::ClientConnection, + solana_net_utils::bind_to_unspecified, + solana_sdk::{ + hash::Hash, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, + }, + solana_streamer::{ + packet::PacketBatchRecycler, + streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats}, + }, + solana_vote_program::{vote_instruction, vote_state::Vote}, + std::{ + cmp::max, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + thread::{self, spawn, JoinHandle, Result}, + time::{Duration, Instant, SystemTime}, + }, +}; + +fn sink( + exit: Arc, + rvs: Arc, + r: PacketBatchReceiver, + verbose: bool, +) -> JoinHandle<()> { + spawn(move || { + let mut last_report = Instant::now(); + let interval: Duration = Duration::from_secs(5); + loop { + if exit.load(Ordering::Relaxed) { + return; + } + let timer = Duration::new(1, 0); + if let Ok(packet_batch) = r.recv_timeout(timer) { + rvs.fetch_add(packet_batch.len(), Ordering::Relaxed); + } + + let count = rvs.load(Ordering::Relaxed); + + if verbose && + /*count % 1000 == 0 && count != 0 &&*/ + last_report.elapsed() > interval + { + println!("Received txns count: {count}"); + last_report = Instant::now(); + } + } + }) +} + +const TRANSACTIONS_PER_THREAD: u64 = 1000000; // Number of transactions per thread + +fn main() -> Result<()> { + let mut num_sockets = 1usize; + + let matches = Command::new(crate_name!()) + .about(crate_description!()) + .version(solana_version::version!()) + .arg( + Arg::new("num-recv-sockets") + .long("num-recv-sockets") + .value_name("NUM") + .takes_value(true) + .help("Use NUM receive sockets"), + ) + .arg( + Arg::new("num-producers") + .long("num-producers") + .value_name("NUM") + .takes_value(true) + .help("Use this many producer threads."), + ) + .arg( + Arg::new("server-only") + .long("server-only") + .takes_value(false) + .help("Use this many producer threads."), + ) + .arg( + Arg::new("client-only") + .long("client-only") + .takes_value(false) + .requires("destination") + .help("Use this many producer threads."), + ) + .arg( + Arg::with_name("server-address") + .short('n') + .long("server-address") + .value_name("HOST:PORT") + .takes_value(true) + .validator(|arg| solana_net_utils::is_host_port(arg.to_string())) + .help("The destination streamer address to which the client will send transactions to"), + ) + .arg( + Arg::new("use-connection-cache") + .long("use-connection-cache") + .takes_value(false) + .help("Use this many producer threads."), + ) + .arg( + Arg::new("verbose") + .long("verbose") + .takes_value(false) + .help("Show verbose messages."), + ) + .get_matches(); + + if let Some(n) = matches.value_of("num-recv-sockets") { + num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); + } + + let num_producers: u64 = matches.value_of_t("num-producers").unwrap_or(4); + + let use_connection_cache = matches.is_present("use-connection-cache"); + + let server_only = matches.is_present("server-only"); + let client_only = matches.is_present("client-only"); + let verbose = matches.is_present("verbose"); + + let destination = matches.is_present("server-address").then(|| { + let addr = matches + .value_of("server-address") + .expect("Destination must be set when --client-only is used"); + solana_net_utils::parse_host_port(addr).expect("Expecting a valid server address") + }); + + let port = destination.map_or(0, |addr| addr.port()); + let ip_addr = destination.map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip()); + + let (exit, read_threads, sin_threads, destination) = if !client_only { + let exit = Arc::new(AtomicBool::new(false)); + + let mut read_channels = Vec::new(); + let mut read_threads = Vec::new(); + let recycler = PacketBatchRecycler::default(); + let (port, read_sockets) = solana_net_utils::multi_bind_in_range( + ip_addr, + (port, port + num_sockets as u16), + num_sockets, + ) + .unwrap(); + let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test")); + for read in read_sockets { + read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + + let (s_reader, r_reader) = unbounded(); + read_channels.push(r_reader); + read_threads.push(receiver( + "solRcvrBenStrmr".to_string(), + Arc::new(read), + exit.clone(), + s_reader, + recycler.clone(), + stats.clone(), + Duration::from_millis(1), // coalesce + true, + None, + false, + )); + } + + let rvs = Arc::new(AtomicUsize::new(0)); + let sink_threads: Vec<_> = read_channels + .into_iter() + .map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader, verbose)) + .collect(); + + let destination = SocketAddr::new(ip_addr, port); + println!("Running server at {destination:?}"); + ( + Some(exit), + Some(read_threads), + Some(sink_threads), + destination, + ) + } else { + (None, None, None, destination.unwrap()) + }; + + let start = SystemTime::now(); + + let producer_threads = if !server_only { + let producer_threads: Vec<_> = + producer(destination, num_producers, use_connection_cache, verbose); + Some(producer_threads) + } else { + None + }; + + // sleep(Duration::new(5, 0)); + + if let Some(producer_threads) = producer_threads { + for t_producer in producer_threads { + t_producer.join()?; + } + } + + if !server_only { + if let Some(exit) = exit { + exit.store(true, Ordering::Relaxed); + } + } else { + println!("To stop the server, please press ^C"); + } + + if let Some(read_threads) = read_threads { + for t_reader in read_threads { + t_reader.join()?; + } + } + + if let Some(sink_threads) = sin_threads { + for t_sink in sink_threads { + t_sink.join()?; + } + } + + if !(server_only) { + let elapsed = start.elapsed().unwrap(); + let time = elapsed.as_secs() * 10_000_000_000 + u64::from(elapsed.subsec_nanos()); + let ftime = (time as f64) / 10_000_000_000_f64; + let fcount = (TRANSACTIONS_PER_THREAD * num_producers) as f64; + + println!( + "performance: {:?}, count: {fcount}, time: {ftime}", + fcount / ftime + ); + } + Ok(()) +} + +#[derive(Clone)] +enum Transporter { + Cache(Arc), + DirectSocket(Arc), +} + +fn producer( + sock: SocketAddr, + num_producers: u64, + use_connection_cache: bool, + verbose: bool, +) -> Vec> { + println!("Running clients against {sock:?}"); + let transporter = if use_connection_cache { + Transporter::Cache(Arc::new(ConnectionCache::with_udp( + "connection_cache_vote_udp", + 1, + ))) + } else { + Transporter::DirectSocket(Arc::new(bind_to_unspecified().unwrap())) + }; + + let mut handles = vec![]; + + let current_slot: u64 = 0; + + let identity_keypair = Keypair::new(); // Replace with loaded keypair + + for _i in 0..num_producers { + let transporter = transporter.clone(); + let identity_keypair = identity_keypair.insecure_clone(); + handles.push(thread::spawn(move || { + // Generate and send transactions + for _j in 0..TRANSACTIONS_PER_THREAD { + // Create a vote instruction + let vote = Vote { + slots: vec![current_slot], // Voting for the current slot + hash: Hash::new_unique(), + timestamp: None, // Optional timestamp + }; + + let vote_instruction = vote_instruction::vote( + &identity_keypair.pubkey(), + &identity_keypair.pubkey(), + vote, + ); + + // Build the transaction + let message = Message::new(&[vote_instruction], Some(&identity_keypair.pubkey())); + + let recent_blockhash = Hash::new_unique(); + let transaction = Transaction::new(&[&identity_keypair], message, recent_blockhash); + + let serialized_transaction = bincode::serialize(&transaction).unwrap(); + + match &transporter { + Transporter::Cache(cache) => { + let connection = cache.get_connection(&sock); + + match connection.send_data(&serialized_transaction) { + Ok(_) => { + if verbose { + println!("Sent transaction successfully"); + } + } + Err(ex) => { + println!("Error sending transaction {ex:?}"); + } + } + } + Transporter::DirectSocket(socket) => { + match socket.send_to(&serialized_transaction, sock) { + Ok(_) => { + if verbose { + println!( + "Sent transaction via direct socket successfully {sock:?}" + ); + } + } + Err(ex) => { + println!("Error sending transaction {ex:?}"); + } + } + } + } + } + })); + } + handles +} From 4509323d74337fb1d6ae9d45f858be95c59eaea3 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 13 Dec 2024 12:17:56 -0800 Subject: [PATCH 2/3] addressed some feedback from Behzad --- bench-vote/.gitignore | 2 - bench-vote/src/main.rs | 100 +++++++++++++++++------------------------ 2 files changed, 42 insertions(+), 60 deletions(-) delete mode 100644 bench-vote/.gitignore diff --git a/bench-vote/.gitignore b/bench-vote/.gitignore deleted file mode 100644 index 5404b132dba6e1..00000000000000 --- a/bench-vote/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target/ -/farf/ diff --git a/bench-vote/src/main.rs b/bench-vote/src/main.rs index 399c0b33431dbc..ece54b79244ec7 100644 --- a/bench-vote/src/main.rs +++ b/bench-vote/src/main.rs @@ -26,30 +26,27 @@ use { }, }; +const SINK_REPORT_INTERVAL: Duration = Duration::from_secs(5); +const SINK_RECEIVE_TIMEOUT: Duration = Duration::from_secs(1); +const SOCKET_RECEIVE_TIMEOUT: Duration = Duration::from_secs(1); +const COALESCE_TIME: Duration = Duration::from_millis(1); + fn sink( exit: Arc, - rvs: Arc, - r: PacketBatchReceiver, + received_size: Arc, + receiver: PacketBatchReceiver, verbose: bool, ) -> JoinHandle<()> { spawn(move || { let mut last_report = Instant::now(); - let interval: Duration = Duration::from_secs(5); - loop { - if exit.load(Ordering::Relaxed) { - return; - } - let timer = Duration::new(1, 0); - if let Ok(packet_batch) = r.recv_timeout(timer) { - rvs.fetch_add(packet_batch.len(), Ordering::Relaxed); + while !exit.load(Ordering::Relaxed) { + if let Ok(packet_batch) = receiver.recv_timeout(SINK_RECEIVE_TIMEOUT) { + received_size.fetch_add(packet_batch.len(), Ordering::Relaxed); } - let count = rvs.load(Ordering::Relaxed); + let count = received_size.load(Ordering::Relaxed); - if verbose && - /*count % 1000 == 0 && count != 0 &&*/ - last_report.elapsed() > interval - { + if verbose && last_report.elapsed() > SINK_REPORT_INTERVAL { println!("Received txns count: {count}"); last_report = Instant::now(); } @@ -57,11 +54,9 @@ fn sink( }) } -const TRANSACTIONS_PER_THREAD: u64 = 1000000; // Number of transactions per thread +const TRANSACTIONS_PER_THREAD: u64 = 1_000_000; // Number of transactions per thread fn main() -> Result<()> { - let mut num_sockets = 1usize; - let matches = Command::new(crate_name!()) .about(crate_description!()) .version(solana_version::version!()) @@ -83,14 +78,14 @@ fn main() -> Result<()> { Arg::new("server-only") .long("server-only") .takes_value(false) - .help("Use this many producer threads."), + .help("Run the bench tool as a server only."), ) .arg( Arg::new("client-only") .long("client-only") .takes_value(false) - .requires("destination") - .help("Use this many producer threads."), + .requires("server-address") + .help("Run the bench tool as a client only."), ) .arg( Arg::with_name("server-address") @@ -115,6 +110,7 @@ fn main() -> Result<()> { ) .get_matches(); + let mut num_sockets = 1usize; if let Some(n) = matches.value_of("num-recv-sockets") { num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); } @@ -137,7 +133,7 @@ fn main() -> Result<()> { let port = destination.map_or(0, |addr| addr.port()); let ip_addr = destination.map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip()); - let (exit, read_threads, sin_threads, destination) = if !client_only { + let (exit, read_threads, sink_threads, destination) = if !client_only { let exit = Arc::new(AtomicBool::new(false)); let mut read_channels = Vec::new(); @@ -151,7 +147,7 @@ fn main() -> Result<()> { .unwrap(); let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test")); for read in read_sockets { - read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap(); let (s_reader, r_reader) = unbounded(); read_channels.push(r_reader); @@ -162,17 +158,17 @@ fn main() -> Result<()> { s_reader, recycler.clone(), stats.clone(), - Duration::from_millis(1), // coalesce - true, - None, - false, + COALESCE_TIME, // coalesce + true, // use_pinned_memory + None, // in_vote_only_mode + false, // is_staked_service )); } - let rvs = Arc::new(AtomicUsize::new(0)); + let received_size = Arc::new(AtomicUsize::new(0)); let sink_threads: Vec<_> = read_channels .into_iter() - .map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader, verbose)) + .map(|r_reader| sink(exit.clone(), received_size.clone(), r_reader, verbose)) .collect(); let destination = SocketAddr::new(ip_addr, port); @@ -189,21 +185,13 @@ fn main() -> Result<()> { let start = SystemTime::now(); - let producer_threads = if !server_only { - let producer_threads: Vec<_> = - producer(destination, num_producers, use_connection_cache, verbose); - Some(producer_threads) - } else { - None - }; + let producer_threads = + (!server_only).then(|| producer(destination, num_producers, use_connection_cache, verbose)); - // sleep(Duration::new(5, 0)); - - if let Some(producer_threads) = producer_threads { - for t_producer in producer_threads { - t_producer.join()?; - } - } + producer_threads + .into_iter() + .flatten() + .try_for_each(JoinHandle::join)?; if !server_only { if let Some(exit) = exit { @@ -213,26 +201,22 @@ fn main() -> Result<()> { println!("To stop the server, please press ^C"); } - if let Some(read_threads) = read_threads { - for t_reader in read_threads { - t_reader.join()?; - } - } - - if let Some(sink_threads) = sin_threads { - for t_sink in sink_threads { - t_sink.join()?; - } - } + read_threads + .into_iter() + .flatten() + .try_for_each(JoinHandle::join)?; + sink_threads + .into_iter() + .flatten() + .try_for_each(JoinHandle::join)?; if !(server_only) { let elapsed = start.elapsed().unwrap(); - let time = elapsed.as_secs() * 10_000_000_000 + u64::from(elapsed.subsec_nanos()); - let ftime = (time as f64) / 10_000_000_000_f64; + let ftime = elapsed.as_nanos() as f64; let fcount = (TRANSACTIONS_PER_THREAD * num_producers) as f64; println!( - "performance: {:?}, count: {fcount}, time: {ftime}", + "Performance: {:?}/ns, count: {fcount}, time in nano-second: {ftime}", fcount / ftime ); } @@ -255,7 +239,7 @@ fn producer( let transporter = if use_connection_cache { Transporter::Cache(Arc::new(ConnectionCache::with_udp( "connection_cache_vote_udp", - 1, + 1, // connection_pool_size ))) } else { Transporter::DirectSocket(Arc::new(bind_to_unspecified().unwrap())) From 705278789b4715d8faafad5f9079cc314ce2a86b Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 13 Dec 2024 12:53:53 -0800 Subject: [PATCH 3/3] use second as opposed to nano-sec for ease --- bench-vote/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bench-vote/src/main.rs b/bench-vote/src/main.rs index ece54b79244ec7..998f4020139128 100644 --- a/bench-vote/src/main.rs +++ b/bench-vote/src/main.rs @@ -212,11 +212,11 @@ fn main() -> Result<()> { if !(server_only) { let elapsed = start.elapsed().unwrap(); - let ftime = elapsed.as_nanos() as f64; + let ftime = elapsed.as_nanos() as f64 / 1_000_000_000.0; let fcount = (TRANSACTIONS_PER_THREAD * num_producers) as f64; println!( - "Performance: {:?}/ns, count: {fcount}, time in nano-second: {ftime}", + "Performance: {:?}/s, count: {fcount}, time in second: {ftime}", fcount / ftime ); }