diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 76a1f686bb48d1..06e054cd7f6f72 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -959,17 +959,17 @@ fn do_tx_transfers( let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers"); shared_txs_wl.pop_front() }; - if let Some(txs0) = txs { + if let Some(txs) = txs { shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); - info!("Transferring 1 unit {} times...", txs0.len()); - let tx_len = txs0.len(); + let num_txs = txs.len(); + info!("Transferring 1 unit {} times...", num_txs); let transfer_start = Instant::now(); let mut old_transactions = false; let mut min_timestamp = u64::MAX; - let mut transactions = Vec::<_>::with_capacity(txs0.len()); - let mut signatures = Vec::<_>::with_capacity(txs0.len()); - let mut compute_unit_prices = Vec::<_>::with_capacity(txs0.len()); - for tx in txs0 { + let mut transactions = Vec::<_>::with_capacity(num_txs); + let mut signatures = Vec::<_>::with_capacity(num_txs); + let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs); + for tx in txs { let now = timestamp(); // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother // sending them. @@ -1025,16 +1025,16 @@ fn do_tx_transfers( shared_txs_wl.clear(); } shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); - total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); + total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed); info!( "Tx send done. {} ms {} tps", duration_as_ms(&transfer_start.elapsed()), - tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + num_txs as f32 / duration_as_s(&transfer_start.elapsed()), ); datapoint_info!( "bench-tps-do_tx_transfers", ("duration", duration_as_us(&transfer_start.elapsed()), i64), - ("count", tx_len, i64) + ("count", num_txs, i64) ); } if exit_signal.load(Ordering::Relaxed) { diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index 62d185431cc318..6f55a4122e4c0b 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -5,4 +5,5 @@ pub mod cli; pub mod keypairs; mod log_transaction_service; mod perf_utils; +mod rpc_with_retry_utils; pub mod send_batch; diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index dec9353fc038a5..da9f344255c4c3 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -2,7 +2,10 @@ //! and saves log files in csv format. use { - crate::bench_tps_client::{BenchTpsClient, Result}, + crate::{ + bench_tps_client::BenchTpsClient, + rpc_with_retry_utils::{get_blocks_with_retry, get_slot_with_retry}, + }, chrono::{DateTime, TimeZone, Utc}, crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, log::*, @@ -23,7 +26,7 @@ use { collections::HashMap, fs::File, sync::Arc, - thread::{self, sleep, Builder, JoinHandle}, + thread::{self, Builder, JoinHandle}, time::Duration, }, }; @@ -51,7 +54,7 @@ pub(crate) fn create_log_transactions_service_and_sender( where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if verify_data_files(block_data_file, transaction_data_file) { + if data_file_provided(block_data_file, transaction_data_file) { let (sender, receiver) = unbounded(); let log_tx_service = LogTransactionService::new(client, receiver, block_data_file, transaction_data_file); @@ -70,7 +73,7 @@ const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SL // Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions // that still might be added to the block. const AGE_EPSILON: usize = 50; -// Max age for transaction in the transaction map. +// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = ((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64; @@ -94,7 +97,7 @@ impl LogTransactionService { where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if !verify_data_files(block_data_file, transaction_data_file) { + if !data_file_provided(block_data_file, transaction_data_file) { panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee."); } @@ -107,7 +110,7 @@ impl LogTransactionService { .spawn(move || { Self::run(client, signature_receiver, tx_log_writer, block_log_writer); }) - .expect("LogTransactionService is up."); + .expect("LogTransactionService should have started successfully."); Self { thread_handler } } @@ -123,13 +126,14 @@ impl LogTransactionService { ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { + // used to request blocks data and only confirmed makes sense in this context. let commitment: CommitmentConfig = CommitmentConfig { commitment: CommitmentLevel::Confirmed, }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); let mut start_slot = get_slot_with_retry(&client, commitment) - .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); + .expect("get_slot_with_retry should have succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut sender_stopped = false; let mut signature_to_tx_info = MapSignatureToTxInfo::new(); @@ -237,7 +241,10 @@ impl LogTransactionService { tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, ) { - let rewards = block.rewards.as_ref().expect("Rewards are present."); + let rewards = block + .rewards + .as_ref() + .expect("Rewards should be part of the block information."); let slot_leader = rewards .iter() .find(|r| r.reward_type == Some(RewardType::Fee)) @@ -326,7 +333,7 @@ impl LogTransactionService { } } -fn verify_data_files(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { +fn data_file_provided(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { block_data_file.is_some() || transaction_data_file.is_some() } @@ -351,7 +358,10 @@ struct BlockLogWriter { impl BlockLogWriter { fn new(block_data_file: Option<&str>) -> Self { let block_log_writer = block_data_file.map(|block_data_file| { - CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + CsvFileWriter::from_writer( + File::create(block_data_file) + .expect("Application should be able to create a file."), + ) }); Self { log_writer: block_log_writer, @@ -419,7 +429,8 @@ impl TransactionLogWriter { fn new(transaction_data_file: Option<&str>) -> Self { let transaction_log_writer = transaction_data_file.map(|transaction_data_file| { CsvFileWriter::from_writer( - File::create(transaction_data_file).expect("File can be created."), + File::create(transaction_data_file) + .expect("Application should be able to create a file."), ) }); Self { @@ -470,53 +481,3 @@ impl TransactionLogWriter { } } } - -const NUM_RETRY: u64 = 5; -const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; - -fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result -where - Func: Fn() -> Result, -{ - let mut iretry = 0; - loop { - match f() { - Ok(slot) => { - return Ok(slot); - } - Err(error) => { - if iretry == NUM_RETRY { - return Err(error); - } - warn!("{retry_warning}: {error}, retry."); - sleep(Duration::from_millis(RETRY_EVERY_MS)); - } - } - iretry += 1; - } -} - -fn get_slot_with_retry(client: &Arc, commitment: CommitmentConfig) -> Result -where - Client: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - call_rpc_with_retry( - || client.get_slot_with_commitment(commitment), - "Failed to get slot", - ) -} - -fn get_blocks_with_retry( - client: &Arc, - start_slot: Slot, - end_slot: Option, - commitment: CommitmentConfig, -) -> Result> -where - Client: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - call_rpc_with_retry( - || client.get_blocks_with_commitment(start_slot, end_slot, commitment), - "Failed to download blocks", - ) -} diff --git a/bench-tps/src/rpc_with_retry_utils.rs b/bench-tps/src/rpc_with_retry_utils.rs new file mode 100644 index 00000000000000..57af3923f0aeda --- /dev/null +++ b/bench-tps/src/rpc_with_retry_utils.rs @@ -0,0 +1,61 @@ +use { + crate::bench_tps_client::{BenchTpsClient, Result}, + log::*, + solana_sdk::{ + clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, slot_history::Slot, + }, + std::{sync::Arc, thread::sleep, time::Duration}, +}; + +const NUM_RETRY: u64 = 5; +const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; + +fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result +where + Func: Fn() -> Result, +{ + let mut iretry = 0; + loop { + match f() { + Ok(slot) => { + return Ok(slot); + } + Err(error) => { + if iretry == NUM_RETRY { + return Err(error); + } + warn!("{retry_warning}: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); + } + } + iretry += 1; + } +} + +pub(crate) fn get_slot_with_retry( + client: &Arc, + commitment: CommitmentConfig, +) -> Result +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_slot_with_commitment(commitment), + "Failed to get slot", + ) +} + +pub(crate) fn get_blocks_with_retry( + client: &Arc, + start_slot: Slot, + end_slot: Option, + commitment: CommitmentConfig, +) -> Result> +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_blocks_with_commitment(start_slot, end_slot, commitment), + "Failed to download blocks", + ) +}