diff --git a/core/src/tpu.rs b/core/src/tpu.rs index cd077a18..0dc86ca8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,8 +33,8 @@ pub const MAX_QUIC_CONNECTIONS_PER_IP: usize = 8; #[derive(Debug)] pub struct TpuSockets { - pub transactions_quic_sockets: UdpSocket, - pub transactions_forwards_quic_sockets: UdpSocket, + pub transactions_quic_sockets: Vec, + pub transactions_forwards_quic_sockets: Vec, } pub struct Tpu { @@ -55,6 +55,7 @@ impl Tpu { tpu_fwd_ip: &IpAddr, rpc_load_balancer: &Arc, max_unstaked_quic_connections: usize, + max_staked_quic_connections: usize, staked_nodes_overrides: HashMap, ) -> (Self, Receiver) { let TpuSockets { @@ -77,37 +78,51 @@ impl Tpu { let (tpu_forwards_sender, tpu_forwards_receiver) = crossbeam_channel::bounded(Tpu::TPU_QUEUE_CAPACITY); - let (_, tpu_quic_t) = spawn_server( - "quic_streamer_tpu", - transactions_quic_sockets, - keypair, - *tpu_ip, - tpu_sender.clone(), - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes.clone(), - MAX_STAKED_CONNECTIONS, - max_unstaked_quic_connections, - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - Duration::from_millis(DEFAULT_TPU_COALESCE_MS), - ) - .unwrap(); - - let (_, tpu_forwards_quic_t) = spawn_server( - "quic_streamer_tpu_forwards", - transactions_forwards_quic_sockets, - keypair, - *tpu_fwd_ip, - tpu_forwards_sender, - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes.clone(), - MAX_STAKED_CONNECTIONS.saturating_add(max_unstaked_quic_connections), - 0, // Prevent unstaked nodes from forwarding transactions - DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - Duration::from_millis(DEFAULT_TPU_COALESCE_MS), - ) - .unwrap(); + let mut quic_tasks = transactions_quic_sockets + .into_iter() + .map(|sock| { + spawn_server( + "quic_streamer_tpu", + sock, + keypair, + *tpu_ip, + tpu_sender.clone(), + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes.clone(), + max_staked_quic_connections, + max_unstaked_quic_connections, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + Duration::from_millis(DEFAULT_TPU_COALESCE_MS), + ) + .unwrap() + .1 + }) + .collect::>(); + + quic_tasks.extend( + transactions_forwards_quic_sockets + .into_iter() + .map(|sock| { + spawn_server( + "quic_streamer_tpu_forwards", + sock, + keypair, + *tpu_fwd_ip, + tpu_forwards_sender.clone(), + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes.clone(), + max_staked_quic_connections.saturating_add(max_unstaked_quic_connections), + 0, // Prevent unstaked nodes from forwarding transactions + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + Duration::from_millis(DEFAULT_TPU_COALESCE_MS), + ) + .unwrap() + .1 + }) + .collect::>(), + ); let fetch_stage = FetchStage::new(tpu_forwards_receiver, tpu_sender, exit.clone()); @@ -124,7 +139,7 @@ impl Tpu { fetch_stage, staked_nodes_updater_service, sigverify_stage, - thread_handles: vec![tpu_quic_t, tpu_forwards_quic_t], + thread_handles: quic_tasks, }, banking_packet_receiver, ) diff --git a/docker-compose.yaml b/docker-compose.yaml index 0c20cf7f..a8d82c7f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -13,10 +13,10 @@ services: - KEYPAIR_PATH=/etc/solana/id.json - PUBLIC_IP=127.0.0.1 - TPU_BIND_IP=0.0.0.0 - - TPU_PORT=10000 - - TPU_FWD_PORT=10001 - TPU_QUIC_PORT=10006 - - TPU_QUIC_FWD_PORT=10007 + - NUM_TPU_QUIC_SERVERS=10 + - TPU_QUIC_FWD_PORT=10016 + - NUM_TPU_FWD_QUIC_SERVERS=10 - GRPC_BIND_IP=0.0.0.0 - GRPC_BIND_PORT=11226 - NUM_TPU_BINDS=32 diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index e46f3beb..ff1495c4 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -2,7 +2,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, net::IpAddr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, thread, @@ -375,17 +375,14 @@ pub enum RelayerError { pub type RelayerResult = Result; +type PacketSubscriptions = + Arc>>>>; pub struct RelayerHandle { - packet_subscriptions: - Arc>>>>, + packet_subscriptions: PacketSubscriptions, } impl RelayerHandle { - pub fn new( - packet_subscriptions: &Arc< - RwLock>>>, - >, - ) -> RelayerHandle { + pub fn new(packet_subscriptions: &PacketSubscriptions) -> RelayerHandle { RelayerHandle { packet_subscriptions: packet_subscriptions.clone(), } @@ -402,15 +399,15 @@ impl RelayerHandle { } pub struct RelayerImpl { - tpu_port: u16, - tpu_fwd_port: u16, + tpu_quic_ports: Vec, + tpu_fwd_quic_ports: Vec, public_ip: IpAddr, + seq: AtomicU64, subscription_sender: Sender, threads: Vec>, health_state: Arc>, - packet_subscriptions: - Arc>>>>, + packet_subscriptions: PacketSubscriptions, } impl RelayerImpl { @@ -422,8 +419,8 @@ impl RelayerImpl { delay_packet_receiver: Receiver, leader_schedule_cache: LeaderScheduleUpdatingHandle, public_ip: IpAddr, - tpu_port: u16, - tpu_fwd_port: u16, + tpu_quic_ports: Vec, + tpu_fwd_quic_ports: Vec, health_state: Arc>, exit: Arc, ofac_addresses: HashSet, @@ -463,13 +460,14 @@ impl RelayerImpl { }; Self { - tpu_port, - tpu_fwd_port, + tpu_quic_ports, + tpu_fwd_quic_ports, subscription_sender, public_ip, threads: vec![thread], health_state, packet_subscriptions, + seq: AtomicU64::new(0), } } @@ -486,9 +484,7 @@ impl RelayerImpl { leader_lookahead: u64, health_state: Arc>, exit: Arc, - packet_subscriptions: &Arc< - RwLock>>>, - >, + packet_subscriptions: &PacketSubscriptions, ofac_addresses: HashSet, address_lookup_table_cache: Arc>, validator_packet_batch_size: usize, @@ -582,9 +578,7 @@ impl RelayerImpl { fn drop_connections( disconnected_pubkeys: Vec, - subscriptions: &Arc< - RwLock>>>, - >, + subscriptions: &PacketSubscriptions, relayer_metrics: &mut RelayerMetrics, ) { relayer_metrics.num_removed_connections += disconnected_pubkeys.len() as u64; @@ -602,9 +596,7 @@ impl RelayerImpl { } fn handle_heartbeat( - subscriptions: &Arc< - RwLock>>>, - >, + subscriptions: &PacketSubscriptions, relayer_metrics: &mut RelayerMetrics, ) -> Vec { let failed_pubkey_updates = subscriptions @@ -638,9 +630,7 @@ impl RelayerImpl { /// Returns pubkeys of subscribers that failed to send fn forward_packets( maybe_packet_batches: Result, - subscriptions: &Arc< - RwLock>>>, - >, + subscriptions: &PacketSubscriptions, slot_leaders: &HashSet, relayer_metrics: &mut RelayerMetrics, ofac_addresses: &HashSet, @@ -730,9 +720,7 @@ impl RelayerImpl { fn handle_subscription( maybe_subscription: Result, - subscriptions: &Arc< - RwLock>>>, - >, + subscriptions: &PacketSubscriptions, relayer_metrics: &mut RelayerMetrics, ) -> RelayerResult<()> { match maybe_subscription? { @@ -796,14 +784,16 @@ impl Relayer for RelayerImpl { &self, _: Request, ) -> Result, Status> { + let seq = self.seq.fetch_add(1, Ordering::Acquire); return Ok(Response::new(GetTpuConfigsResponse { tpu: Some(Socket { ip: self.public_ip.to_string(), - port: self.tpu_port as i64, + port: (self.tpu_quic_ports[seq as usize % self.tpu_quic_ports.len()] - 6) as i64, }), tpu_forward: Some(Socket { ip: self.public_ip.to_string(), - port: self.tpu_fwd_port as i64, + port: (self.tpu_fwd_quic_ports[seq as usize % self.tpu_fwd_quic_ports.len()] - 6) + as i64, }), })); } diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index 13bd102b..230f4750 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -2,6 +2,7 @@ use std::{ collections::HashSet, fs, net::{IpAddr, Ipv4Addr, SocketAddr}, + ops::Range, path::PathBuf, str::FromStr, sync::{ @@ -59,26 +60,41 @@ static GLOBAL: Jemalloc = Jemalloc; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct Args { - /// Port to bind to advertise for TPU - /// NOTE: There is no longer a socket created at this port since UDP transaction receiving is - /// deprecated. - #[arg(long, env, default_value_t = 11_222)] + /// DEPRECATED, will be removed in a future release. + #[deprecated(since = "0.1.8", note = "UDP TPU disabled")] + #[arg(long, env, default_value_t = 0)] tpu_port: u16, - /// Port to bind to for tpu fwd packets - /// NOTE: There is no longer a socket created at this port since UDP transaction receiving is - /// deprecated. - #[arg(long, env, default_value_t = 11_223)] + /// DEPRECATED, will be removed in a future release. + #[deprecated(since = "0.1.8", note = "UDP TPU_FWD disabled")] + #[arg(long, env, default_value_t = 0)] tpu_fwd_port: u16, - /// Port to bind to for tpu packets. Needs to be tpu_port + 6 + /// Port to bind to for tpu quic packets. + /// The TPU will bind to all ports in the range of (tpu_quic_port, tpu_quic_port + num_tpu_quic_servers). + /// Open firewall ports for this entire range + /// Make sure to not overlap any tpu forward ports with the normal tpu ports. + /// Note: get_tpu_configs will return ths port - 6 to validators to match old UDP TPU definition. #[arg(long, env, default_value_t = 11_228)] tpu_quic_port: u16, - /// Port to bind to for tpu fwd packets. Needs to be tpu_fwd_port + 6 + /// Number of tpu quic servers to spawn. + #[arg(long, env, default_value_t = 1)] + num_tpu_quic_servers: u16, + + /// Port to bind to for tpu quic fwd packets. + /// Make sure to set this to at least (num_tpu_fwd_quic_servers + 6) higher than tpu_fwd_quic_port, + /// to avoid overlap any tpu forward ports with the normal tpu ports. + /// TPU_FWD will bind to all ports in the range of (tpu_fwd_quic_port, tpu_fwd_quic_port + num_tpu_fwd_quic_servers). + /// Open firewall ports for this entire range + /// Note: get_tpu_configs will return ths port - 6 to validators to match old UDP TPU definition. #[arg(long, env, default_value_t = 11_229)] tpu_quic_fwd_port: u16, + /// Number of tpu fwd quic servers to spawn. + #[arg(long, env, default_value_t = 1)] + num_tpu_fwd_quic_servers: u16, + /// Bind IP address for GRPC server #[arg(long, env, default_value_t = IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)))] grpc_bind_ip: IpAddr, @@ -194,6 +210,10 @@ struct Args { #[arg(long, env, default_value_t = 500)] max_unstaked_quic_connections: usize, + /// Max unstaked connections for the QUIC server + #[arg(long, env, default_value_t = 2000)] + max_staked_quic_connections: usize, + /// Number of packets to send in each packet batch to the validator #[arg(long, env, default_value_t = 4)] validator_packet_batch_size: usize, @@ -220,29 +240,61 @@ struct Sockets { } fn get_sockets(args: &Args) -> Sockets { - let (tpu_quic_bind_port, mut tpu_quic_sockets) = multi_bind_in_range( - IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), - (args.tpu_quic_port, args.tpu_quic_port + 1), - 1, - ) - .expect("to bind tpu_quic sockets"); + assert!(args.num_tpu_quic_servers < u16::MAX); + assert!(args.num_tpu_fwd_quic_servers < u16::MAX); + + let tpu_ports = Range { + start: args.tpu_quic_port, + end: args + .tpu_quic_port + .checked_add(args.num_tpu_quic_servers) + .unwrap(), + }; + let tpu_fwd_ports = Range { + start: args.tpu_quic_fwd_port, + end: args + .tpu_quic_fwd_port + .checked_add(args.num_tpu_fwd_quic_servers) + .unwrap(), + }; - let (tpu_fwd_quic_bind_port, mut tpu_fwd_quic_sockets) = multi_bind_in_range( - IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), - (args.tpu_quic_fwd_port, args.tpu_quic_fwd_port + 1), - 1, - ) - .expect("to bind tpu_quic sockets"); + for tpu_port in tpu_ports.start..tpu_ports.end { + assert!(!tpu_fwd_ports.contains(&tpu_port)); + } + + let (tpu_p, tpu_quic_sockets): (Vec<_>, Vec<_>) = (0..args.num_tpu_quic_servers) + .map(|i| { + let (port, mut sock) = multi_bind_in_range( + IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), + (tpu_ports.start + i, tpu_ports.start + 1 + i), + 1, + ) + .unwrap(); + + (port, sock.pop().unwrap()) + }) + .unzip(); + + let (tpu_fwd_p, tpu_fwd_quic_sockets): (Vec<_>, Vec<_>) = (0..args.num_tpu_fwd_quic_servers) + .map(|i| { + let (port, mut sock) = multi_bind_in_range( + IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), + (tpu_fwd_ports.start + i, tpu_fwd_ports.start + 1 + i), + 1, + ) + .unwrap(); + + (port, sock.pop().unwrap()) + }) + .unzip(); - assert_eq!(tpu_quic_bind_port, args.tpu_quic_port); - assert_eq!(tpu_fwd_quic_bind_port, args.tpu_quic_fwd_port); - assert_eq!(args.tpu_port + 6, tpu_quic_bind_port); // QUIC is expected to be at TPU + 6 - assert_eq!(args.tpu_fwd_port + 6, tpu_fwd_quic_bind_port); // QUIC is expected to be at TPU forward + 6 + assert_eq!(tpu_ports.collect::>(), tpu_p); + assert_eq!(tpu_fwd_ports.collect::>(), tpu_fwd_p); Sockets { tpu_sockets: TpuSockets { - transactions_quic_sockets: tpu_quic_sockets.pop().unwrap(), - transactions_forwards_quic_sockets: tpu_fwd_quic_sockets.pop().unwrap(), + transactions_quic_sockets: tpu_quic_sockets, + transactions_forwards_quic_sockets: tpu_fwd_quic_sockets, }, tpu_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), tpu_fwd_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), @@ -369,6 +421,7 @@ fn main() { &sockets.tpu_fwd_ip, &rpc_load_balancer, args.max_unstaked_quic_connections, + args.max_staked_quic_connections, staked_nodes_overrides.staked_map_id, ); @@ -435,8 +488,9 @@ fn main() { delay_packet_receiver, leader_cache.handle(), public_ip, - args.tpu_port, - args.tpu_fwd_port, + (args.tpu_quic_port..args.tpu_quic_port + args.num_tpu_quic_servers as u16).collect(), + (args.tpu_quic_fwd_port..args.tpu_quic_fwd_port + args.num_tpu_fwd_quic_servers as u16) + .collect(), health_manager.handle(), exit.clone(), ofac_addresses,