Skip to content

Commit

Permalink
v1.18: client: Timeout resends during send_and_confirm_in_parallel
Browse files Browse the repository at this point in the history
…(backport of #358) (#384)

client: Timeout resends during `send_and_confirm_in_parallel` (#358)

* client: Timeout resends during `send_and_confirm_in_parallel`

* Clarify constant

(cherry picked from commit 36c66f5)

Co-authored-by: Jon C <[email protected]>
  • Loading branch information
mergify[bot] and joncinque authored Mar 22, 2024
1 parent 6422ceb commit c219226
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions client/src/send_and_confirm_transactions_in_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
};

const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
Expand Down Expand Up @@ -326,21 +326,20 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
);
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

// wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
while !unconfirmed_transaction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
{
let block_height = current_block_height.load(Ordering::Relaxed);

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
Expand All @@ -349,10 +348,29 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
.iter()
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
.try_send_wire_transaction_batch(txs_to_resend_over_tpu)
.await;
.collect::<Vec<_>>();
let num_txs_to_resend = txs_to_resend_over_tpu.len();
// This is a "reasonable" constant for how long it should
// take to fan the transactions out, taken from
// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let message = if tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu),
)
.await
.is_err()
{
format!("Timed out resending {num_txs_to_resend} transactions...")
} else {
format!("Resent {num_txs_to_resend} transactions...")
};

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(progress_bar, &message);
}

let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
Expand All @@ -370,14 +388,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
}
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
}
}

Expand Down

0 comments on commit c219226

Please sign in to comment.