add fanout to tpu-client-next (#3478)
* Add tpu-client-next to the root Cargo.toml

* Change LeaderUpdater trait to accept mut self

* add fanout to the tpu-client-next

* Shutdown in separate task

* Use try_send instead, minor improvements

* fix LeaderUpdaterError traits

* improve lifetimes in split_leaders

Co-authored-by: Illia Bobyr <[email protected]>

* address PR comments

* create connections in advance

* removed lookahead_slots

---------

Co-authored-by: Illia Bobyr <[email protected]>
(cherry picked from commit 2a618b5)

# Conflicts:
#	Cargo.toml
KirillLykov authored and mergify[bot] committed Nov 7, 2024
1 parent c6a7130 commit 5d74e00
Showing 7 changed files with 421 additions and 224 deletions.
86 changes: 86 additions & 0 deletions Cargo.toml
@@ -453,6 +453,7 @@ solana-msg = { path = "sdk/msg", version = "=2.1.1" }
solana-native-token = { path = "sdk/native-token", version = "=2.1.1" }
solana-net-utils = { path = "net-utils", version = "=2.1.1" }
solana-nohash-hasher = "0.2.1"
<<<<<<< HEAD
solana-notifier = { path = "notifier", version = "=2.1.1" }
solana-package-metadata = { path = "sdk/package-metadata", version = "=2.1.1" }
solana-package-metadata-macro = { path = "sdk/package-metadata-macro", version = "=2.1.1" }
@@ -526,6 +527,91 @@ solana-zk-keygen = { path = "zk-keygen", version = "=2.1.1" }
solana-zk-sdk = { path = "zk-sdk", version = "=2.1.1" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=2.1.1" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=2.1.1" }
=======
solana-nonce = { path = "sdk/nonce", version = "=2.2.0" }
solana-notifier = { path = "notifier", version = "=2.2.0" }
solana-package-metadata = { path = "sdk/package-metadata", version = "=2.2.0" }
solana-package-metadata-macro = { path = "sdk/package-metadata-macro", version = "=2.2.0" }
solana-packet = { path = "sdk/packet", version = "=2.2.0" }
solana-perf = { path = "perf", version = "=2.2.0" }
solana-poh = { path = "poh", version = "=2.2.0" }
solana-poseidon = { path = "poseidon", version = "=2.2.0" }
solana-precompile-error = { path = "sdk/precompile-error", version = "=2.2.0" }
solana-presigner = { path = "sdk/presigner", version = "=2.2.0" }
solana-program = { path = "sdk/program", version = "=2.2.0", default-features = false }
solana-program-error = { path = "sdk/program-error", version = "=2.2.0" }
solana-program-memory = { path = "sdk/program-memory", version = "=2.2.0" }
solana-program-option = { path = "sdk/program-option", version = "=2.2.0" }
solana-program-pack = { path = "sdk/program-pack", version = "=2.2.0" }
solana-program-runtime = { path = "program-runtime", version = "=2.2.0" }
solana-program-test = { path = "program-test", version = "=2.2.0" }
solana-pubkey = { path = "sdk/pubkey", version = "=2.2.0", default-features = false }
solana-pubsub-client = { path = "pubsub-client", version = "=2.2.0" }
solana-quic-client = { path = "quic-client", version = "=2.2.0" }
solana-rayon-threadlimit = { path = "rayon-threadlimit", version = "=2.2.0" }
solana-remote-wallet = { path = "remote-wallet", version = "=2.2.0", default-features = false }
solana-rent = { path = "sdk/rent", version = "=2.2.0", default-features = false }
solana-reserved-account-keys = { path = "sdk/reserved-account-keys", version = "=2.2.0", default-features = false }
solana-reward-info = { path = "sdk/reward-info", version = "=2.2.0" }
solana-sanitize = { path = "sdk/sanitize", version = "=2.2.0" }
solana-seed-derivable = { path = "sdk/seed-derivable", version = "=2.2.0" }
solana-seed-phrase = { path = "sdk/seed-phrase", version = "=2.2.0" }
solana-serde-varint = { path = "sdk/serde-varint", version = "=2.2.0" }
solana-serialize-utils = { path = "sdk/serialize-utils", version = "=2.2.0" }
solana-sha256-hasher = { path = "sdk/sha256-hasher", version = "=2.2.0" }
solana-signature = { path = "sdk/signature", version = "=2.2.0", default-features = false }
solana-signer = { path = "sdk/signer", version = "=2.2.0" }
solana-slot-hashes = { path = "sdk/slot-hashes", version = "=2.2.0" }
solana-slot-history = { path = "sdk/slot-history", version = "=2.2.0" }
solana-time-utils = { path = "sdk/time-utils", version = "=2.2.0" }
solana-timings = { path = "timings", version = "=2.2.0" }
solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = "=2.2.0" }
solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=2.2.0" }
solana-rpc = { path = "rpc", version = "=2.2.0" }
solana-rpc-client = { path = "rpc-client", version = "=2.2.0", default-features = false }
solana-rpc-client-api = { path = "rpc-client-api", version = "=2.2.0" }
solana-rpc-client-nonce-utils = { path = "rpc-client-nonce-utils", version = "=2.2.0" }
solana-runtime = { path = "runtime", version = "=2.2.0" }
solana-runtime-transaction = { path = "runtime-transaction", version = "=2.2.0" }
solana-sdk = { path = "sdk", version = "=2.2.0" }
solana-sdk-ids = { path = "sdk/sdk-ids", version = "=2.2.0" }
solana-sdk-macro = { path = "sdk/macro", version = "=2.2.0" }
solana-secp256k1-recover = { path = "curves/secp256k1-recover", version = "=2.2.0", default-features = false }
solana-send-transaction-service = { path = "send-transaction-service", version = "=2.2.0" }
solana-short-vec = { path = "sdk/short-vec", version = "=2.2.0" }
solana-stable-layout = { path = "sdk/stable-layout", version = "=2.2.0" }
solana-stake-program = { path = "programs/stake", version = "=2.2.0" }
solana-storage-bigtable = { path = "storage-bigtable", version = "=2.2.0" }
solana-storage-proto = { path = "storage-proto", version = "=2.2.0" }
solana-streamer = { path = "streamer", version = "=2.2.0" }
solana-svm = { path = "svm", version = "=2.2.0" }
solana-svm-conformance = { path = "svm-conformance", version = "=2.2.0" }
solana-svm-example-paytube = { path = "svm/examples/paytube", version = "=2.2.0" }
solana-svm-rent-collector = { path = "svm-rent-collector", version = "=2.2.0" }
solana-svm-transaction = { path = "svm-transaction", version = "=2.2.0" }
solana-system-program = { path = "programs/system", version = "=2.2.0" }
solana-sysvar-id = { path = "sdk/sysvar-id", version = "=2.2.0" }
solana-test-validator = { path = "test-validator", version = "=2.2.0" }
solana-thin-client = { path = "thin-client", version = "=2.2.0" }
solana-transaction-error = { path = "sdk/transaction-error", version = "=2.2.0" }
solana-tpu-client = { path = "tpu-client", version = "=2.2.0", default-features = false }
solana-tpu-client-next = { path = "tpu-client-next", version = "=2.2.0" }
solana-transaction-status = { path = "transaction-status", version = "=2.2.0" }
solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.2.0" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.2.0" }
solana-turbine = { path = "turbine", version = "=2.2.0" }
solana-type-overrides = { path = "type-overrides", version = "=2.2.0" }
solana-udp-client = { path = "udp-client", version = "=2.2.0" }
solana-version = { path = "version", version = "=2.2.0" }
solana-vote = { path = "vote", version = "=2.2.0" }
solana-vote-program = { path = "programs/vote", version = "=2.2.0" }
solana-wen-restart = { path = "wen-restart", version = "=2.2.0" }
solana-zk-elgamal-proof-program = { path = "programs/zk-elgamal-proof", version = "=2.2.0" }
solana-zk-keygen = { path = "zk-keygen", version = "=2.2.0" }
solana-zk-sdk = { path = "zk-sdk", version = "=2.2.0" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=2.2.0" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=2.2.0" }
>>>>>>> 2a618b5e01 (add fanout to tpu-client-next (#3478))
solana_rbpf = "=0.8.5"
spl-associated-token-account = "=4.0.0"
spl-instruction-padding = "0.2"
26 changes: 13 additions & 13 deletions tpu-client-next/src/connection_worker.rs
@@ -14,7 +14,10 @@ use {
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
timing::timestamp,
},
std::net::SocketAddr,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
},
tokio::{
sync::mpsc,
time::{sleep, Duration},
@@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker {
connection: ConnectionState,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: SendTransactionStats,
send_txs_stats: Arc<SendTransactionStats>,
cancel: CancellationToken,
}

@@ -93,6 +96,7 @@ impl ConnectionWorker {
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: Arc<SendTransactionStats>,
) -> (Self, CancellationToken) {
let cancel = CancellationToken::new();

@@ -103,7 +107,7 @@
connection: ConnectionState::NotSetup,
skip_check_transaction_age,
max_reconnect_attempts,
send_txs_stats: SendTransactionStats::default(),
send_txs_stats,
cancel: cancel.clone(),
};

@@ -155,11 +159,6 @@ impl ConnectionWorker {
}
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn transaction_stats(&self) -> &SendTransactionStats {
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
@@ -183,11 +182,12 @@

if let Err(error) = result {
trace!("Failed to send transaction over stream with error: {error}.");
record_error(error, &mut self.send_txs_stats);
record_error(error, &self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
} else {
self.send_txs_stats.successfully_sent =
self.send_txs_stats.successfully_sent.saturating_add(1);
self.send_txs_stats
.successfully_sent
.fetch_add(1, Ordering::Relaxed);
}
}
measure_send.stop();
@@ -221,14 +221,14 @@
}
Err(err) => {
warn!("Connection error {}: {}", self.peer, err);
record_error(err.into(), &mut self.send_txs_stats);
record_error(err.into(), &self.send_txs_stats);
self.connection =
ConnectionState::Retry(max_retries_attempt.saturating_add(1));
}
}
}
Err(connecting_error) => {
record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
record_error(connecting_error.clone().into(), &self.send_txs_stats);
match connecting_error {
ConnectError::EndpointStopping => {
debug!("Endpoint stopping, exit connection worker.");
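The change to connection_worker.rs above replaces the worker-owned `SendTransactionStats` with a shared `Arc<SendTransactionStats>` whose counters are bumped with atomic `fetch_add`, so the scheduler can aggregate per-address statistics without waiting for worker tasks to finish. Below is a minimal, self-contained sketch of that pattern; the two-field stats struct is a hypothetical reduction, not the crate's actual field set.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

// Hypothetical, reduced version of the stats shared between scheduler and workers.
#[derive(Default)]
struct SendTransactionStats {
    successfully_sent: AtomicU64,
    connection_error_timed_out: AtomicU64,
}

fn main() {
    let stats = Arc::new(SendTransactionStats::default());

    // Each worker holds a clone of the Arc and bumps counters lock-free.
    let worker_stats = Arc::clone(&stats);
    let handle = std::thread::spawn(move || {
        for _ in 0..100 {
            worker_stats.successfully_sent.fetch_add(1, Ordering::Relaxed);
        }
    });
    handle.join().unwrap();

    // The scheduler can read the counters at any time, even while workers run.
    assert_eq!(stats.successfully_sent.load(Ordering::Relaxed), 100);
}

Sharing the counters this way is what lets the diff delete the old `transaction_stats()` accessor: the scheduler already owns a clone of each worker's stats.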
122 changes: 80 additions & 42 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -9,7 +9,8 @@ use {
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
},
transaction_batch::TransactionBatch,
workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
SendTransactionStats,
},
log::*,
quinn::Endpoint,
@@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note that the unit is the number of leaders per
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance, in order to hide the latency of opening a
/// new connection. Hence, `connect` must be greater than or equal to `send`.
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send: usize,

/// The number of leaders to target for establishing connections.
pub connect: usize,
}

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
@@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}

impl ConnectionWorkersScheduler {
@@ -90,7 +108,7 @@
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -99,6 +117,7 @@
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

loop {
let transaction_batch = tokio::select! {
@@ -114,50 +133,49 @@
break;
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}

tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we has failed to send batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);

// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
for peer in future_leaders {
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
// Add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),
);
workers.push(*peer, worker).await;
maybe_shutdown_worker(workers.push(*peer, worker));
}
}

for new_leader in fanout_leaders {
if !workers.contains(new_leader) {
warn!("No existing worker for {new_leader:?}, skip sending to this leader.");
continue;
}

let send_res =
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
}
}
}
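The rewritten send loop above uses the non-blocking `try_send_transactions_to_address`, so one slow or disconnected leader cannot stall delivery to the other fanout leaders; a full worker channel simply drops that batch. The underlying tokio idiom is sketched below as a standalone program; the `TransactionBatch` type and channel size are illustrative stand-ins, not the crate's API.

use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Clone, Debug)]
struct TransactionBatch(Vec<u8>);

#[tokio::main]
async fn main() {
    // Small bounded channel, playing the role of `worker_channel_size`.
    let (tx, mut rx) = mpsc::channel::<TransactionBatch>(2);

    for i in 0..4u8 {
        match tx.try_send(TransactionBatch(vec![i])) {
            Ok(()) => println!("queued batch {i}"),
            // Channel full: drop the batch instead of blocking the scheduler.
            Err(TrySendError::Full(batch)) => println!("dropped {batch:?}, worker is busy"),
            // Receiver gone: the worker task has shut down.
            Err(TrySendError::Closed(_)) => break,
        }
    }
    drop(tx); // close the channel so the drain loop below terminates

    while let Some(batch) = rx.recv().await {
        println!("worker received {batch:?}");
    }
}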
@@ -166,7 +184,7 @@

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
Ok(workers.transaction_stats().clone())
Ok(send_stats_per_addr)
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
@@ -191,6 +209,7 @@
worker_channel_size: usize,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
stats: Arc<SendTransactionStats>,
) -> WorkerInfo {
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
let endpoint = endpoint.clone();
@@ -202,12 +221,31 @@
txs_receiver,
skip_check_transaction_age,
max_reconnect_attempts,
stats,
);
let handle = tokio::spawn(async move {
worker.run().await;
worker.transaction_stats().clone()
});

WorkerInfo::new(txs_sender, handle, cancel)
}
}

/// Splits `leaders` into two slices based on the `fanout` configuration:
/// * the first slice contains the leaders to which transactions will be sent,
/// * the second slice contains the leaders used to warm up connections. This
///   slice includes the first set.
fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
}
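Taken together with the `Fanout` struct introduced earlier in this file, the split behaves as in the following self-contained sketch; the types here are simplified local copies for illustration, not imports from the crate.

use std::net::SocketAddr;

struct Fanout {
    send: usize,
    connect: usize,
}

fn split_leaders<'a>(leaders: &'a [SocketAddr], fanout: &Fanout) -> (&'a [SocketAddr], &'a [SocketAddr]) {
    assert!(fanout.send <= fanout.connect);
    let send_count = fanout.send.min(leaders.len());
    let connect_count = fanout.connect.min(leaders.len());
    (&leaders[..send_count], &leaders[..connect_count])
}

fn main() {
    // Three upcoming leaders per NUM_CONSECUTIVE_LEADER_SLOTS, as returned by
    // `next_leaders(fanout.connect)`; the addresses are made up.
    let leaders: Vec<SocketAddr> = vec![
        "10.0.0.1:8009".parse().unwrap(),
        "10.0.0.2:8009".parse().unwrap(),
        "10.0.0.3:8009".parse().unwrap(),
    ];
    let fanout = Fanout { send: 1, connect: 3 };

    let (send_to, connect_to) = split_leaders(&leaders, &fanout);
    // Transactions go only to the current leader...
    assert_eq!(send_to.len(), 1);
    // ...while connections are warmed up to all three, including the current one.
    assert_eq!(connect_to.len(), 3);
}

Because the send slice is a prefix of the connect slice, every leader that receives transactions already has a warmed-up connection by the time it becomes current.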
