From 23fab8322d128172574b0383fbb1a041e5ddbd35 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 16:35:56 +0000 Subject: [PATCH] some small improvements --- bench-tps/src/bench.rs | 2 +- bench-tps/src/confirmations_processing.rs | 28 +++++++++++------------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 591925159d7452..e7e0ab157ffed4 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -982,7 +982,7 @@ fn do_tx_transfers( }) .is_err() { - info!("Receiver has been dropped, stop sending transactions."); + error!("Receiver has been dropped, stop sending transactions."); break; } } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 1da917379236a5..dd2efca8e096a4 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -23,7 +23,7 @@ use { collections::HashMap, fs::File, sync::Arc, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, }; @@ -134,29 +134,24 @@ impl LogTransactionService { sent_at, compute_unit_prices }) => { - let mut measure_send_txs = Measure::start("measure_update_map"); signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo { sent_at, compute_unit_price });}); - - measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ns(); - info!("@@@ Time to add signatures to map: {time_send_ns}") } Err(e) => { - info!("Stop LogTransactionService, error message received {e}"); + info!("@@@ Stop LogTransactionService, message received: {e}"); log_writer.flush(); break; } } }, recv(block_processing_timer_receiver) -> _ => { - let mut measure_send_txs = Measure::start("measure_update_map"); - info!("sign_receiver queue len: {}", signature_receiver.len()); + let mut measure_process_blocks = Measure::start("measure_process_blocks"); + info!("@@@ sign_receiver queue len: {}", signature_receiver.len()); let block_slots = get_blocks_with_retry(&client, start_block); let Ok(block_slots) = block_slots else { - error!("Failed to get blocks"); + error!("Failed to get blocks, stop LogWriterService."); drop(signature_receiver); break; }; @@ -170,6 +165,7 @@ impl LogTransactionService { rpc_block_config ) }); + let num_blocks = blocks.len(); for (block, slot) in blocks.zip(&block_slots) { let Ok(block) = block else { continue; @@ -185,10 +181,9 @@ impl LogTransactionService { // maybe ok to write every time here? Or create a separate timer log_writer.flush(); - measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ns(); - info!("@@@ Time to process blocks: {time_send_ns}") - + measure_process_blocks.stop(); + let time_send_us = measure_process_blocks.as_us(); + info!("@@@ Time to process {num_blocks} blocks: {time_send_us}") }, } } @@ -247,7 +242,7 @@ impl LogTransactionService { Some(block.blockhash.clone()), Some(slot_leader.clone()), compute_unit_price, - true, + false, ); } } @@ -423,6 +418,7 @@ impl LogWriter { } const NUM_RETRY: u64 = 5; +const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; fn get_slot_with_retry(client: &Arc) -> Result where @@ -437,6 +433,7 @@ where } Err(error) => { warn!("Failed to get slot: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); } } } @@ -456,6 +453,7 @@ where } Err(error) => { warn!("Failed to download blocks: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); } } }