Skip to content

Commit

Permalink
removed lookahead_slots
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Nov 7, 2024
1 parent afbc3da commit 5fbfddd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 48 deletions.
60 changes: 22 additions & 38 deletions tpu-client-next/src/connection_workers_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,20 @@ pub enum ConnectionWorkersSchedulerError {
/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Assumption is that `send_next` <= `connect_next`. The idea is to hide latency
/// of creating connections by doing this in advance.
/// Note, that the unit is number of leaders per
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance in order to hide the latency of opening new
/// connection. Hence, `connect` must be greater or equal to `send`
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send_next: usize,
pub send: usize,

/// The number of leaders to target for establishing connections.
pub connect_next: usize,
}

/// This enum defines to how many discovered leaders we will send transactions.
pub enum LeadersFanout {
/// Send transactions to all the leaders discovered by the `next_leaders`
/// call.
All,
/// Send transactions to the first selected number of leaders discovered by
/// the `next_leaders` call.
Next(Fanout),
pub connect: usize,
}

/// Configuration for the [`ConnectionWorkersScheduler`].
Expand Down Expand Up @@ -90,13 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,

/// The number of leaders to send transactions to.
pub leaders_fanout: LeadersFanout,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}

impl ConnectionWorkersScheduler {
Expand All @@ -117,7 +108,6 @@ impl ConnectionWorkersScheduler {
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
Expand All @@ -143,7 +133,8 @@ impl ConnectionWorkersScheduler {
break;
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);

let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);

let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
Expand Down Expand Up @@ -246,22 +237,15 @@ impl ConnectionWorkersScheduler {
/// slice includes the the first set.
fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &LeadersFanout,
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
match fanout {
LeadersFanout::All => (leaders, leaders),
LeadersFanout::Next(Fanout {
send_next,
connect_next,
}) => {
assert!(send_next <= connect_next);
let send_count = (*send_next).min(leaders.len());
let connect_count = (*connect_next).min(leaders.len());
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];
let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
}
}
(send_slice, connect_slice)
}
13 changes: 9 additions & 4 deletions tpu-client-next/src/leader_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use {
log::*,
solana_connection_cache::connection_cache::Protocol,
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS,
solana_tpu_client::nonblocking::tpu_client::LeaderTpuService,
std::{
fmt,
Expand All @@ -30,13 +31,15 @@ use {
/// identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
/// Returns next unique leaders for the next `lookahead_slots` starting from
/// Returns next leaders for the next `lookahead_leaders` starting from
/// current estimated slot.
///
/// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition.
///
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr>;
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr>;

/// Stop [`LeaderUpdater`] and releases all associated resources.
async fn stop(&mut self);
Expand Down Expand Up @@ -100,7 +103,9 @@ struct LeaderUpdaterService {

#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
let lookahead_slots =
(lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
}

Expand All @@ -118,7 +123,7 @@ struct PinnedLeaderUpdater {

#[async_trait]
impl LeaderUpdater for PinnedLeaderUpdater {
fn next_leaders(&mut self, _lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec<SocketAddr> {
self.address.clone()
}

Expand Down
11 changes: 5 additions & 6 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
streamer::StakedNodes,
},
solana_tpu_client_next::{
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout, LeadersFanout},
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
leader_updater::create_leader_updater,
send_transaction_stats::SendTransactionStatsNonAtomic,
transaction_batch::TransactionBatch,
Expand Down Expand Up @@ -54,11 +54,10 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
// the speed of fastest leader.
worker_channel_size: 100,
max_reconnect_attempts: 4,
lookahead_slots: 1,
leaders_fanout: LeadersFanout::Next(Fanout {
send_next: 1,
connect_next: 1,
}),
leaders_fanout: Fanout {
send: 1,
connect: 1,
},
}
}

Expand Down

0 comments on commit 5fbfddd

Please sign in to comment.