From 6f4b56280dcd2329879a551293b05ba42fdf5b3e Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Mar 2024 14:34:20 +0000 Subject: [PATCH] extract process_blocks --- bench-tps/src/log_transaction_service.rs | 87 ++++++++++++++---------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index fef411ada6c6cf..5f5a057e020694 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -115,19 +115,9 @@ impl LogTransactionService { ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - let commitment: CommitmentConfig = CommitmentConfig { - commitment: CommitmentLevel::Confirmed, - }; - let rpc_block_config = RpcBlockConfig { - encoding: Some(UiTransactionEncoding::Base64), - transaction_details: Some(TransactionDetails::Full), - rewards: Some(true), - commitment: Some(commitment), - max_supported_transaction_version: Some(0), - }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let mut start_block = get_slot_with_retry(&client) + let start_block = get_slot_with_retry(&client) .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut signature_to_tx_info = MapSignatureToTxInfo::new(); @@ -154,9 +144,12 @@ impl LogTransactionService { } }, recv(block_processing_timer_receiver) -> _ => { - let mut measure_process_blocks = Measure::start("measure_process_blocks"); info!("sign_receiver queue len: {}", signature_receiver.len()); + let mut measure_get_blocks = Measure::start("measure_get_blocks"); let block_slots = get_blocks_with_retry(&client, start_block); + measure_get_blocks.stop(); + let time_get_blocks_us = measure_get_blocks.as_us(); + info!("Time to get_blocks : {time_get_blocks_us}us."); let Ok(block_slots) = block_slots else { error!("Failed to get blocks, stop LogWriterService."); break; @@ -164,31 +157,14 @@ impl LogTransactionService { if block_slots.is_empty() { continue; } - start_block = *block_slots.last().unwrap() + 1; - let blocks = block_slots.iter().map(|slot| { - client.get_block_with_config( - *slot, - rpc_block_config - ) - }); - let num_blocks = blocks.len(); - for (block, slot) in blocks.zip(&block_slots) { - let Ok(block) = block else { - continue; - }; - Self::process_block( - block, - &mut signature_to_tx_info, - *slot, - &mut tx_log_writer, - &mut block_log_writer - ) - } + Self::process_blocks( + &client, + block_slots, + &mut signature_to_tx_info, + &mut tx_log_writer, + &mut block_log_writer); Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); - measure_process_blocks.stop(); - let time_send_us = measure_process_blocks.as_us(); - info!("Time to process {num_blocks} blocks: {time_send_us}"); tx_log_writer.flush(); block_log_writer.flush(); }, @@ -196,6 +172,47 @@ impl LogTransactionService { } } + fn process_blocks( + client: &Arc, + block_slots: Vec, + signature_to_tx_info: &mut MapSignatureToTxInfo, + tx_log_writer: &mut TransactionLogWriter, + block_log_writer: &mut BlockLogWriter, + ) where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + let commitment: CommitmentConfig = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + let rpc_block_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + transaction_details: Some(TransactionDetails::Full), + rewards: Some(true), + commitment: Some(commitment), + max_supported_transaction_version: Some(0), + }; + let mut measure_process_blocks = Measure::start("measure_process_blocks"); + let blocks = block_slots + .iter() + .map(|slot| client.get_block_with_config(*slot, rpc_block_config)); + let num_blocks = blocks.len(); + for (block, slot) in blocks.zip(&block_slots) { + let Ok(block) = block else { + continue; + }; + Self::process_block( + block, + signature_to_tx_info, + *slot, + tx_log_writer, + block_log_writer, + ) + } + measure_process_blocks.stop(); + let time_process_blocks_us = measure_process_blocks.as_us(); + info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us."); + } + fn process_block( block: UiConfirmedBlock, signature_to_tx_info: &mut MapSignatureToTxInfo,