Skip to content

Commit

Permalink
TPU Vote using quic -- client side implementation (#3473)
Browse files Browse the repository at this point in the history
* Vote using QUIC

Vote using QUIC

send vote packet using the right sender

removed dup declared functions

rebase with master QuicServerParams

removed remove_tpu_vote

first part of sending votes using quic

use quic for vote on client side with connection cache

add debug messages

turn on quic for vote by default for testing

* remove unused import

* turn DEFAULT_VOTE_USE_QUIC to false

* Minor fixes

* addressed some feedback from Behzad and Brennan

* fixed one more merge conflict
  • Loading branch information
lijunwangs authored Dec 6, 2024
1 parent 5b8489f commit ce4a52d
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 105 deletions.
15 changes: 8 additions & 7 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,16 @@ fn main() {
};
let cluster_info = Arc::new(cluster_info);
let tpu_disable_quic = matches.is_present("tpu_disable_quic");
let connection_cache = match tpu_disable_quic {
false => ConnectionCache::new_quic(
"connection_cache_banking_bench_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
true => ConnectionCache::with_udp(
let connection_cache = if tpu_disable_quic {
ConnectionCache::with_udp(
"connection_cache_banking_bench_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
)
} else {
ConnectionCache::new_quic(
"connection_cache_banking_bench_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};
let banking_stage = BankingStage::new_num_threads(
block_production_method,
Expand Down
3 changes: 2 additions & 1 deletion core/src/next_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
cluster_info: &ClusterInfo,
poh_recorder: &RwLock<PohRecorder>,
fanout_slots: u64,
protocol: Protocol,
) -> Vec<SocketAddr> {
let upcoming_leaders = {
let poh_recorder = poh_recorder.read().unwrap();
Expand All @@ -29,7 +30,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
.dedup()
.filter_map(|leader_pubkey| {
cluster_info
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(Protocol::UDP))?
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(protocol))?
.ok()
})
// dedup again since leaders could potentially share the same tpu vote socket
Expand Down
57 changes: 57 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4233,6 +4233,7 @@ pub(crate) mod tests {
},
crossbeam_channel::unbounded,
itertools::Itertools,
solana_client::connection_cache::ConnectionCache,
solana_entry::entry::{self, Entry},
solana_gossip::{cluster_info::Node, crds::Cursor},
solana_ledger::{
Expand Down Expand Up @@ -4263,6 +4264,7 @@ pub(crate) mod tests {
transaction::TransactionError,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
solana_transaction_status::VersionedTransactionWithStatusMeta,
solana_vote_program::{
vote_state::{self, TowerSync, VoteStateVersions},
Expand Down Expand Up @@ -7547,11 +7549,25 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();

let connection_cache = if DEFAULT_VOTE_USE_QUIC {
ConnectionCache::new_quic(
"connection_cache_vote_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
} else {
ConnectionCache::with_udp(
"connection_cache_vote_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

let mut cursor = Cursor::default();
Expand Down Expand Up @@ -7622,12 +7638,27 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();

let connection_cache = if DEFAULT_VOTE_USE_QUIC {
ConnectionCache::new_quic(
"connection_cache_vote_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
} else {
ConnectionCache::with_udp(
"connection_cache_vote_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
Expand Down Expand Up @@ -7705,11 +7736,24 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
ConnectionCache::new_quic(
"connection_cache_vote_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
} else {
ConnectionCache::with_udp(
"connection_cache_vote_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
Expand Down Expand Up @@ -7820,11 +7864,24 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
ConnectionCache::new_quic(
"connection_cache_vote_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
} else {
ConnectionCache::with_udp(
"connection_cache_vote_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};

crate::voting_service::VotingService::handle_vote(
cluster_info,
poh_recorder,
tower_storage,
vote_info,
Arc::new(connection_cache),
);

let votes = cluster_info.get_votes(cursor);
Expand Down
16 changes: 16 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl Tvu {
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
slot_status_notifier: Option<SlotStatusNotifier>,
vote_connection_cache: Arc<ConnectionCache>,
) -> Result<Self, String> {
let in_wen_restart = wen_restart_repair_slots.is_some();

Expand Down Expand Up @@ -331,6 +332,7 @@ impl Tvu {
cluster_info.clone(),
poh_recorder.clone(),
tower_storage,
vote_connection_cache,
);

let warm_quic_cache_service = connection_cache.and_then(|connection_cache| {
Expand Down Expand Up @@ -436,6 +438,7 @@ pub mod tests {
solana_runtime::bank::Bank,
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
std::sync::atomic::{AtomicU64, Ordering},
};

Expand Down Expand Up @@ -494,6 +497,18 @@ pub mod tests {
} else {
None
};
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
ConnectionCache::new_quic(
"connection_cache_vote_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
} else {
ConnectionCache::with_udp(
"connection_cache_vote_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)
};

let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -555,6 +570,7 @@ pub mod tests {
cluster_slots,
wen_restart_repair_slots,
None,
Arc::new(connection_cache),
)
.expect("assume success");
if enable_wen_restart {
Expand Down
116 changes: 83 additions & 33 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,20 @@ struct TransactionHistoryServices {
cache_block_meta_service: Option<CacheBlockMetaService>,
}

/// Bundles the validator's TPU configuration so it can be passed as a single
/// `tpu_config` argument instead of several separate positional parameters.
pub struct ValidatorTpuConfig {
/// Controls whether QUIC is used for sending regular TPU transactions.
/// When `false`, a UDP connection cache is created instead.
pub use_quic: bool,
/// Controls whether QUIC is used for sending TPU votes.
/// When `false`, a UDP vote connection cache is created instead.
pub vote_use_quic: bool,
/// Controls the connection cache pool size; the same value is used for both
/// the regular TPU connection cache and the vote connection cache.
pub tpu_connection_pool_size: usize,
/// Controls whether UDP is enabled for TPU transactions.
pub tpu_enable_udp: bool,
/// Controls the maximum number of connections per IpAddr per minute.
/// NOTE(review): presumably this limits newly opened connections — confirm
/// against the code that consumes this value.
pub tpu_max_connections_per_ipaddr_per_minute: u64,
}

pub struct Validator {
validator_exit: Arc<RwLock<Exit>>,
json_rpc_service: Option<JsonRpcService>,
Expand Down Expand Up @@ -528,12 +542,17 @@ impl Validator {
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
start_progress: Arc<RwLock<ValidatorStartProgress>>,
socket_addr_space: SocketAddrSpace,
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
tpu_config: ValidatorTpuConfig,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
) -> Result<Self> {
let ValidatorTpuConfig {
use_quic,
vote_use_quic,
tpu_connection_pool_size,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
} = tpu_config;

let start_time = Instant::now();

// Initialize the global rayon pool first to ensure the value in config
Expand Down Expand Up @@ -990,29 +1009,52 @@ impl Validator {

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

let connection_cache = match use_quic {
true => {
let connection_cache = ConnectionCache::new_with_client_options(
"connection_cache_tpu_quic",
tpu_connection_pool_size,
None,
Some((
&identity_keypair,
node.info
.tpu(Protocol::UDP)
.map_err(|err| {
ValidatorError::Other(format!("Invalid TPU address: {err:?}"))
})?
.ip(),
)),
Some((&staked_nodes, &identity_keypair.pubkey())),
);
Arc::new(connection_cache)
}
false => Arc::new(ConnectionCache::with_udp(
let connection_cache = if use_quic {
let connection_cache = ConnectionCache::new_with_client_options(
"connection_cache_tpu_quic",
tpu_connection_pool_size,
None,
Some((
&identity_keypair,
node.info
.tpu(Protocol::UDP)
.map_err(|err| {
ValidatorError::Other(format!("Invalid TPU address: {err:?}"))
})?
.ip(),
)),
Some((&staked_nodes, &identity_keypair.pubkey())),
);
Arc::new(connection_cache)
} else {
Arc::new(ConnectionCache::with_udp(
"connection_cache_tpu_udp",
tpu_connection_pool_size,
)),
))
};

let vote_connection_cache = if vote_use_quic {
let vote_connection_cache = ConnectionCache::new_with_client_options(
"connection_cache_vote_quic",
tpu_connection_pool_size,
None, // client_endpoint
Some((
&identity_keypair,
node.info
.tpu_vote(Protocol::QUIC)
.map_err(|err| {
ValidatorError::Other(format!("Invalid TPU Vote address: {err:?}"))
})?
.ip(),
)),
Some((&staked_nodes, &identity_keypair.pubkey())),
);
Arc::new(vote_connection_cache)
} else {
Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
tpu_connection_pool_size,
))
};

let rpc_override_health_check =
Expand Down Expand Up @@ -1428,6 +1470,7 @@ impl Validator {
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
slot_status_notifier,
vote_connection_cache,
)
.map_err(ValidatorError::Other)?;

Expand Down Expand Up @@ -2715,6 +2758,7 @@ mod tests {
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
std::{fs::remove_dir_all, thread, time::Duration},
};
Expand Down Expand Up @@ -2753,10 +2797,13 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
start_progress.clone(),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -2972,10 +3019,13 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
Loading

0 comments on commit ce4a52d

Please sign in to comment.