Skip to content

Commit

Permalink
extract process_blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Mar 22, 2024
1 parent a5f6103 commit 6f4b562
Showing 1 changed file with 52 additions and 35 deletions.
87 changes: 52 additions & 35 deletions bench-tps/src/log_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,9 @@ impl LogTransactionService {
) where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
let commitment: CommitmentConfig = CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
};
let rpc_block_config = RpcBlockConfig {
encoding: Some(UiTransactionEncoding::Base64),
transaction_details: Some(TransactionDetails::Full),
rewards: Some(true),
commitment: Some(commitment),
max_supported_transaction_version: Some(0),
};
let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS));

let mut start_block = get_slot_with_retry(&client)
let start_block = get_slot_with_retry(&client)
.expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC.");

let mut signature_to_tx_info = MapSignatureToTxInfo::new();
Expand All @@ -154,48 +144,75 @@ impl LogTransactionService {
}
},
recv(block_processing_timer_receiver) -> _ => {
let mut measure_process_blocks = Measure::start("measure_process_blocks");
info!("sign_receiver queue len: {}", signature_receiver.len());
let mut measure_get_blocks = Measure::start("measure_get_blocks");
let block_slots = get_blocks_with_retry(&client, start_block);
measure_get_blocks.stop();
let time_get_blocks_us = measure_get_blocks.as_us();
info!("Time to get_blocks : {time_get_blocks_us}us.");
let Ok(block_slots) = block_slots else {
error!("Failed to get blocks, stop LogWriterService.");
break;
};
if block_slots.is_empty() {
continue;
}
start_block = *block_slots.last().unwrap() + 1;
let blocks = block_slots.iter().map(|slot| {
client.get_block_with_config(
*slot,
rpc_block_config
)
});
let num_blocks = blocks.len();
for (block, slot) in blocks.zip(&block_slots) {
let Ok(block) = block else {
continue;
};
Self::process_block(
block,
&mut signature_to_tx_info,
*slot,
&mut tx_log_writer,
&mut block_log_writer
)
}
Self::process_blocks(
&client,
block_slots,
&mut signature_to_tx_info,
&mut tx_log_writer,
&mut block_log_writer);
Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info);
measure_process_blocks.stop();

let time_send_us = measure_process_blocks.as_us();
info!("Time to process {num_blocks} blocks: {time_send_us}");
tx_log_writer.flush();
block_log_writer.flush();
},
}
}
}

fn process_blocks<Client>(
client: &Arc<Client>,
block_slots: Vec<Slot>,
signature_to_tx_info: &mut MapSignatureToTxInfo,
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
) where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
let commitment: CommitmentConfig = CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
};
let rpc_block_config = RpcBlockConfig {
encoding: Some(UiTransactionEncoding::Base64),
transaction_details: Some(TransactionDetails::Full),
rewards: Some(true),
commitment: Some(commitment),
max_supported_transaction_version: Some(0),
};
let mut measure_process_blocks = Measure::start("measure_process_blocks");
let blocks = block_slots
.iter()
.map(|slot| client.get_block_with_config(*slot, rpc_block_config));
let num_blocks = blocks.len();
for (block, slot) in blocks.zip(&block_slots) {
let Ok(block) = block else {
continue;
};
Self::process_block(
block,
signature_to_tx_info,
*slot,
tx_log_writer,
block_log_writer,
)
}
measure_process_blocks.stop();
let time_process_blocks_us = measure_process_blocks.as_us();
info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us.");
}

fn process_block(
block: UiConfirmedBlock,
signature_to_tx_info: &mut MapSignatureToTxInfo,
Expand Down

0 comments on commit 6f4b562

Please sign in to comment.