Skip to content

Commit

Permalink
more quic servers (#116)
Browse files Browse the repository at this point in the history
Co-authored-by: Jed <[email protected]>
  • Loading branch information
segfaultdoc and jedleggett authored Apr 5, 2024
1 parent 32f57d0 commit 6fb91ab
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 100 deletions.
83 changes: 49 additions & 34 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub const MAX_QUIC_CONNECTIONS_PER_IP: usize = 8;

#[derive(Debug)]
pub struct TpuSockets {
pub transactions_quic_sockets: UdpSocket,
pub transactions_forwards_quic_sockets: UdpSocket,
pub transactions_quic_sockets: Vec<UdpSocket>,
pub transactions_forwards_quic_sockets: Vec<UdpSocket>,
}

pub struct Tpu {
Expand All @@ -55,6 +55,7 @@ impl Tpu {
tpu_fwd_ip: &IpAddr,
rpc_load_balancer: &Arc<LoadBalancer>,
max_unstaked_quic_connections: usize,
max_staked_quic_connections: usize,
staked_nodes_overrides: HashMap<Pubkey, u64>,
) -> (Self, Receiver<BankingPacketBatch>) {
let TpuSockets {
Expand All @@ -77,37 +78,51 @@ impl Tpu {
let (tpu_forwards_sender, tpu_forwards_receiver) =
crossbeam_channel::bounded(Tpu::TPU_QUEUE_CAPACITY);

let (_, tpu_quic_t) = spawn_server(
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
*tpu_ip,
tpu_sender.clone(),
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
max_unstaked_quic_connections,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
Duration::from_millis(DEFAULT_TPU_COALESCE_MS),
)
.unwrap();

let (_, tpu_forwards_quic_t) = spawn_server(
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
*tpu_fwd_ip,
tpu_forwards_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(max_unstaked_quic_connections),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
Duration::from_millis(DEFAULT_TPU_COALESCE_MS),
)
.unwrap();
let mut quic_tasks = transactions_quic_sockets
.into_iter()
.map(|sock| {
spawn_server(
"quic_streamer_tpu",
sock,
keypair,
*tpu_ip,
tpu_sender.clone(),
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
max_staked_quic_connections,
max_unstaked_quic_connections,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
Duration::from_millis(DEFAULT_TPU_COALESCE_MS),
)
.unwrap()
.1
})
.collect::<Vec<_>>();

quic_tasks.extend(
transactions_forwards_quic_sockets
.into_iter()
.map(|sock| {
spawn_server(
"quic_streamer_tpu_forwards",
sock,
keypair,
*tpu_fwd_ip,
tpu_forwards_sender.clone(),
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
max_staked_quic_connections.saturating_add(max_unstaked_quic_connections),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
Duration::from_millis(DEFAULT_TPU_COALESCE_MS),
)
.unwrap()
.1
})
.collect::<Vec<_>>(),
);

let fetch_stage = FetchStage::new(tpu_forwards_receiver, tpu_sender, exit.clone());

Expand All @@ -124,7 +139,7 @@ impl Tpu {
fetch_stage,
staked_nodes_updater_service,
sigverify_stage,
thread_handles: vec![tpu_quic_t, tpu_forwards_quic_t],
thread_handles: quic_tasks,
},
banking_packet_receiver,
)
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ services:
- KEYPAIR_PATH=/etc/solana/id.json
- PUBLIC_IP=127.0.0.1
- TPU_BIND_IP=0.0.0.0
- TPU_PORT=10000
- TPU_FWD_PORT=10001
- TPU_QUIC_PORT=10006
- TPU_QUIC_FWD_PORT=10007
- NUM_TPU_QUIC_SERVERS=10
- TPU_QUIC_FWD_PORT=10016
- NUM_TPU_FWD_QUIC_SERVERS=10
- GRPC_BIND_IP=0.0.0.0
- GRPC_BIND_PORT=11226
- NUM_TPU_BINDS=32
Expand Down
56 changes: 23 additions & 33 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::{hash_map::Entry, HashMap, HashSet},
net::IpAddr,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread,
Expand Down Expand Up @@ -375,17 +375,14 @@ pub enum RelayerError {

pub type RelayerResult<T> = Result<T, RelayerError>;

type PacketSubscriptions =
Arc<RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>>;
pub struct RelayerHandle {
packet_subscriptions:
Arc<RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>>,
packet_subscriptions: PacketSubscriptions,
}

impl RelayerHandle {
pub fn new(
packet_subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
) -> RelayerHandle {
pub fn new(packet_subscriptions: &PacketSubscriptions) -> RelayerHandle {
RelayerHandle {
packet_subscriptions: packet_subscriptions.clone(),
}
Expand All @@ -402,15 +399,15 @@ impl RelayerHandle {
}

pub struct RelayerImpl {
tpu_port: u16,
tpu_fwd_port: u16,
tpu_quic_ports: Vec<u16>,
tpu_fwd_quic_ports: Vec<u16>,
public_ip: IpAddr,
seq: AtomicU64,

subscription_sender: Sender<Subscription>,
threads: Vec<JoinHandle<()>>,
health_state: Arc<RwLock<HealthState>>,
packet_subscriptions:
Arc<RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>>,
packet_subscriptions: PacketSubscriptions,
}

impl RelayerImpl {
Expand All @@ -422,8 +419,8 @@ impl RelayerImpl {
delay_packet_receiver: Receiver<RelayerPacketBatches>,
leader_schedule_cache: LeaderScheduleUpdatingHandle,
public_ip: IpAddr,
tpu_port: u16,
tpu_fwd_port: u16,
tpu_quic_ports: Vec<u16>,
tpu_fwd_quic_ports: Vec<u16>,
health_state: Arc<RwLock<HealthState>>,
exit: Arc<AtomicBool>,
ofac_addresses: HashSet<Pubkey>,
Expand Down Expand Up @@ -463,13 +460,14 @@ impl RelayerImpl {
};

Self {
tpu_port,
tpu_fwd_port,
tpu_quic_ports,
tpu_fwd_quic_ports,
subscription_sender,
public_ip,
threads: vec![thread],
health_state,
packet_subscriptions,
seq: AtomicU64::new(0),
}
}

Expand All @@ -486,9 +484,7 @@ impl RelayerImpl {
leader_lookahead: u64,
health_state: Arc<RwLock<HealthState>>,
exit: Arc<AtomicBool>,
packet_subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
packet_subscriptions: &PacketSubscriptions,
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
Expand Down Expand Up @@ -582,9 +578,7 @@ impl RelayerImpl {

fn drop_connections(
disconnected_pubkeys: Vec<Pubkey>,
subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
subscriptions: &PacketSubscriptions,
relayer_metrics: &mut RelayerMetrics,
) {
relayer_metrics.num_removed_connections += disconnected_pubkeys.len() as u64;
Expand All @@ -602,9 +596,7 @@ impl RelayerImpl {
}

fn handle_heartbeat(
subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
subscriptions: &PacketSubscriptions,
relayer_metrics: &mut RelayerMetrics,
) -> Vec<Pubkey> {
let failed_pubkey_updates = subscriptions
Expand Down Expand Up @@ -638,9 +630,7 @@ impl RelayerImpl {
/// Returns pubkeys of subscribers that failed to send
fn forward_packets(
maybe_packet_batches: Result<RelayerPacketBatches, RecvError>,
subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
subscriptions: &PacketSubscriptions,
slot_leaders: &HashSet<Pubkey>,
relayer_metrics: &mut RelayerMetrics,
ofac_addresses: &HashSet<Pubkey>,
Expand Down Expand Up @@ -730,9 +720,7 @@ impl RelayerImpl {

fn handle_subscription(
maybe_subscription: Result<Subscription, RecvError>,
subscriptions: &Arc<
RwLock<HashMap<Pubkey, TokioSender<Result<SubscribePacketsResponse, Status>>>>,
>,
subscriptions: &PacketSubscriptions,
relayer_metrics: &mut RelayerMetrics,
) -> RelayerResult<()> {
match maybe_subscription? {
Expand Down Expand Up @@ -796,14 +784,16 @@ impl Relayer for RelayerImpl {
&self,
_: Request<GetTpuConfigsRequest>,
) -> Result<Response<GetTpuConfigsResponse>, Status> {
let seq = self.seq.fetch_add(1, Ordering::Acquire);
return Ok(Response::new(GetTpuConfigsResponse {
tpu: Some(Socket {
ip: self.public_ip.to_string(),
port: self.tpu_port as i64,
port: (self.tpu_quic_ports[seq as usize % self.tpu_quic_ports.len()] - 6) as i64,
}),
tpu_forward: Some(Socket {
ip: self.public_ip.to_string(),
port: self.tpu_fwd_port as i64,
port: (self.tpu_fwd_quic_ports[seq as usize % self.tpu_fwd_quic_ports.len()] - 6)
as i64,
}),
}));
}
Expand Down
Loading

0 comments on commit 6fb91ab

Please sign in to comment.