diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 9af285a6a46408..6363ff59914c83 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -70,12 +70,8 @@ where const NUM_SLOTS_PER_ITERATION: u64 = 16; // How often process blocks. const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT; -// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions -// that still might be added to the block. -const AGE_EPSILON: usize = 50; // Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. -const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = - (MAX_PROCESSING_AGE + AGE_EPSILON) as i64 * (DEFAULT_MS_PER_SLOT as i64); +const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = MAX_PROCESSING_AGE as i64 * DEFAULT_MS_PER_SLOT as i64; // Map used to filter submitted transactions. #[derive(Clone)] @@ -173,7 +169,7 @@ impl LogTransactionService { if block_slots.is_empty() { continue; } - Self::process_blocks( + let last_block_time = Self::process_blocks( &client, block_slots, &mut signature_to_tx_info, @@ -181,7 +177,7 @@ impl LogTransactionService { &mut block_log_writer, commitment, ); - Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); + Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info, last_block_time); start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); tx_log_writer.flush(); @@ -195,6 +191,8 @@ impl LogTransactionService { } } + /// Download and process the blocks. + /// Returns the time when the last processed block has been confirmed or now(). fn process_blocks( client: &Arc, block_slots: Vec, @@ -202,7 +200,8 @@ impl LogTransactionService { tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, commitment: CommitmentConfig, - ) where + ) -> DateTime + where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { let rpc_block_config = RpcBlockConfig { @@ -217,21 +216,27 @@ impl LogTransactionService { .iter() .map(|slot| client.get_block_with_config(*slot, rpc_block_config)); let num_blocks = blocks.len(); + let mut last_block_time = None; for (block, slot) in blocks.zip(&block_slots) { let Ok(block) = block else { continue; }; - Self::process_block( + let block_time = Self::process_block( block, signature_to_tx_info, *slot, tx_log_writer, block_log_writer, - ) + ); + // if last_time is some, it means that the there is at least one valid block + if block_time.is_some() { + last_block_time = block_time; + } } measure_process_blocks.stop(); let time_process_blocks_us = measure_process_blocks.as_us(); info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us."); + last_block_time.unwrap_or_else(Utc::now) } fn process_block( @@ -240,7 +245,7 @@ impl LogTransactionService { slot: u64, tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, - ) { + ) -> Option> { let rewards = block .rewards .as_ref() @@ -252,7 +257,7 @@ impl LogTransactionService { let Some(transactions) = &block.transactions else { warn!("Empty block: {slot}"); - return; + return None; }; let mut num_bench_tps_transactions: usize = 0; @@ -304,16 +309,24 @@ impl LogTransactionService { transactions.len(), bench_tps_cu_consumed, total_cu_consumed, - ) + ); + + block.block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }) } + /// Remove from map all the signatures which we haven't processed before and they are + /// older than the the timestamp of the last processed block plus max blockhash age. fn clean_transaction_map( tx_log_writer: &mut TransactionLogWriter, signature_to_tx_info: &mut MapSignatureToTxInfo, + last_block_time: DateTime, ) { - let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { - let duration_since_sent = now.signed_duration_since(tx_info.sent_at); + let duration_since_sent = last_block_time.signed_duration_since(tx_info.sent_at); let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS; if is_timeout_tx { tx_log_writer.write(