Skip to content

Commit

Permalink
update start_slot when processing blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Mar 22, 2024
1 parent 5f83197 commit 5742a4c
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions bench-tps/src/log_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ where
}
}

// How many blocks to process during one iteration.
// The time to process blocks is dominated by get_block calls.
// Each call takes slightly less time than slot.
const NUM_SLOTS_PER_ITERATION: u64 = 16;
// How often process blocks.
const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT;
const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT;
// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions
// that still might be added to the block.
const AGE_EPSILON: usize = 50;
// Max age for transaction in the transaction map.
const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64;
const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 =
((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64;

// Map used to filter submitted transactions.
#[derive(Clone)]
Expand Down Expand Up @@ -120,9 +128,10 @@ impl LogTransactionService {
};
let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS));

let start_block = get_slot_with_retry(&client, commitment)
let mut start_slot = get_slot_with_retry(&client, commitment)
.expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC.");

let mut sender_stopped = false;
let mut signature_to_tx_info = MapSignatureToTxInfo::new();
loop {
select! {
Expand All @@ -138,18 +147,18 @@ impl LogTransactionService {
compute_unit_price
});});
}
Err(e) => {
info!("Stop LogTransactionService, message received: {e}");
tx_log_writer.flush();
block_log_writer.flush();
break;
Err(_) => {
sender_stopped = true;
}
}
},
recv(block_processing_timer_receiver) -> _ => {
info!("sign_receiver queue len: {}", signature_receiver.len());
if signature_receiver.len() != 0 {
continue;
}
let mut measure_get_blocks = Measure::start("measure_get_blocks");
let block_slots = get_blocks_with_retry(&client, start_block, commitment);
let block_slots = get_blocks_with_retry(&client, start_slot, Some(start_slot + NUM_SLOTS_PER_ITERATION - 1), commitment);
measure_get_blocks.stop();
let time_get_blocks_us = measure_get_blocks.as_us();
info!("Time to get_blocks : {time_get_blocks_us}us.");
Expand All @@ -170,8 +179,13 @@ impl LogTransactionService {
);
Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info);

start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION);
tx_log_writer.flush();
block_log_writer.flush();
if sender_stopped && signature_to_tx_info.len() == 0 {
info!("Stop LogTransactionService");
break;
}
},
}
}
Expand Down Expand Up @@ -292,10 +306,9 @@ impl LogTransactionService {
) {
let now: DateTime<Utc> = Utc::now();
signature_to_tx_info.retain(|signature, tx_info| {
let duration_since_past_time = now.signed_duration_since(tx_info.sent_at);
let is_not_timeout_tx =
duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC;
if !is_not_timeout_tx {
let duration_since_sent = now.signed_duration_since(tx_info.sent_at);
let is_timeout_tx = duration_since_sent.num_seconds() > REMOVE_TIMEOUT_TX_EVERY_SEC;
if is_timeout_tx {
tx_log_writer.write(
None,
None,
Expand All @@ -308,7 +321,7 @@ impl LogTransactionService {
tx_info.compute_unit_price,
);
}
is_not_timeout_tx
!is_timeout_tx
});
}
}
Expand Down Expand Up @@ -367,7 +380,7 @@ impl BlockLogWriter {
block_time: block_time.map(|time| {
Utc.timestamp_opt(time, 0)
.latest()
.expect("valid timestamp")
.expect("timestamp should be valid")
}),
num_bench_tps_transactions,
total_num_transactions,
Expand Down Expand Up @@ -495,14 +508,15 @@ where

fn get_blocks_with_retry<Client>(
client: &Arc<Client>,
start_block: u64,
start_slot: Slot,
end_slot: Option<Slot>,
commitment: CommitmentConfig,
) -> Result<Vec<Slot>>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_blocks_with_commitment(start_block, None, commitment),
|| client.get_blocks_with_commitment(start_slot, end_slot, commitment),
"Failed to download blocks",
)
}

0 comments on commit 5742a4c

Please sign in to comment.