From 5742a4c212a612183070f0f21dd44c5041d373c7 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 19 Mar 2024 15:57:44 +0000 Subject: [PATCH] update start_slot when processing blocks --- bench-tps/src/log_transaction_service.rs | 48 +++++++++++++++--------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 3f03161420ad35..dec9353fc038a5 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -61,10 +61,18 @@ where } } +// How many blocks to process during one iteration. +// The time to process blocks is dominated by get_block calls. +// Each call takes slightly less time than slot. +const NUM_SLOTS_PER_ITERATION: u64 = 16; // How often process blocks. -const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; +const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT; +// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions +// that still might be added to the block. +const AGE_EPSILON: usize = 50; // Max age for transaction in the transaction map. -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = + ((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64; // Map used to filter submitted transactions. #[derive(Clone)] @@ -120,9 +128,10 @@ impl LogTransactionService { }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let start_block = get_slot_with_retry(&client, commitment) + let mut start_slot = get_slot_with_retry(&client, commitment) .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); + let mut sender_stopped = false; let mut signature_to_tx_info = MapSignatureToTxInfo::new(); loop { select! { @@ -138,18 +147,18 @@ impl LogTransactionService { compute_unit_price });}); } - Err(e) => { - info!("Stop LogTransactionService, message received: {e}"); - tx_log_writer.flush(); - block_log_writer.flush(); - break; + Err(_) => { + sender_stopped = true; } } }, recv(block_processing_timer_receiver) -> _ => { info!("sign_receiver queue len: {}", signature_receiver.len()); + if signature_receiver.len() != 0 { + continue; + } let mut measure_get_blocks = Measure::start("measure_get_blocks"); - let block_slots = get_blocks_with_retry(&client, start_block, commitment); + let block_slots = get_blocks_with_retry(&client, start_slot, Some(start_slot + NUM_SLOTS_PER_ITERATION - 1), commitment); measure_get_blocks.stop(); let time_get_blocks_us = measure_get_blocks.as_us(); info!("Time to get_blocks : {time_get_blocks_us}us."); @@ -170,8 +179,13 @@ impl LogTransactionService { ); Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); + start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); tx_log_writer.flush(); block_log_writer.flush(); + if sender_stopped && signature_to_tx_info.len() == 0 { + info!("Stop LogTransactionService"); + break; + } }, } } @@ -292,10 +306,9 @@ impl LogTransactionService { ) { let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { - let duration_since_past_time = now.signed_duration_since(tx_info.sent_at); - let is_not_timeout_tx = - duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC; - if !is_not_timeout_tx { + let duration_since_sent = now.signed_duration_since(tx_info.sent_at); + let is_timeout_tx = duration_since_sent.num_seconds() > REMOVE_TIMEOUT_TX_EVERY_SEC; + if is_timeout_tx { tx_log_writer.write( None, None, @@ -308,7 +321,7 @@ impl LogTransactionService { tx_info.compute_unit_price, ); } - is_not_timeout_tx + !is_timeout_tx }); } } @@ -367,7 +380,7 @@ impl BlockLogWriter { block_time: block_time.map(|time| { Utc.timestamp_opt(time, 0) .latest() - .expect("valid timestamp") + .expect("timestamp should be valid") }), num_bench_tps_transactions, total_num_transactions, @@ -495,14 +508,15 @@ where fn get_blocks_with_retry( client: &Arc, - start_block: u64, + start_slot: Slot, + end_slot: Option, commitment: CommitmentConfig, ) -> Result> where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { call_rpc_with_retry( - || client.get_blocks_with_commitment(start_block, None, commitment), + || client.get_blocks_with_commitment(start_slot, end_slot, commitment), "Failed to download blocks", ) }