Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Mar 22, 2024
1 parent 5742a4c commit 29a466f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 71 deletions.
20 changes: 10 additions & 10 deletions bench-tps/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,17 +959,17 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
shared_txs_wl.pop_front()
};
if let Some(txs0) = txs {
if let Some(txs) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
info!("Transferring 1 unit {} times...", txs0.len());
let tx_len = txs0.len();
let num_txs = txs.len();
info!("Transferring 1 unit {} times...", num_txs);
let transfer_start = Instant::now();
let mut old_transactions = false;
let mut min_timestamp = u64::MAX;
let mut transactions = Vec::<_>::with_capacity(txs0.len());
let mut signatures = Vec::<_>::with_capacity(txs0.len());
let mut compute_unit_prices = Vec::<_>::with_capacity(txs0.len());
for tx in txs0 {
let mut transactions = Vec::<_>::with_capacity(num_txs);
let mut signatures = Vec::<_>::with_capacity(num_txs);
let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs);
for tx in txs {
let now = timestamp();
// Transactions without durable nonce that are too old will be rejected by the cluster Don't bother
// sending them.
Expand Down Expand Up @@ -1025,16 +1025,16 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
shared_txs_wl.clear();
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed);
info!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
num_txs as f32 / duration_as_s(&transfer_start.elapsed()),
);
datapoint_info!(
"bench-tps-do_tx_transfers",
("duration", duration_as_us(&transfer_start.elapsed()), i64),
("count", tx_len, i64)
("count", num_txs, i64)
);
}
if exit_signal.load(Ordering::Relaxed) {
Expand Down
1 change: 1 addition & 0 deletions bench-tps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod cli;
pub mod keypairs;
mod log_transaction_service;
mod perf_utils;
mod rpc_with_retry_utils;
pub mod send_batch;
83 changes: 22 additions & 61 deletions bench-tps/src/log_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! and saves log files in csv format.
use {
crate::bench_tps_client::{BenchTpsClient, Result},
crate::{
bench_tps_client::BenchTpsClient,
rpc_with_retry_utils::{get_blocks_with_retry, get_slot_with_retry},
},
chrono::{DateTime, TimeZone, Utc},
crossbeam_channel::{select, tick, unbounded, Receiver, Sender},
log::*,
Expand All @@ -23,7 +26,7 @@ use {
collections::HashMap,
fs::File,
sync::Arc,
thread::{self, sleep, Builder, JoinHandle},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
Expand Down Expand Up @@ -51,7 +54,7 @@ pub(crate) fn create_log_transactions_service_and_sender<Client>(
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if verify_data_files(block_data_file, transaction_data_file) {
if data_file_provided(block_data_file, transaction_data_file) {
let (sender, receiver) = unbounded();
let log_tx_service =
LogTransactionService::new(client, receiver, block_data_file, transaction_data_file);
Expand All @@ -70,7 +73,7 @@ const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SL
// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions
// that still might be added to the block.
const AGE_EPSILON: usize = 50;
// Max age for transaction in the transaction map.
// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout.
const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 =
((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64;

Expand All @@ -94,7 +97,7 @@ impl LogTransactionService {
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if !verify_data_files(block_data_file, transaction_data_file) {
if !data_file_provided(block_data_file, transaction_data_file) {
panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee.");
}

Expand All @@ -107,7 +110,7 @@ impl LogTransactionService {
.spawn(move || {
Self::run(client, signature_receiver, tx_log_writer, block_log_writer);
})
.expect("LogTransactionService is up.");
.expect("LogTransactionService should have started successfully.");
Self { thread_handler }
}

Expand All @@ -123,13 +126,14 @@ impl LogTransactionService {
) where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
// used to request blocks data and only confirmed makes sense in this context.
let commitment: CommitmentConfig = CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
};
let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS));

let mut start_slot = get_slot_with_retry(&client, commitment)
.expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC.");
.expect("get_slot_with_retry should have succeed, cannot proceed without having slot. Must be a problem with RPC.");

let mut sender_stopped = false;
let mut signature_to_tx_info = MapSignatureToTxInfo::new();
Expand Down Expand Up @@ -237,7 +241,10 @@ impl LogTransactionService {
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
) {
let rewards = block.rewards.as_ref().expect("Rewards are present.");
let rewards = block
.rewards
.as_ref()
.expect("Rewards should be part of the block information.");
let slot_leader = rewards
.iter()
.find(|r| r.reward_type == Some(RewardType::Fee))
Expand Down Expand Up @@ -326,7 +333,7 @@ impl LogTransactionService {
}
}

fn verify_data_files(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool {
fn data_file_provided(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool {
block_data_file.is_some() || transaction_data_file.is_some()
}

Expand All @@ -351,7 +358,10 @@ struct BlockLogWriter {
impl BlockLogWriter {
fn new(block_data_file: Option<&str>) -> Self {
let block_log_writer = block_data_file.map(|block_data_file| {
CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created."))
CsvFileWriter::from_writer(
File::create(block_data_file)
.expect("Application should be able to create a file."),
)
});
Self {
log_writer: block_log_writer,
Expand Down Expand Up @@ -419,7 +429,8 @@ impl TransactionLogWriter {
fn new(transaction_data_file: Option<&str>) -> Self {
let transaction_log_writer = transaction_data_file.map(|transaction_data_file| {
CsvFileWriter::from_writer(
File::create(transaction_data_file).expect("File can be created."),
File::create(transaction_data_file)
.expect("Application should be able to create a file."),
)
});
Self {
Expand Down Expand Up @@ -470,53 +481,3 @@ impl TransactionLogWriter {
}
}
}

const NUM_RETRY: u64 = 5;
const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT;

fn call_rpc_with_retry<Func, Data>(f: Func, retry_warning: &str) -> Result<Data>
where
Func: Fn() -> Result<Data>,
{
let mut iretry = 0;
loop {
match f() {
Ok(slot) => {
return Ok(slot);
}
Err(error) => {
if iretry == NUM_RETRY {
return Err(error);
}
warn!("{retry_warning}: {error}, retry.");
sleep(Duration::from_millis(RETRY_EVERY_MS));
}
}
iretry += 1;
}
}

fn get_slot_with_retry<Client>(client: &Arc<Client>, commitment: CommitmentConfig) -> Result<Slot>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_slot_with_commitment(commitment),
"Failed to get slot",
)
}

fn get_blocks_with_retry<Client>(
client: &Arc<Client>,
start_slot: Slot,
end_slot: Option<Slot>,
commitment: CommitmentConfig,
) -> Result<Vec<Slot>>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_blocks_with_commitment(start_slot, end_slot, commitment),
"Failed to download blocks",
)
}
61 changes: 61 additions & 0 deletions bench-tps/src/rpc_with_retry_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use {
crate::bench_tps_client::{BenchTpsClient, Result},
log::*,
solana_sdk::{
clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, slot_history::Slot,
},
std::{sync::Arc, thread::sleep, time::Duration},
};

const NUM_RETRY: u64 = 5;
const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT;

fn call_rpc_with_retry<Func, Data>(f: Func, retry_warning: &str) -> Result<Data>
where
Func: Fn() -> Result<Data>,
{
let mut iretry = 0;
loop {
match f() {
Ok(slot) => {
return Ok(slot);
}
Err(error) => {
if iretry == NUM_RETRY {
return Err(error);
}
warn!("{retry_warning}: {error}, retry.");
sleep(Duration::from_millis(RETRY_EVERY_MS));
}
}
iretry += 1;
}
}

pub(crate) fn get_slot_with_retry<Client>(
client: &Arc<Client>,
commitment: CommitmentConfig,
) -> Result<Slot>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_slot_with_commitment(commitment),
"Failed to get slot",
)
}

pub(crate) fn get_blocks_with_retry<Client>(
client: &Arc<Client>,
start_slot: Slot,
end_slot: Option<Slot>,
commitment: CommitmentConfig,
) -> Result<Vec<Slot>>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_blocks_with_commitment(start_slot, end_slot, commitment),
"Failed to download blocks",
)
}

0 comments on commit 29a466f

Please sign in to comment.