Skip to content

Commit

Permalink
send-transactions: optimize retry pool (#31)
Browse files Browse the repository at this point in the history
* optimize retry pool

* add clone

* add clippy rule

* revert tx in tests
  • Loading branch information
fanatid authored Dec 6, 2024
1 parent 31606f5 commit 0abfe27
Showing 1 changed file with 86 additions and 43 deletions.
129 changes: 86 additions & 43 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use {
hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
},
std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
collections::hash_map::{Entry, HashMap},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -91,6 +88,16 @@ impl TransactionInfo {
last_sent_time,
}
}

fn get_max_retries(
&self,
default_max_retries: Option<usize>,
service_max_retries: usize,
) -> Option<usize> {
self.max_retries
.or(default_max_retries)
.map(|max_retries| max_retries.min(service_max_retries))
}
}

#[derive(Default, Debug, PartialEq, Eq)]
Expand All @@ -101,6 +108,7 @@ struct ProcessTransactionsResult {
max_retries_elapsed: u64,
failed: u64,
retained: u64,
last_sent_time: Option<Instant>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -135,7 +143,7 @@ impl Default for Config {

/// The maximum duration the retry thread may be configured to sleep before
/// processing the transactions that need to be retried.
pub const MAX_RETRY_SLEEP_MS: u64 = 1000;
pub const MAX_RETRY_SLEEP_MS: u64 = 1_000;

impl SendTransactionService {
pub fn new<Client: TransactionClient + Clone + std::marker::Send + 'static>(
Expand Down Expand Up @@ -168,6 +176,8 @@ impl SendTransactionService {
client.clone(),
retry_transactions.clone(),
stats_report.clone(),
config.service_max_retries,
config.default_max_retries,
config.batch_send_rate_ms,
config.batch_size,
config.retry_pool_max_size,
Expand All @@ -193,11 +203,14 @@ impl SendTransactionService {
}

/// Thread responsible for receiving transactions from RPC clients.
#[allow(clippy::too_many_arguments)]
fn receive_txn_thread<Client: TransactionClient + std::marker::Send + 'static>(
receiver: Receiver<TransactionInfo>,
client: Client,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
service_max_retries: usize,
default_max_retries: Option<usize>,
batch_send_rate_ms: u64,
batch_size: usize,
retry_pool_max_size: usize,
Expand Down Expand Up @@ -261,9 +274,17 @@ impl SendTransactionService {
{
// take a lock of retry_transactions and move the batch to the retry set.
let mut retry_transactions = retry_transactions.lock().unwrap();
let transactions_to_retry = transactions.len();
let mut transactions_to_retry: usize = 0;
let mut transactions_added_to_retry: usize = 0;
for (signature, mut transaction_info) in transactions.drain() {
// drop transactions with 0 max retries
let max_retries = transaction_info
.get_max_retries(default_max_retries, service_max_retries);
if max_retries == Some(0) {
continue;
}
transactions_to_retry += 1;

let retry_len = retry_transactions.len();
let entry = retry_transactions.entry(signature);
if let Entry::Vacant(_) = entry {
Expand Down Expand Up @@ -305,19 +326,20 @@ impl SendTransactionService {
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
info!("Starting send-transaction-service::retry_thread with config.");
let retry_interval_ms_default = MAX_RETRY_SLEEP_MS.min(retry_rate_ms);
let mut retry_interval_ms = retry_interval_ms_default;
Builder::new()
.name("solStxRetry".to_string())
.spawn(move || loop {
let retry_interval_ms = retry_rate_ms;
let stats = &stats_report.stats;
sleep(Duration::from_millis(
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
));
sleep(Duration::from_millis(retry_interval_ms));
if exit.load(Ordering::Relaxed) {
break;
}
let mut transactions = retry_transactions.lock().unwrap();
if !transactions.is_empty() {
if transactions.is_empty() {
retry_interval_ms = retry_interval_ms_default;
} else {
let stats = &stats_report.stats;
stats
.retry_queue_size
.store(transactions.len() as u64, Ordering::Relaxed);
Expand All @@ -326,7 +348,7 @@ impl SendTransactionService {
(bank_forks.root_bank(), bank_forks.working_bank())
};

let _result = Self::process_transactions(
let result = Self::process_transactions(
&working_bank,
&root_bank,
&mut transactions,
Expand All @@ -338,6 +360,17 @@ impl SendTransactionService {
stats,
);
stats_report.report();

// to send transactions as soon as possible we adjust retry interval
retry_interval_ms = retry_interval_ms_default
.checked_sub(
result
.last_sent_time
.and_then(|last| Instant::now().checked_duration_since(last))
.and_then(|interval| interval.as_millis().try_into().ok())
.unwrap_or(0),
)
.unwrap_or(retry_interval_ms_default);
}
})
.unwrap()
Expand All @@ -357,7 +390,8 @@ impl SendTransactionService {
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();

let mut batched_transactions = HashSet::new();
let mut batched_transactions = Vec::new();
let mut exceeded_retries_transactions = Vec::new();
let retry_rate = Duration::from_millis(retry_rate_ms);

transactions.retain(|signature, transaction_info| {
Expand All @@ -376,7 +410,8 @@ impl SendTransactionService {
let now = Instant::now();
let expired = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.and_then(|last| now.checked_duration_since(last))
.map(|elapsed| elapsed >= retry_rate)
.unwrap_or(false);
let verify_nonce_account =
nonce_account::verify_nonce_account(&nonce_account, &durable_nonce);
Expand Down Expand Up @@ -415,21 +450,36 @@ impl SendTransactionService {
let now = Instant::now();
let need_send = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.and_then(|last| now.checked_duration_since(last))
.map(|elapsed| elapsed >= retry_rate)
.unwrap_or(true);
if need_send {
if transaction_info.last_sent_time.is_some() {
// Transaction sent before is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
// dropped or landed in another fork. Re-send it.

info!("Retrying transaction: {}", signature);
result.retried += 1;
transaction_info.retries += 1;
stats.retries.fetch_add(1, Ordering::Relaxed);
}

batched_transactions.insert(*signature);
batched_transactions.push(*signature);
transaction_info.last_sent_time = Some(now);

let max_retries = transaction_info
.get_max_retries(default_max_retries, service_max_retries);
if let Some(max_retries) = max_retries {
if transaction_info.retries >= max_retries {
exceeded_retries_transactions.push(*signature);
}
}
} else if let Some(last) = transaction_info.last_sent_time {
result.last_sent_time = Some(
result
.last_sent_time
.map(|result_last| result_last.min(last))
.unwrap_or(last),
);
}
true
}
Expand All @@ -447,19 +497,31 @@ impl SendTransactionService {
}
});

stats.retries.fetch_add(result.retried, Ordering::Relaxed);

if !batched_transactions.is_empty() {
// Processing the transactions in batch
let wire_transactions = transactions
let wire_transactions = batched_transactions
.iter()
.filter(|(signature, _)| batched_transactions.contains(signature))
.map(|(_, transaction_info)| transaction_info.wire_transaction.clone());
.filter_map(|signature| transactions.get(signature))
.map(|transaction_info| transaction_info.wire_transaction.clone());

let iter = wire_transactions.chunks(batch_size);
for chunk in &iter {
let chunk = chunk.collect();
client.send_transactions_in_batch(chunk, stats);
}
}

result.max_retries_elapsed += exceeded_retries_transactions.len() as u64;
stats
.transactions_exceeding_max_retries
.fetch_add(result.max_retries_elapsed, Ordering::Relaxed);
for signature in exceeded_retries_transactions {
info!("Dropping transaction due to max retries: {signature}");
transactions.remove(&signature);
}

result
}

Expand Down Expand Up @@ -821,31 +883,12 @@ mod test {
config.batch_size,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
max_retries_elapsed: 1,
..ProcessTransactionsResult::default()
}
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
max_retries_elapsed: 1,
retried: 1,
max_retries_elapsed: 2,
..ProcessTransactionsResult::default()
}
);
Expand Down

0 comments on commit 0abfe27

Please sign in to comment.