Commit

Shutdown in separate task
KirillLykov committed Nov 6, 2024
1 parent f62dee2 commit 263d796
Showing 5 changed files with 204 additions and 147 deletions.
26 changes: 13 additions & 13 deletions tpu-client-next/src/connection_worker.rs
@@ -14,7 +14,10 @@ use {
         clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
         timing::timestamp,
     },
-    std::net::SocketAddr,
+    std::{
+        net::SocketAddr,
+        sync::{atomic::Ordering, Arc},
+    },
     tokio::{
         sync::mpsc,
         time::{sleep, Duration},
@@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker {
     connection: ConnectionState,
     skip_check_transaction_age: bool,
     max_reconnect_attempts: usize,
-    send_txs_stats: SendTransactionStats,
+    send_txs_stats: Arc<SendTransactionStats>,
     cancel: CancellationToken,
 }
@@ -93,6 +96,7 @@ impl ConnectionWorker {
         transactions_receiver: mpsc::Receiver<TransactionBatch>,
         skip_check_transaction_age: bool,
         max_reconnect_attempts: usize,
+        send_txs_stats: Arc<SendTransactionStats>,
     ) -> (Self, CancellationToken) {
         let cancel = CancellationToken::new();
@@ -103,7 +107,7 @@ impl ConnectionWorker {
             connection: ConnectionState::NotSetup,
             skip_check_transaction_age,
             max_reconnect_attempts,
-            send_txs_stats: SendTransactionStats::default(),
+            send_txs_stats,
             cancel: cancel.clone(),
         };
@@ -155,11 +159,6 @@ impl ConnectionWorker {
         }
     }
 
-    /// Retrieves the statistics for transactions sent by this worker.
-    pub fn transaction_stats(&self) -> &SendTransactionStats {
-        &self.send_txs_stats
-    }
-
     /// Sends a batch of transactions using the provided `connection`.
     ///
     /// Each transaction in the batch is sent over the QUIC streams one at the
@@ -183,11 +182,12 @@
 
             if let Err(error) = result {
                 trace!("Failed to send transaction over stream with error: {error}.");
-                record_error(error, &mut self.send_txs_stats);
+                record_error(error, &self.send_txs_stats);
                 self.connection = ConnectionState::Retry(0);
             } else {
-                self.send_txs_stats.successfully_sent =
-                    self.send_txs_stats.successfully_sent.saturating_add(1);
+                self.send_txs_stats
+                    .successfully_sent
+                    .fetch_add(1, Ordering::Relaxed);
             }
         }
         measure_send.stop();
@@ -221,14 +221,14 @@
                     }
                     Err(err) => {
                         warn!("Connection error {}: {}", self.peer, err);
-                        record_error(err.into(), &mut self.send_txs_stats);
+                        record_error(err.into(), &self.send_txs_stats);
                         self.connection =
                             ConnectionState::Retry(max_retries_attempt.saturating_add(1));
                     }
                 }
             }
             Err(connecting_error) => {
-                record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
+                record_error(connecting_error.clone().into(), &self.send_txs_stats);
                 match connecting_error {
                     ConnectError::EndpointStopping => {
                         debug!("Endpoint stopping, exit connection worker.");
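
Taken together, the changes in this file move the per-worker statistics behind an Arc so the worker no longer needs mutable access: `record_error` now takes `&self.send_txs_stats`, and `successfully_sent` is bumped with `fetch_add(1, Ordering::Relaxed)`. A minimal sketch of that sharing scheme, assuming a single `AtomicU64` counter (the real `SendTransactionStats` has more fields, omitted here for brevity):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    // Sketch only: one field is enough to show why the Arc works.
    #[derive(Default)]
    struct SendTransactionStats {
        successfully_sent: AtomicU64,
    }

    fn main() {
        let stats = Arc::new(SendTransactionStats::default());

        // The scheduler keeps one clone per peer; each worker gets
        // another clone and updates it without any &mut borrow or lock.
        let worker_stats = Arc::clone(&stats);
        worker_stats
            .successfully_sent
            .fetch_add(1, Ordering::Relaxed);

        // The owner can read the shared counter at any time, even while
        // the worker task is still running.
        assert_eq!(stats.successfully_sent.load(Ordering::Relaxed), 1);
    }

With `Ordering::Relaxed` each counter is individually consistent but carries no ordering guarantees relative to other counters, which is acceptable for best-effort metrics.
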
22 changes: 19 additions & 3 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -10,6 +10,7 @@ use {
         },
         transaction_batch::TransactionBatch,
         workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
+        SendTransactionStats,
     },
     log::*,
     quinn::Endpoint,
@@ -112,6 +113,7 @@ impl ConnectionWorkersScheduler {
         let endpoint = Self::setup_endpoint(bind, validator_identity)?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
+        let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
 
         loop {
             let transaction_batch = tokio::select! {
@@ -133,14 +135,25 @@
             for new_leader in new_leaders {
                 if !workers.contains(new_leader) {
                     debug!("No existing workers for {new_leader:?}, starting a new one.");
+                    let stats = send_stats_per_addr.entry(new_leader.ip()).or_default();
                     let worker = Self::spawn_worker(
                         &endpoint,
                         new_leader,
                         worker_channel_size,
                         skip_check_transaction_age,
                         max_reconnect_attempts,
+                        stats.clone(),
                     );
-                    workers.push(*new_leader, worker).await;
+                    let shutdown_worker = workers.push(*new_leader, worker).await;
+                    if let Some(shutdown_worker) = shutdown_worker {
+                        tokio::spawn(async move {
+                            let leader = shutdown_worker.leader();
+                            let res = shutdown_worker.shutdown().await;
+                            if let Err(err) = res {
+                                debug!("Error while shutting down worker for {leader}: {err}");
+                            }
+                        });
+                    }
                 }
 
                 tokio::select! {
@@ -165,12 +178,14 @@
             // connection.
             for peer in future_leaders {
                 if !workers.contains(peer) {
+                    let stats = send_stats_per_addr.entry(peer.ip()).or_default();
                     let worker = Self::spawn_worker(
                         &endpoint,
                         peer,
                         worker_channel_size,
                         skip_check_transaction_age,
                         max_reconnect_attempts,
+                        stats.clone(),
                     );
                     workers.push(*peer, worker).await;
                 }
@@ -181,7 +196,7 @@
 
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;
-        Ok(workers.transaction_stats().clone())
+        Ok(send_stats_per_addr)
     }
 
     /// Sets up the QUIC endpoint for the scheduler to handle connections.
@@ -206,6 +221,7 @@ impl ConnectionWorkersScheduler {
         worker_channel_size: usize,
         skip_check_transaction_age: bool,
         max_reconnect_attempts: usize,
+        stats: Arc<SendTransactionStats>,
     ) -> WorkerInfo {
         let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
         let endpoint = endpoint.clone();
@@ -217,10 +233,10 @@
             txs_receiver,
             skip_check_transaction_age,
             max_reconnect_attempts,
+            stats,
         );
         let handle = tokio::spawn(async move {
             worker.run().await;
-            worker.transaction_stats().clone()
         });
 
         WorkerInfo::new(txs_sender, handle, cancel)
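
The hunk in the leader loop is the change the commit title names: `workers.push` now returns the worker it displaced, if any, and that worker's shutdown is awaited in a detached `tokio::spawn` so the scheduler's send loop is not blocked on teardown. A minimal sketch of the pattern, with a hypothetical `EvictedWorker` standing in for whatever handle the cache actually returns:

    use tokio::{sync::mpsc, task::JoinHandle};

    // Hypothetical stand-in for the evicted worker; only the pieces
    // needed to show the shutdown pattern are modeled here.
    struct EvictedWorker {
        sender: mpsc::Sender<u64>, // dropping this closes the worker's channel
        handle: JoinHandle<()>,
    }

    impl EvictedWorker {
        async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
            drop(self.sender); // let the worker drain and exit its loop
            self.handle.await // wait for the task to actually finish
        }
    }

    fn detach_shutdown(evicted: Option<EvictedWorker>) {
        // Awaiting the old worker's teardown inline would stall the
        // scheduler; spawning keeps the hot path moving.
        if let Some(worker) = evicted {
            tokio::spawn(async move {
                if let Err(err) = worker.shutdown().await {
                    eprintln!("Error while shutting down worker: {err}");
                }
            });
        }
    }

The trade-off of detaching is that shutdown errors can only be logged, not propagated, which matches the `debug!` used in the diff.
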
[Diffs for the remaining 3 changed files are collapsed and not shown here.]
