From 921d4867fedd9a083681f78212697ad215964104 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 14:23:46 +0000 Subject: [PATCH] add comments and restructure code --- bench-tps/src/bench.rs | 4 +- bench-tps/src/confirmations_processing.rs | 120 ++++++++++++---------- 2 files changed, 67 insertions(+), 57 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 76e8c6cbe20355..591925159d7452 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -3,7 +3,7 @@ use { bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, confirmations_processing::{ - create_log_transactions_service_and_sender, SignatureBatch, SignatureBatchSender, + create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch, }, perf_utils::{sample_txs, SampleStats}, send_batch::*, @@ -975,7 +975,7 @@ fn do_tx_transfers( if let Some(signatures_sender) = &signatures_sender { if signatures_sender - .send(SignatureBatch { + .send(TransactionInfoBatch { signatures, sent_at: Utc::now(), compute_unit_prices, diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 25dbde066c431d..1da917379236a5 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -1,3 +1,6 @@ +//! `LogTransactionService` requests confirmed blocks, analyses transactions submitted by bench-tps, +//! and saves log files in csv format. + use { crate::bench_tps_client::{BenchTpsClient, Result}, chrono::{DateTime, TimeZone, Utc}, @@ -25,58 +28,21 @@ use { }, }; -const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; - +// Data to establish communication between sender thread and +// LogTransactionService. #[derive(Clone)] -pub(crate) struct SignatureBatch { +pub(crate) struct TransactionInfoBatch { pub signatures: Vec, pub sent_at: DateTime, pub compute_unit_prices: Vec>, } -#[derive(Clone)] -pub(crate) struct TransactionSendInfo { - pub sent_at: DateTime, - pub compute_unit_price: Option, -} +pub(crate) type SignatureBatchSender = Sender; -#[derive(Clone, Serialize)] -struct BlockData { - pub block_hash: String, - pub block_slot: Slot, - pub block_leader: String, - pub block_time: Option>, - pub total_num_transactions: usize, - pub num_bench_tps_transactions: usize, - pub total_cu_consumed: u64, - pub bench_tps_cu_consumed: u64, -} - -#[derive(Clone, Serialize)] -struct TransactionData { - pub signature: String, - pub sent_at: String, //TODO(klykov) use consistently DateTime? I think it cane be serialized - pub confirmed_slot: Option, - pub confirmed_at: Option>, - pub successful: bool, - pub slot_leader: Option, - pub error: Option, - pub blockhash: Option, - pub timed_out: bool, - pub compute_unit_price: u64, -} - -fn check_confirmations( - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, -) -> bool { - block_data_file.is_some() || transaction_data_file.is_some() +pub(crate) struct LogTransactionService { + thread_handler: JoinHandle<()>, } -pub(crate) type SignatureBatchSender = Sender; -type SignatureBatchReceiver = Receiver; - pub(crate) fn create_log_transactions_service_and_sender( client: &Arc, block_data_file: Option<&String>, @@ -95,15 +61,22 @@ where } } -pub(crate) struct LogTransactionService { - thread_handler: JoinHandle<()>, +// How often process blocks. +const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; +// Max age for transaction in the transaction map. +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; + +// Map used to filter submitted transactions. +#[derive(Clone)] +struct TransactionSendInfo { + pub sent_at: DateTime, + pub compute_unit_price: Option, } +type MapSignatureToTxInfo = HashMap; -impl LogTransactionService { - pub fn join(self) -> thread::Result<()> { - self.thread_handler.join() - } +type SignatureBatchReceiver = Receiver; +impl LogTransactionService { fn new( client: &Arc, signature_receiver: SignatureBatchReceiver, @@ -129,6 +102,10 @@ impl LogTransactionService { Self { thread_handler } } + pub fn join(self) -> thread::Result<()> { + self.thread_handler.join() + } + fn run(client: Arc, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter) where T: 'static + BenchTpsClient + Send + Sync + ?Sized, @@ -147,12 +124,12 @@ impl LogTransactionService { let mut start_block = get_slot_with_retry(&client).expect("get_slot_with_retry succeed"); - let mut signature_to_tx_info = HashMap::::new(); + let mut signature_to_tx_info = MapSignatureToTxInfo::new(); loop { select! { recv(signature_receiver) -> msg => { match msg { - Ok(SignatureBatch { + Ok(TransactionInfoBatch { signatures, sent_at, compute_unit_prices @@ -219,7 +196,7 @@ impl LogTransactionService { fn process_block( block: UiConfirmedBlock, - signature_to_tx_info: &mut HashMap, + signature_to_tx_info: &mut MapSignatureToTxInfo, slot: u64, log_writer: &mut LogWriter, ) { @@ -247,7 +224,7 @@ impl LogTransactionService { let cu_consumed = meta .as_ref() .map_or(0, |meta| match meta.compute_units_consumed { - OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well + OptionSerializer::Some(cu_consumed) => cu_consumed, _ => 0, }); let signature = &transaction.signatures[0]; @@ -288,7 +265,7 @@ impl LogTransactionService { fn clean_transaction_map( log_writer: &mut LogWriter, - signature_to_tx_info: &mut HashMap, + signature_to_tx_info: &mut MapSignatureToTxInfo, ) { let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { @@ -313,6 +290,39 @@ impl LogTransactionService { } } +fn check_confirmations( + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, +) -> bool { + block_data_file.is_some() || transaction_data_file.is_some() +} + +#[derive(Clone, Serialize)] +struct BlockData { + pub block_hash: String, + pub block_slot: Slot, + pub block_leader: String, + pub block_time: Option>, + pub total_num_transactions: usize, + pub num_bench_tps_transactions: usize, + pub total_cu_consumed: u64, + pub bench_tps_cu_consumed: u64, +} + +#[derive(Clone, Serialize)] +struct TransactionData { + pub signature: String, + pub sent_at: Option>, + pub confirmed_slot: Option, + pub confirmed_at: Option>, + pub successful: bool, + pub slot_leader: Option, + pub error: Option, + pub blockhash: Option, + pub timed_out: bool, + pub compute_unit_price: u64, +} + type CsvFileWriter = csv::Writer; struct LogWriter { block_log_writer: Option, @@ -357,7 +367,7 @@ impl LogWriter { .latest() .expect("valid timestamp") }), - sent_at: sent_at.to_string(), + sent_at: Some(sent_at), successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), error: meta .as_ref()