Skip to content

Commit

Permalink
fix bug with time in clear transaction map
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Mar 25, 2024
1 parent 54adc0c commit 7cd0d67
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions bench-tps/src/log_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,8 @@ where
const NUM_SLOTS_PER_ITERATION: u64 = 16;
// How often process blocks.
const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT;
// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions
// that still might be added to the block.
const AGE_EPSILON: usize = 50;
// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout.
const REMOVE_TIMEOUT_TX_EVERY_MS: i64 =
(MAX_PROCESSING_AGE + AGE_EPSILON) as i64 * (DEFAULT_MS_PER_SLOT as i64);
const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = MAX_PROCESSING_AGE as i64 * DEFAULT_MS_PER_SLOT as i64;

// Map used to filter submitted transactions.
#[derive(Clone)]
Expand Down Expand Up @@ -173,15 +169,15 @@ impl LogTransactionService {
if block_slots.is_empty() {
continue;
}
Self::process_blocks(
let last_block_time = Self::process_blocks(
&client,
block_slots,
&mut signature_to_tx_info,
&mut tx_log_writer,
&mut block_log_writer,
commitment,
);
Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info);
Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info, last_block_time);

start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION);
tx_log_writer.flush();
Expand All @@ -195,14 +191,17 @@ impl LogTransactionService {
}
}

/// Download and process the blocks.
/// Returns the time when the last processed block has been confirmed or now().
fn process_blocks<Client>(
client: &Arc<Client>,
block_slots: Vec<Slot>,
signature_to_tx_info: &mut MapSignatureToTxInfo,
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
commitment: CommitmentConfig,
) where
) -> DateTime<Utc>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
let rpc_block_config = RpcBlockConfig {
Expand All @@ -217,21 +216,27 @@ impl LogTransactionService {
.iter()
.map(|slot| client.get_block_with_config(*slot, rpc_block_config));
let num_blocks = blocks.len();
let mut last_block_time = None;
for (block, slot) in blocks.zip(&block_slots) {
let Ok(block) = block else {
continue;
};
Self::process_block(
let block_time = Self::process_block(
block,
signature_to_tx_info,
*slot,
tx_log_writer,
block_log_writer,
)
);
// if last_time is some, it means that the there is at least one valid block
if block_time.is_some() {
last_block_time = block_time;
}
}
measure_process_blocks.stop();
let time_process_blocks_us = measure_process_blocks.as_us();
info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us.");
last_block_time.unwrap_or_else(Utc::now)
}

fn process_block(
Expand All @@ -240,7 +245,7 @@ impl LogTransactionService {
slot: u64,
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
) {
) -> Option<DateTime<Utc>> {
let rewards = block
.rewards
.as_ref()
Expand All @@ -252,7 +257,7 @@ impl LogTransactionService {

let Some(transactions) = &block.transactions else {
warn!("Empty block: {slot}");
return;
return None;
};

let mut num_bench_tps_transactions: usize = 0;
Expand Down Expand Up @@ -304,16 +309,24 @@ impl LogTransactionService {
transactions.len(),
bench_tps_cu_consumed,
total_cu_consumed,
)
);

block.block_time.map(|time| {
Utc.timestamp_opt(time, 0)
.latest()
.expect("valid timestamp")
})
}

/// Remove from map all the signatures which we haven't processed before and they are
/// older than the the timestamp of the last processed block plus max blockhash age.
fn clean_transaction_map(
tx_log_writer: &mut TransactionLogWriter,
signature_to_tx_info: &mut MapSignatureToTxInfo,
last_block_time: DateTime<Utc>,
) {
let now: DateTime<Utc> = Utc::now();
signature_to_tx_info.retain(|signature, tx_info| {
let duration_since_sent = now.signed_duration_since(tx_info.sent_at);
let duration_since_sent = last_block_time.signed_duration_since(tx_info.sent_at);
let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS;
if is_timeout_tx {
tx_log_writer.write(
Expand Down

0 comments on commit 7cd0d67

Please sign in to comment.