diff --git a/Cargo.lock b/Cargo.lock index 6a2abab7f46afc..b0390f9a2d926c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5560,11 +5560,14 @@ dependencies = [ name = "solana-bench-tps" version = "2.0.0" dependencies = [ + "chrono", "clap 2.33.3", "crossbeam-channel", + "csv", "log", "rand 0.8.5", "rayon", + "serde", "serde_json", "serde_yaml 0.9.32", "serial_test", diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index 3693f70e4ed9b8..80a09fc8048ccd 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -9,11 +9,14 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +chrono = { workspace = true } clap = { workspace = true } crossbeam-channel = { workspace = true } +csv = { workspace = true } log = { workspace = true } rand = { workspace = true } rayon = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } solana-clap-utils = { workspace = true } diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 8b370786861cea..f01745e6ce8c9e 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -2,9 +2,13 @@ use { crate::{ bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, + log_transaction_service::{ + create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch, + }, perf_utils::{sample_txs, SampleStats}, send_batch::*, }, + chrono::Utc, log::*, rand::distributions::{Distribution, Uniform}, rayon::prelude::*, @@ -87,8 +91,14 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 { } } -pub type TimestampedTransaction = (Transaction, Option); -pub type SharedTransactions = Arc>>>; +#[derive(Debug, PartialEq, Default, Eq, Clone)] +pub(crate) struct TimestampedTransaction { + transaction: Transaction, + timestamp: Option, + compute_unit_price: Option, +} + +pub(crate) type SharedTransactions = Arc>>>; /// Keypairs split into source and destination /// used for transfer transactions @@ -356,6 +366,7 @@ fn create_sender_threads( threads: usize, exit_signal: Arc, shared_tx_active_thread_count: &Arc, + signatures_sender: Option, ) -> Vec> where T: 'static + BenchTpsClient + Send + Sync + ?Sized, @@ -367,6 +378,7 @@ where let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone(); let client = client.clone(); + let signatures_sender = signatures_sender.clone(); Builder::new() .name("solana-client-sender".to_string()) .spawn(move || { @@ -377,6 +389,7 @@ where &total_tx_sent_count, thread_batch_sleep_ms, &client, + signatures_sender, ); }) .unwrap() @@ -406,6 +419,8 @@ where use_durable_nonce, instruction_padding_config, num_conflict_groups, + block_data_file, + transaction_data_file, .. } = config; @@ -464,7 +479,13 @@ where None }; - let s_threads = create_sender_threads( + let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender( + &client, + block_data_file.as_deref(), + transaction_data_file.as_deref(), + ); + + let sender_threads = create_sender_threads( &client, &shared_txs, thread_batch_sleep_ms, @@ -472,6 +493,7 @@ where threads, exit_signal.clone(), &shared_tx_active_thread_count, + signatures_sender, ); wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); @@ -499,7 +521,7 @@ where // join the tx send threads info!("Waiting for transmit threads..."); - for t in s_threads { + for t in sender_threads { if let Err(err) = t.join() { info!(" join() failed with: {:?}", err); } @@ -512,6 +534,13 @@ where } } + if let Some(log_transaction_service) = log_transaction_service { + info!("Waiting for log_transaction_service thread..."); + if let Err(err) = log_transaction_service.join() { + info!(" join() failed with: {:?}", err); + } + } + if let Some(nonce_keypairs) = nonce_keypairs { withdraw_durable_nonce_accounts(client.clone(), &gen_keypairs, &nonce_keypairs); } @@ -575,36 +604,37 @@ fn generate_system_txs( pairs_with_compute_unit_prices .par_iter() .map(|((from, to), compute_unit_price)| { - ( - transfer_with_compute_unit_price_and_padding( + let compute_unit_price = Some(**compute_unit_price); + TimestampedTransaction { + transaction: transfer_with_compute_unit_price_and_padding( from, &to.pubkey(), 1, *blockhash, instruction_padding_config, - Some(**compute_unit_price), + compute_unit_price, skip_tx_account_data_size, ), - Some(timestamp()), - ) + timestamp: Some(timestamp()), + compute_unit_price, + } }) .collect() } else { pairs .par_iter() - .map(|(from, to)| { - ( - transfer_with_compute_unit_price_and_padding( - from, - &to.pubkey(), - 1, - *blockhash, - instruction_padding_config, - None, - skip_tx_account_data_size, - ), - Some(timestamp()), - ) + .map(|(from, to)| TimestampedTransaction { + transaction: transfer_with_compute_unit_price_and_padding( + from, + &to.pubkey(), + 1, + *blockhash, + instruction_padding_config, + None, + skip_tx_account_data_size, + ), + timestamp: Some(timestamp()), + compute_unit_price: None, }) .collect() } @@ -779,8 +809,8 @@ fn generate_nonced_system_txs = get_nonce_blockhashes(&client, &pubkeys); for i in 0..length { - transactions.push(( - nonced_transfer_with_padding( + transactions.push(TimestampedTransaction { + transaction: nonced_transfer_with_padding( source[i], &dest[i].pubkey(), 1, @@ -790,16 +820,17 @@ fn generate_nonced_system_txs = dest_nonce.iter().map(|keypair| keypair.pubkey()).collect(); let blockhashes: Vec = get_nonce_blockhashes(&client, &pubkeys); for i in 0..length { - transactions.push(( - nonced_transfer_with_padding( + transactions.push(TimestampedTransaction { + transaction: nonced_transfer_with_padding( dest[i], &source[i].pubkey(), 1, @@ -809,8 +840,9 @@ fn generate_nonced_system_txs( total_tx_sent_count: &Arc, thread_batch_sleep_ms: usize, client: &Arc, + signatures_sender: Option, ) { let mut last_sent_time = timestamp(); - loop { + 'thread_loop: loop { if thread_batch_sleep_ms > 0 { sleep(Duration::from_millis(thread_batch_sleep_ms as u64)); } @@ -926,19 +959,21 @@ fn do_tx_transfers( let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers"); shared_txs_wl.pop_front() }; - if let Some(txs0) = txs { + if let Some(txs) = txs { shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); - info!("Transferring 1 unit {} times...", txs0.len()); - let tx_len = txs0.len(); + let num_txs = txs.len(); + info!("Transferring 1 unit {} times...", num_txs); let transfer_start = Instant::now(); let mut old_transactions = false; - let mut transactions = Vec::<_>::new(); let mut min_timestamp = u64::MAX; - for tx in txs0 { + let mut transactions = Vec::<_>::with_capacity(num_txs); + let mut signatures = Vec::<_>::with_capacity(num_txs); + let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs); + for tx in txs { let now = timestamp(); // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother // sending them. - if let Some(tx_timestamp) = tx.1 { + if let Some(tx_timestamp) = tx.timestamp { if tx_timestamp < min_timestamp { min_timestamp = tx_timestamp; } @@ -947,7 +982,9 @@ fn do_tx_transfers( continue; } } - transactions.push(tx.0); + signatures.push(tx.transaction.signatures[0]); + transactions.push(tx.transaction); + compute_unit_prices.push(tx.compute_unit_price); } if min_timestamp != u64::MAX { @@ -957,6 +994,17 @@ fn do_tx_transfers( ); } + if let Some(signatures_sender) = &signatures_sender { + if let Err(error) = signatures_sender.send(TransactionInfoBatch { + signatures, + sent_at: Utc::now(), + compute_unit_prices, + }) { + error!("Receiver has been dropped with error `{error}`, stop sending transactions."); + break 'thread_loop; + } + } + if let Err(error) = client.send_batch(transactions) { warn!("send_batch_sync in do_tx_transfers failed: {}", error); } @@ -977,16 +1025,16 @@ fn do_tx_transfers( shared_txs_wl.clear(); } shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); - total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); + total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed); info!( "Tx send done. {} ms {} tps", duration_as_ms(&transfer_start.elapsed()), - tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + num_txs as f32 / duration_as_s(&transfer_start.elapsed()), ); datapoint_info!( "bench-tps-do_tx_transfers", ("duration", duration_as_us(&transfer_start.elapsed()), i64), - ("count", tx_len, i64) + ("count", num_txs, i64) ); } if exit_signal.load(Ordering::Relaxed) { diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 1804dbbc454e02..04bb869c2626bb 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -76,6 +76,8 @@ pub struct Config { pub bind_address: IpAddr, pub client_node_id: Option, pub commitment_config: CommitmentConfig, + pub block_data_file: Option, + pub transaction_data_file: Option, } impl Eq for Config {} @@ -109,6 +111,8 @@ impl Default for Config { bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), client_node_id: None, commitment_config: CommitmentConfig::confirmed(), + block_data_file: None, + transaction_data_file: None, } } } @@ -419,6 +423,23 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> { .default_value("confirmed") .help("Block commitment config for getting latest blockhash"), ) + .arg( + Arg::with_name("block_data_file") + .long("block-data-file") + .value_name("FILENAME") + .takes_value(true) + .help("File to save block statistics relevant to the submitted transactions."), + ) + .arg( + Arg::with_name("transaction_data_file") + .long("transaction-data-file") + .value_name("FILENAME") + .takes_value(true) + .help( + "File to save details about all the submitted transactions.\ + This option is useful for debug purposes." + ), + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -587,6 +608,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result { } args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig); + args.block_data_file = matches.value_of("block_data_file").map(|s| s.to_string()); + args.transaction_data_file = matches + .value_of("transaction_data_file") + .map(|s| s.to_string()); Ok(args) } diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index 7da3979a30d72c..6f55a4122e4c0b 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -3,5 +3,7 @@ pub mod bench; pub mod bench_tps_client; pub mod cli; pub mod keypairs; +mod log_transaction_service; mod perf_utils; +mod rpc_with_retry_utils; pub mod send_batch; diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs new file mode 100644 index 00000000000000..6363ff59914c83 --- /dev/null +++ b/bench-tps/src/log_transaction_service.rs @@ -0,0 +1,496 @@ +//! `LogTransactionService` requests confirmed blocks, analyses transactions submitted by bench-tps, +//! and saves log files in csv format. + +use { + crate::{ + bench_tps_client::BenchTpsClient, + rpc_with_retry_utils::{get_blocks_with_retry, get_slot_with_retry}, + }, + chrono::{DateTime, TimeZone, Utc}, + crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, + log::*, + serde::Serialize, + solana_client::rpc_config::RpcBlockConfig, + solana_measure::measure::Measure, + solana_sdk::{ + clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE}, + commitment_config::{CommitmentConfig, CommitmentLevel}, + signature::Signature, + slot_history::Slot, + }, + solana_transaction_status::{ + option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, RewardType, + TransactionDetails, UiConfirmedBlock, UiTransactionEncoding, UiTransactionStatusMeta, + }, + std::{ + collections::HashMap, + fs::File, + sync::Arc, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +// Data to establish communication between sender thread and +// LogTransactionService. +#[derive(Clone)] +pub(crate) struct TransactionInfoBatch { + pub signatures: Vec, + pub sent_at: DateTime, + pub compute_unit_prices: Vec>, +} + +pub(crate) type SignatureBatchSender = Sender; + +pub(crate) struct LogTransactionService { + thread_handler: JoinHandle<()>, +} + +pub(crate) fn create_log_transactions_service_and_sender( + client: &Arc, + block_data_file: Option<&str>, + transaction_data_file: Option<&str>, +) -> (Option, Option) +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + if data_file_provided(block_data_file, transaction_data_file) { + let (sender, receiver) = unbounded(); + let log_tx_service = + LogTransactionService::new(client, receiver, block_data_file, transaction_data_file); + (Some(log_tx_service), Some(sender)) + } else { + (None, None) + } +} + +// How many blocks to process during one iteration. +// The time to process blocks is dominated by get_block calls. +// Each call takes slightly less time than slot. +const NUM_SLOTS_PER_ITERATION: u64 = 16; +// How often process blocks. +const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT; +// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. +const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = MAX_PROCESSING_AGE as i64 * DEFAULT_MS_PER_SLOT as i64; + +// Map used to filter submitted transactions. +#[derive(Clone)] +struct TransactionSendInfo { + pub sent_at: DateTime, + pub compute_unit_price: Option, +} +type MapSignatureToTxInfo = HashMap; + +type SignatureBatchReceiver = Receiver; + +impl LogTransactionService { + fn new( + client: &Arc, + signature_receiver: SignatureBatchReceiver, + block_data_file: Option<&str>, + transaction_data_file: Option<&str>, + ) -> Self + where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + if !data_file_provided(block_data_file, transaction_data_file) { + panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee."); + } + + let client = client.clone(); + let tx_log_writer = TransactionLogWriter::new(transaction_data_file); + let block_log_writer = BlockLogWriter::new(block_data_file); + + let thread_handler = Builder::new() + .name("LogTransactionService".to_string()) + .spawn(move || { + Self::run(client, signature_receiver, tx_log_writer, block_log_writer); + }) + .expect("LogTransactionService should have started successfully."); + Self { thread_handler } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_handler.join() + } + + fn run( + client: Arc, + signature_receiver: SignatureBatchReceiver, + mut tx_log_writer: TransactionLogWriter, + mut block_log_writer: BlockLogWriter, + ) where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + // used to request blocks data and only confirmed makes sense in this context. + let commitment: CommitmentConfig = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); + + let mut start_slot = get_slot_with_retry(&client, commitment) + .expect("get_slot_with_retry should have succeed, cannot proceed without having slot. Must be a problem with RPC."); + + let mut sender_stopped = false; + let mut signature_to_tx_info = MapSignatureToTxInfo::new(); + loop { + select! { + recv(signature_receiver) -> msg => { + match msg { + Ok(TransactionInfoBatch { + signatures, + sent_at, + compute_unit_prices + }) => { + signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo { + sent_at, + compute_unit_price + });}); + } + Err(_) => { + sender_stopped = true; + } + } + }, + recv(block_processing_timer_receiver) -> _ => { + info!("sign_receiver queue len: {}", signature_receiver.len()); + if !signature_receiver.is_empty() { + continue; + } + let mut measure_get_blocks = Measure::start("measure_get_blocks"); + let block_slots = get_blocks_with_retry(&client, start_slot, Some(start_slot + NUM_SLOTS_PER_ITERATION - 1), commitment); + measure_get_blocks.stop(); + let time_get_blocks_us = measure_get_blocks.as_us(); + info!("Time to get_blocks : {time_get_blocks_us}us."); + let Ok(block_slots) = block_slots else { + error!("Failed to get blocks, stop LogWriterService."); + break; + }; + if block_slots.is_empty() { + continue; + } + let last_block_time = Self::process_blocks( + &client, + block_slots, + &mut signature_to_tx_info, + &mut tx_log_writer, + &mut block_log_writer, + commitment, + ); + Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info, last_block_time); + + start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); + tx_log_writer.flush(); + block_log_writer.flush(); + if sender_stopped && signature_to_tx_info.is_empty() { + info!("Stop LogTransactionService"); + break; + } + }, + } + } + } + + /// Download and process the blocks. + /// Returns the time when the last processed block has been confirmed or now(). + fn process_blocks( + client: &Arc, + block_slots: Vec, + signature_to_tx_info: &mut MapSignatureToTxInfo, + tx_log_writer: &mut TransactionLogWriter, + block_log_writer: &mut BlockLogWriter, + commitment: CommitmentConfig, + ) -> DateTime + where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + let rpc_block_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + transaction_details: Some(TransactionDetails::Full), + rewards: Some(true), + commitment: Some(commitment), + max_supported_transaction_version: Some(0), + }; + let mut measure_process_blocks = Measure::start("measure_process_blocks"); + let blocks = block_slots + .iter() + .map(|slot| client.get_block_with_config(*slot, rpc_block_config)); + let num_blocks = blocks.len(); + let mut last_block_time = None; + for (block, slot) in blocks.zip(&block_slots) { + let Ok(block) = block else { + continue; + }; + let block_time = Self::process_block( + block, + signature_to_tx_info, + *slot, + tx_log_writer, + block_log_writer, + ); + // if last_time is some, it means that the there is at least one valid block + if block_time.is_some() { + last_block_time = block_time; + } + } + measure_process_blocks.stop(); + let time_process_blocks_us = measure_process_blocks.as_us(); + info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us."); + last_block_time.unwrap_or_else(Utc::now) + } + + fn process_block( + block: UiConfirmedBlock, + signature_to_tx_info: &mut MapSignatureToTxInfo, + slot: u64, + tx_log_writer: &mut TransactionLogWriter, + block_log_writer: &mut BlockLogWriter, + ) -> Option> { + let rewards = block + .rewards + .as_ref() + .expect("Rewards should be part of the block information."); + let slot_leader = rewards + .iter() + .find(|r| r.reward_type == Some(RewardType::Fee)) + .map_or("".to_string(), |x| x.pubkey.clone()); + + let Some(transactions) = &block.transactions else { + warn!("Empty block: {slot}"); + return None; + }; + + let mut num_bench_tps_transactions: usize = 0; + let mut total_cu_consumed: u64 = 0; + let mut bench_tps_cu_consumed: u64 = 0; + for EncodedTransactionWithStatusMeta { + transaction, meta, .. + } in transactions + { + let Some(transaction) = transaction.decode() else { + continue; + }; + let cu_consumed = meta + .as_ref() + .map_or(0, |meta| match meta.compute_units_consumed { + OptionSerializer::Some(cu_consumed) => cu_consumed, + _ => 0, + }); + let signature = &transaction.signatures[0]; + + total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); + if let Some(TransactionSendInfo { + sent_at, + compute_unit_price, + }) = signature_to_tx_info.remove(signature) + { + num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); + bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); + + tx_log_writer.write( + Some(block.blockhash.clone()), + Some(slot_leader.clone()), + signature, + sent_at, + Some(slot), + block.block_time, + meta.as_ref(), + false, + compute_unit_price, + ); + } + } + block_log_writer.write( + block.blockhash.clone(), + slot_leader, + slot, + block.block_time, + num_bench_tps_transactions, + transactions.len(), + bench_tps_cu_consumed, + total_cu_consumed, + ); + + block.block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }) + } + + /// Remove from map all the signatures which we haven't processed before and they are + /// older than the the timestamp of the last processed block plus max blockhash age. + fn clean_transaction_map( + tx_log_writer: &mut TransactionLogWriter, + signature_to_tx_info: &mut MapSignatureToTxInfo, + last_block_time: DateTime, + ) { + signature_to_tx_info.retain(|signature, tx_info| { + let duration_since_sent = last_block_time.signed_duration_since(tx_info.sent_at); + let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS; + if is_timeout_tx { + tx_log_writer.write( + None, + None, + signature, + tx_info.sent_at, + None, + None, + None, + true, + tx_info.compute_unit_price, + ); + } + !is_timeout_tx + }); + } +} + +fn data_file_provided(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { + block_data_file.is_some() || transaction_data_file.is_some() +} + +type CsvFileWriter = csv::Writer; + +#[derive(Clone, Serialize)] +struct BlockData { + pub blockhash: String, + pub block_slot: Slot, + pub slot_leader: String, + pub block_time: Option>, + pub total_num_transactions: usize, + pub num_bench_tps_transactions: usize, + pub total_cu_consumed: u64, + pub bench_tps_cu_consumed: u64, +} + +struct BlockLogWriter { + log_writer: Option, +} + +impl BlockLogWriter { + fn new(block_data_file: Option<&str>) -> Self { + let block_log_writer = block_data_file.map(|block_data_file| { + CsvFileWriter::from_writer( + File::create(block_data_file) + .expect("Application should be able to create a file."), + ) + }); + Self { + log_writer: block_log_writer, + } + } + + #[allow(clippy::too_many_arguments)] + fn write( + &mut self, + blockhash: String, + slot_leader: String, + slot: Slot, + block_time: Option, + num_bench_tps_transactions: usize, + total_num_transactions: usize, + bench_tps_cu_consumed: u64, + total_cu_consumed: u64, + ) { + let Some(block_log_writer) = &mut self.log_writer else { + return; + }; + let block_data = BlockData { + blockhash, + slot_leader, + block_slot: slot, + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("timestamp should be valid") + }), + num_bench_tps_transactions, + total_num_transactions, + bench_tps_cu_consumed, + total_cu_consumed, + }; + let _ = block_log_writer.serialize(block_data); + } + + fn flush(&mut self) { + if let Some(block_log_writer) = &mut self.log_writer { + let _ = block_log_writer.flush(); + } + } +} + +#[derive(Clone, Serialize)] +struct TransactionData { + pub blockhash: Option, + pub slot_leader: Option, + pub signature: String, + pub sent_at: Option>, + pub confirmed_slot: Option, + pub block_time: Option>, + pub successful: bool, + pub error: Option, + pub timed_out: bool, + pub compute_unit_price: u64, +} + +struct TransactionLogWriter { + log_writer: Option, +} + +impl TransactionLogWriter { + fn new(transaction_data_file: Option<&str>) -> Self { + let transaction_log_writer = transaction_data_file.map(|transaction_data_file| { + CsvFileWriter::from_writer( + File::create(transaction_data_file) + .expect("Application should be able to create a file."), + ) + }); + Self { + log_writer: transaction_log_writer, + } + } + + #[allow(clippy::too_many_arguments)] + fn write( + &mut self, + blockhash: Option, + slot_leader: Option, + signature: &Signature, + sent_at: DateTime, + confirmed_slot: Option, + block_time: Option, + meta: Option<&UiTransactionStatusMeta>, + timed_out: bool, + compute_unit_price: Option, + ) { + let Some(transaction_log_writer) = &mut self.log_writer else { + return; + }; + let tx_data = TransactionData { + blockhash, + slot_leader, + signature: signature.to_string(), + sent_at: Some(sent_at), + confirmed_slot, + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), + successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), + error: meta + .as_ref() + .and_then(|m| m.err.as_ref().map(|x| x.to_string())), + timed_out, + compute_unit_price: compute_unit_price.unwrap_or(0), + }; + let _ = transaction_log_writer.serialize(tx_data); + } + + fn flush(&mut self) { + if let Some(transaction_log_writer) = &mut self.log_writer { + let _ = transaction_log_writer.flush(); + } + } +} diff --git a/bench-tps/src/rpc_with_retry_utils.rs b/bench-tps/src/rpc_with_retry_utils.rs new file mode 100644 index 00000000000000..57af3923f0aeda --- /dev/null +++ b/bench-tps/src/rpc_with_retry_utils.rs @@ -0,0 +1,61 @@ +use { + crate::bench_tps_client::{BenchTpsClient, Result}, + log::*, + solana_sdk::{ + clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, slot_history::Slot, + }, + std::{sync::Arc, thread::sleep, time::Duration}, +}; + +const NUM_RETRY: u64 = 5; +const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; + +fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result +where + Func: Fn() -> Result, +{ + let mut iretry = 0; + loop { + match f() { + Ok(slot) => { + return Ok(slot); + } + Err(error) => { + if iretry == NUM_RETRY { + return Err(error); + } + warn!("{retry_warning}: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); + } + } + iretry += 1; + } +} + +pub(crate) fn get_slot_with_retry( + client: &Arc, + commitment: CommitmentConfig, +) -> Result +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_slot_with_commitment(commitment), + "Failed to get slot", + ) +} + +pub(crate) fn get_blocks_with_retry( + client: &Arc, + start_slot: Slot, + end_slot: Option, + commitment: CommitmentConfig, +) -> Result> +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_blocks_with_commitment(start_slot, end_slot, commitment), + "Failed to download blocks", + ) +}