diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 62a604cc5e1392..928a67dc4d84ca 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -13,10 +13,7 @@ use { hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, }, std::{ - collections::{ - hash_map::{Entry, HashMap}, - HashSet, - }, + collections::hash_map::{Entry, HashMap}, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -91,6 +88,16 @@ impl TransactionInfo { last_sent_time, } } + + fn get_max_retries( + &self, + default_max_retries: Option, + service_max_retries: usize, + ) -> Option { + self.max_retries + .or(default_max_retries) + .map(|max_retries| max_retries.min(service_max_retries)) + } } #[derive(Default, Debug, PartialEq, Eq)] @@ -101,6 +108,7 @@ struct ProcessTransactionsResult { max_retries_elapsed: u64, failed: u64, retained: u64, + last_sent_time: Option, } #[derive(Clone, Debug)] @@ -135,7 +143,7 @@ impl Default for Config { /// The maximum duration the retry thread may be configured to sleep before /// processing the transactions that need to be retried. -pub const MAX_RETRY_SLEEP_MS: u64 = 1000; +pub const MAX_RETRY_SLEEP_MS: u64 = 1_000; impl SendTransactionService { pub fn new( @@ -168,6 +176,8 @@ impl SendTransactionService { client.clone(), retry_transactions.clone(), stats_report.clone(), + config.service_max_retries, + config.default_max_retries, config.batch_send_rate_ms, config.batch_size, config.retry_pool_max_size, @@ -193,11 +203,14 @@ impl SendTransactionService { } /// Thread responsible for receiving transactions from RPC clients. + #[allow(clippy::too_many_arguments)] fn receive_txn_thread( receiver: Receiver, client: Client, retry_transactions: Arc>>, stats_report: Arc, + service_max_retries: usize, + default_max_retries: Option, batch_send_rate_ms: u64, batch_size: usize, retry_pool_max_size: usize, @@ -261,9 +274,17 @@ impl SendTransactionService { { // take a lock of retry_transactions and move the batch to the retry set. let mut retry_transactions = retry_transactions.lock().unwrap(); - let transactions_to_retry = transactions.len(); + let mut transactions_to_retry: usize = 0; let mut transactions_added_to_retry: usize = 0; for (signature, mut transaction_info) in transactions.drain() { + // drop transactions with 0 max retries + let max_retries = transaction_info + .get_max_retries(default_max_retries, service_max_retries); + if max_retries == Some(0) { + continue; + } + transactions_to_retry += 1; + let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); if let Entry::Vacant(_) = entry { @@ -305,19 +326,20 @@ impl SendTransactionService { exit: Arc, ) -> JoinHandle<()> { info!("Starting send-transaction-service::retry_thread with config."); + let retry_interval_ms_default = MAX_RETRY_SLEEP_MS.min(retry_rate_ms); + let mut retry_interval_ms = retry_interval_ms_default; Builder::new() .name("solStxRetry".to_string()) .spawn(move || loop { - let retry_interval_ms = retry_rate_ms; - let stats = &stats_report.stats; - sleep(Duration::from_millis( - MAX_RETRY_SLEEP_MS.min(retry_interval_ms), - )); + sleep(Duration::from_millis(retry_interval_ms)); if exit.load(Ordering::Relaxed) { break; } let mut transactions = retry_transactions.lock().unwrap(); - if !transactions.is_empty() { + if transactions.is_empty() { + retry_interval_ms = retry_interval_ms_default; + } else { + let stats = &stats_report.stats; stats .retry_queue_size .store(transactions.len() as u64, Ordering::Relaxed); @@ -326,7 +348,7 @@ impl SendTransactionService { (bank_forks.root_bank(), bank_forks.working_bank()) }; - let _result = Self::process_transactions( + let result = Self::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -338,6 +360,17 @@ impl SendTransactionService { stats, ); stats_report.report(); + + // to send transactions as soon as possible we adjust retry interval + retry_interval_ms = retry_interval_ms_default + .checked_sub( + result + .last_sent_time + .and_then(|last| Instant::now().checked_duration_since(last)) + .and_then(|interval| interval.as_millis().try_into().ok()) + .unwrap_or(0), + ) + .unwrap_or(retry_interval_ms_default); } }) .unwrap() @@ -357,7 +390,8 @@ impl SendTransactionService { ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); - let mut batched_transactions = HashSet::new(); + let mut batched_transactions = Vec::new(); + let mut exceeded_retries_transactions = Vec::new(); let retry_rate = Duration::from_millis(retry_rate_ms); transactions.retain(|signature, transaction_info| { @@ -376,7 +410,8 @@ impl SendTransactionService { let now = Instant::now(); let expired = transaction_info .last_sent_time - .map(|last| now.duration_since(last) >= retry_rate) + .and_then(|last| now.checked_duration_since(last)) + .map(|elapsed| elapsed >= retry_rate) .unwrap_or(false); let verify_nonce_account = nonce_account::verify_nonce_account(&nonce_account, &durable_nonce); @@ -415,21 +450,36 @@ impl SendTransactionService { let now = Instant::now(); let need_send = transaction_info .last_sent_time - .map(|last| now.duration_since(last) >= retry_rate) + .and_then(|last| now.checked_duration_since(last)) + .map(|elapsed| elapsed >= retry_rate) .unwrap_or(true); if need_send { if transaction_info.last_sent_time.is_some() { // Transaction sent before is unknown to the working bank, it might have been - // dropped or landed in another fork. Re-send it + // dropped or landed in another fork. Re-send it. info!("Retrying transaction: {}", signature); result.retried += 1; transaction_info.retries += 1; - stats.retries.fetch_add(1, Ordering::Relaxed); } - batched_transactions.insert(*signature); + batched_transactions.push(*signature); transaction_info.last_sent_time = Some(now); + + let max_retries = transaction_info + .get_max_retries(default_max_retries, service_max_retries); + if let Some(max_retries) = max_retries { + if transaction_info.retries >= max_retries { + exceeded_retries_transactions.push(*signature); + } + } + } else if let Some(last) = transaction_info.last_sent_time { + result.last_sent_time = Some( + result + .last_sent_time + .map(|result_last| result_last.min(last)) + .unwrap_or(last), + ); } true } @@ -447,12 +497,14 @@ impl SendTransactionService { } }); + stats.retries.fetch_add(result.retried, Ordering::Relaxed); + if !batched_transactions.is_empty() { // Processing the transactions in batch - let wire_transactions = transactions + let wire_transactions = batched_transactions .iter() - .filter(|(signature, _)| batched_transactions.contains(signature)) - .map(|(_, transaction_info)| transaction_info.wire_transaction.clone()); + .filter_map(|signature| transactions.get(signature)) + .map(|transaction_info| transaction_info.wire_transaction.clone()); let iter = wire_transactions.chunks(batch_size); for chunk in &iter { @@ -460,6 +512,16 @@ impl SendTransactionService { client.send_transactions_in_batch(chunk, stats); } } + + result.max_retries_elapsed += exceeded_retries_transactions.len() as u64; + stats + .transactions_exceeding_max_retries + .fetch_add(result.max_retries_elapsed, Ordering::Relaxed); + for signature in exceeded_retries_transactions { + info!("Dropping transaction due to max retries: {signature}"); + transactions.remove(&signature); + } + result } @@ -821,31 +883,12 @@ mod test { config.batch_size, &stats, ); - assert_eq!(transactions.len(), 1); - assert_eq!( - result, - ProcessTransactionsResult { - retried: 1, - max_retries_elapsed: 1, - ..ProcessTransactionsResult::default() - } - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &mut transactions, - &client, - config.retry_rate_ms, - config.service_max_retries, - config.default_max_retries, - config.batch_size, - &stats, - ); assert!(transactions.is_empty()); assert_eq!( result, ProcessTransactionsResult { - max_retries_elapsed: 1, + retried: 1, + max_retries_elapsed: 2, ..ProcessTransactionsResult::default() } );