diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index a20794189a19ff..b36878976ef960 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -7,23 +7,22 @@ use { }, outstanding_requests::OutstandingRequests, packet_threshold::DynamicPacketToProcessThreshold, - quic_endpoint::LocalRequest, repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup}, - request_response::RequestResponse, serve_repair::{ self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair, }, }, replay_stage::DUPLICATE_THRESHOLD, - shred_fetch_stage::receive_repair_quic_packets, + shred_fetch_stage::receive_quic_datagrams, }, bincode::serialize, + bytes::Bytes, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, dashmap::{mapref::entry::Entry::Occupied, DashMap}, solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol, ping_pong::Pong}, solana_ledger::blockstore::Blockstore, solana_perf::{ - packet::{deserialize_from_with_limit, Packet, PacketBatch}, + packet::{deserialize_from_with_limit, Packet, PacketBatch, PacketFlags}, recycler::Recycler, }, solana_runtime::bank::Bank, @@ -153,7 +152,8 @@ impl AncestorHashesService { exit: Arc, blockstore: Arc, ancestor_hashes_request_socket: Arc, - quic_endpoint_sender: AsyncSender, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, repair_info: RepairInfo, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { @@ -171,17 +171,17 @@ impl AncestorHashesService { Duration::from_millis(1), // coalesce false, // use_pinned_memory None, // in_vote_only_mode - false, // is_staked_service + false, // is_staked_service ); - let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded(); let t_receiver_quic = { let exit = exit.clone(); Builder::new() .name(String::from("solAncHashQuic")) .spawn(|| { - receive_repair_quic_packets( - quic_endpoint_response_receiver, + receive_quic_datagrams( + ancestor_hashes_response_quic_receiver, + PacketFlags::REPAIR, response_sender, Recycler::default(), exit, @@ -210,8 +210,7 @@ impl AncestorHashesService { let t_ancestor_requests = Self::run_manage_ancestor_requests( ancestor_hashes_request_statuses, ancestor_hashes_request_socket, - quic_endpoint_sender, - quic_endpoint_response_sender, + ancestor_hashes_request_quic_sender, repair_info, outstanding_requests, exit, @@ -586,8 +585,7 @@ impl AncestorHashesService { fn run_manage_ancestor_requests( ancestor_hashes_request_statuses: Arc>, ancestor_hashes_request_socket: Arc, - quic_endpoint_sender: AsyncSender, - quic_endpoint_response_sender: Sender<(SocketAddr, Vec)>, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, repair_info: RepairInfo, outstanding_requests: Arc>, exit: Arc, @@ -627,8 +625,7 @@ impl AncestorHashesService { Self::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -650,8 +647,7 @@ impl AncestorHashesService { fn manage_ancestor_requests( ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, - quic_endpoint_sender: &AsyncSender, - quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, + ancestor_hashes_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, repair_info: &RepairInfo, outstanding_requests: &RwLock, ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver, @@ -750,8 +746,7 @@ impl AncestorHashesService { if Self::initiate_ancestor_hashes_requests_for_duplicate_slot( ancestor_hashes_request_statuses, ancestor_hashes_request_socket, - quic_endpoint_sender, - quic_endpoint_response_sender, + ancestor_hashes_request_quic_sender, &repair_info.cluster_slots, serve_repair, &repair_info.repair_validators, @@ -829,8 +824,7 @@ impl AncestorHashesService { fn initiate_ancestor_hashes_requests_for_duplicate_slot( ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, - quic_endpoint_sender: &AsyncSender, - quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, + ancestor_hashes_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, cluster_slots: &ClusterSlots, serve_repair: &ServeRepair, repair_validators: &Option>, @@ -873,16 +867,10 @@ impl AncestorHashesService { let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr); } Protocol::QUIC => { - let num_expected_responses = - usize::try_from(ancestor_hashes_repair_type.num_expected_responses()) - .unwrap(); - let request = LocalRequest { - remote_address: *socket_addr, - bytes: request_bytes, - num_expected_responses, - response_sender: quic_endpoint_response_sender.clone(), - }; - if quic_endpoint_sender.blocking_send(request).is_err() { + if ancestor_hashes_request_quic_sender + .blocking_send((*socket_addr, Bytes::from(request_bytes))) + .is_err() + { // The receiver end of the channel is disconnected. break; } @@ -1316,10 +1304,12 @@ mod test { let t_packet_adapter = Builder::new() .spawn(|| adapt_repair_requests_packets(requests_receiver, remote_request_sender)) .unwrap(); + let (repair_response_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); let t_listen = responder_serve_repair.listen( blockstore, remote_request_receiver, response_sender, + repair_response_quic_sender, exit.clone(), ); @@ -1511,14 +1501,12 @@ mod test { repair_validators, .. } = repair_info; - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); - let (quic_endpoint_sender, _quic_endpoint_sender) = + let (ancestor_hashes_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1568,8 +1556,7 @@ mod test { AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1631,8 +1618,7 @@ mod test { AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &cluster_slots, &requester_serve_repair, &repair_validators, @@ -1718,15 +1704,13 @@ mod test { } = repair_info; cluster_info.insert_info(responder_node.info); bank_forks.read().unwrap().root_bank().epoch_schedule(); - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); - let (quic_endpoint_sender, _quic_endpoint_sender) = + let (ancestor_hashes_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); // 1) No signals from ReplayStage, no requests should be made AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1769,8 +1753,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1810,8 +1793,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1843,8 +1825,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1882,8 +1863,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -1926,8 +1906,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -2084,15 +2063,13 @@ mod test { &leader_schedule_cache, ); - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); - let (quic_endpoint_sender, _quic_endpoint_sender) = + let (ancestor_hashes_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); // Simulate making a request AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -2188,8 +2165,7 @@ mod test { &repair_info.ancestor_duplicate_slots_sender, &retryable_slots_sender, ); - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded(); - let (quic_endpoint_sender, _quic_endpoint_sender) = + let (ancestor_hashes_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); // Simulate ancestor request thread getting the retry signal @@ -2199,8 +2175,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, @@ -2239,8 +2214,7 @@ mod test { AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &ancestor_hashes_request_quic_sender, &repair_info, &outstanding_requests, &ancestor_hashes_replay_update_receiver, diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 5f4fea3f637e73..13197b39f9afe9 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -1,28 +1,23 @@ use { - bincode::Options, + bytes::Bytes, crossbeam_channel::Sender, - futures::future::TryJoin, - itertools::Itertools, + futures::future::{TryJoin, TryJoin3}, log::error, quinn::{ - crypto::rustls::{QuicClientConfig, QuicServerConfig}, - ClientConfig, ClosedStream, ConnectError, Connecting, Connection, ConnectionError, - Endpoint, EndpointConfig, IdleTimeout, ReadError, ReadToEndError, RecvStream, SendStream, - ServerConfig, TokioRuntime, TransportConfig, VarInt, WriteError, + ClientConfig, ConnectError, Connecting, Connection, ConnectionError, Endpoint, + EndpointConfig, IdleTimeout, SendDatagramError, ServerConfig, TokioRuntime, + TransportConfig, VarInt, }, - rustls::{ - pki_types::{CertificateDer, PrivateKeyDer}, - CertificateError, KeyLogFile, - }, - serde_bytes::ByteBuf, + rustls::{Certificate, KeyLogFile, PrivateKey}, + solana_gossip::contact_info::Protocol, solana_quic_client::nonblocking::quic_client::SkipServerVerification, solana_runtime::bank_forks::BankForks, - solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::Keypair}, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::{quic::SkipClientVerification, tls_certificates::new_dummy_x509_certificate}, std::{ cmp::Reverse, collections::{hash_map::Entry, HashMap}, - io::{Cursor, Error as IoError}, + io::Error as IoError, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -34,26 +29,52 @@ use { tokio::{ sync::{ mpsc::{error::TrySendError, Receiver as AsyncReceiver, Sender as AsyncSender}, - oneshot::Sender as OneShotSender, Mutex, RwLock as AsyncRwLock, }, task::JoinHandle, }, }; -const ALPN_REPAIR_PROTOCOL_ID: &[u8] = b"solana-repair"; -const CONNECT_SERVER_NAME: &str = "solana-repair"; +// Incoming packets can be either: +// RepairProtocol +// RepairResponse or Shred + repair Nonce +// AncestorHashesResponse +// So, we need 3 QUIC endpoints on 3 separate sockets to correctly distinguish +// between these packets and send them down the right channel. +// 1) serve_repair_quic: +// The server side receives incoming RepairProtocols from the cluster and +// channels them to serve_repair using a Sender channel. +// The outgoing repair (or ancestor hashes) responses from serve_repair are +// sent back to the client side through a AsyncReceiver<(SocketAddr, Bytes)> +// channel and sent back to the remote node. +// 2) repair_quic: +// Outgoing repair requests from the repair_service are received by the +// client through a AsyncReceiver<(SocketAddr, Bytes)> channel and sent to +// serve_repair_quic socket of the remote node. +// Incoming repair responses (RepairResponse or Shred + repair Nonce) are +// channeled to shred-fetch-stage using a Sender<(Pubkey, SocketAddr, Bytes)> +// channel. +// 3) ancestor_hashes_requests_quic: +// Outgoing RepairProtocol::AncestorHashes from the ancestor_hashes_service +// are received by the client through a AsyncReceiver<(SocketAddr, Bytes)> +// channel and sent to serve_repair_quic socket of the remote node. +// Incoming AncestorHashesResponse are channeled back to +// ancestor_hashes_service using a Sender<(Pubkey, SocketAddr, Bytes)> +// channel. const CLIENT_CHANNEL_BUFFER: usize = 1 << 14; const ROUTER_CHANNEL_BUFFER: usize = 64; const CONNECTION_CACHE_CAPACITY: usize = 3072; +const ALPN_REPAIR_PROTOCOL_ID: &[u8] = b"solana-repair"; +const CONNECT_SERVER_NAME: &str = "solana-repair"; // Transport config. -// Repair randomly samples peers, uses bi-directional streams and generally has -// low to moderate load and so is configured separately from other protocols. +const DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 256 * 1024 * 1024; +const DATAGRAM_SEND_BUFFER_SIZE: usize = 128 * 1024 * 1024; +const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT; const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(4); -const MAX_CONCURRENT_BIDI_STREAMS: VarInt = VarInt::from_u32(512); const MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(10); +const MINIMUM_MAXIMUM_TRANSMISSION_UNIT: u16 = 1280; const CONNECTION_CLOSE_ERROR_CODE_SHUTDOWN: VarInt = VarInt::from_u32(1); const CONNECTION_CLOSE_ERROR_CODE_DROPPED: VarInt = VarInt::from_u32(2); @@ -67,23 +88,49 @@ const CONNECTION_CLOSE_REASON_INVALID_IDENTITY: &[u8] = b"INVALID_IDENTITY"; const CONNECTION_CLOSE_REASON_REPLACED: &[u8] = b"REPLACED"; const CONNECTION_CLOSE_REASON_PRUNED: &[u8] = b"PRUNED"; -pub(crate) type AsyncTryJoinHandle = TryJoin, JoinHandle<()>>; - -// Outgoing local requests. -pub struct LocalRequest { - pub(crate) remote_address: SocketAddr, - pub(crate) bytes: Vec, - pub(crate) num_expected_responses: usize, - pub(crate) response_sender: Sender<(SocketAddr, Vec)>, -} +pub(crate) type AsyncTryJoinHandle = TryJoin3< + TryJoin, JoinHandle<()>>, + TryJoin, JoinHandle<()>>, + TryJoin, JoinHandle<()>>, +>; // Incoming requests from remote nodes. -// remote_pubkey and response_sender are None only when adapting UDP packets. -pub struct RemoteRequest { +// remote_pubkey is None only when adapting UDP packets. +pub(crate) struct RemoteRequest { pub(crate) remote_pubkey: Option, pub(crate) remote_address: SocketAddr, - pub(crate) bytes: Vec, - pub(crate) response_sender: Option>>>, + pub(crate) bytes: Bytes, +} + +pub(crate) struct RepairQuicAsyncSenders { + // Outgoing repair responses to remote repair requests from serve_repair. + pub(crate) repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + // Outgoing local repair requests from repair_service. + pub(crate) repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + // Outgoing RepairProtocol::AncestorHashes requests from + // ancestor_hashes_service. + pub(crate) ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, +} + +pub(crate) struct RepairQuicSockets { + // Socket receiving remote repair requests from the cluster, + // and sending back repair responses. + pub(crate) repair_service_quic_socket: UdpSocket, + // Socket sending out local repair requests, + // and receiving repair responses from the cluster. + pub(crate) repair_client_quic_socket: UdpSocket, + // Socket sending out local RepairProtocol::AncestorHashes, + // and receiving AncestorHashesResponse from the cluster. + pub(crate) ancestor_hashes_quic_socket: UdpSocket, +} + +pub(crate) struct RepairQuicSenders { + // Channel to send incoming repair requests from the cluster. + pub(crate) repair_request_quic_sender: Sender, + // Channel to send incoming repair responses from the cluster. + pub(crate) repair_response_quic_sender: Sender<(Pubkey, SocketAddr, Bytes)>, + // Channel to send incoming ancestor hashes responses from the cluster. + pub(crate) ancestor_hashes_response_quic_sender: Sender<(Pubkey, SocketAddr, Bytes)>, } #[derive(Error, Debug)] @@ -99,18 +146,10 @@ pub(crate) enum Error { InvalidIdentity(SocketAddr), #[error(transparent)] IoError(#[from] IoError), - #[error("No Response Received")] - NoResponseReceived, #[error(transparent)] - ReadToEndError(#[from] ReadToEndError), - #[error("read_to_end Timeout")] - ReadToEndTimeout, + SendDatagramError(#[from] SendDatagramError), #[error(transparent)] TlsError(#[from] rustls::Error), - #[error(transparent)] - WriteError(#[from] WriteError), - #[error(transparent)] - ClosedStream(#[from] ClosedStream), } macro_rules! add_metric { @@ -119,16 +158,87 @@ macro_rules! add_metric { }}; } +pub(crate) fn new_quic_endpoints( + runtime: &tokio::runtime::Handle, + keypair: &Keypair, + sockets: RepairQuicSockets, + senders: RepairQuicSenders, + bank_forks: Arc>, +) -> Result<([Endpoint; 3], RepairQuicAsyncSenders, AsyncTryJoinHandle), Error> { + let (repair_service_quic_endpoint, repair_response_quic_sender, repair_service_join_handle) = + new_quic_endpoint( + runtime, + "repair_service_quic_client", + "repair_service_quic_server", + keypair, + sockets.repair_service_quic_socket, + senders.repair_request_quic_sender, + bank_forks.clone(), + )?; + let (repair_client_quic_endpoint, repair_request_quic_sender, repair_client_join_handle) = + new_quic_endpoint( + runtime, + "repair_client_quic_client", + "repair_client_quic_server", + keypair, + sockets.repair_client_quic_socket, + senders.repair_response_quic_sender, + bank_forks.clone(), + )?; + let ( + ancestor_hashes_quic_endpoint, + ancestor_hashes_request_quic_sender, + ancestor_hashes_join_handle, + ) = new_quic_endpoint( + runtime, + "ancestor_hashes_quic_client", + "ancestor_hashes_quic_server", + keypair, + sockets.ancestor_hashes_quic_socket, + senders.ancestor_hashes_response_quic_sender, + bank_forks, + )?; + Ok(( + [ + repair_service_quic_endpoint, + repair_client_quic_endpoint, + ancestor_hashes_quic_endpoint, + ], + RepairQuicAsyncSenders { + repair_response_quic_sender, + repair_request_quic_sender, + ancestor_hashes_request_quic_sender, + }, + futures::future::try_join3( + repair_service_join_handle, + repair_client_join_handle, + ancestor_hashes_join_handle, + ), + )) +} + #[allow(clippy::type_complexity)] -pub(crate) fn new_quic_endpoint( +fn new_quic_endpoint( runtime: &tokio::runtime::Handle, + client_name: &'static str, + server_name: &'static str, keypair: &Keypair, socket: UdpSocket, - remote_request_sender: Sender, + sender: Sender, bank_forks: Arc>, -) -> Result<(Endpoint, AsyncSender, AsyncTryJoinHandle), Error> { +) -> Result< + ( + Endpoint, + AsyncSender<(SocketAddr, Bytes)>, + TryJoin, JoinHandle<()>>, + ), + Error, +> +where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ let (cert, key) = new_dummy_x509_certificate(keypair); - let server_config = new_server_config(cert.clone(), key.clone_key())?; + let server_config = new_server_config(cert.clone(), key.clone())?; let client_config = new_client_config(cert, key)?; let mut endpoint = { // Endpoint::new requires entering the runtime context, @@ -144,11 +254,12 @@ pub(crate) fn new_quic_endpoint( endpoint.set_default_client_config(client_config); let prune_cache_pending = Arc::::default(); let cache = Arc::>>::default(); + let router = Arc::>>>::default(); let (client_sender, client_receiver) = tokio::sync::mpsc::channel(CLIENT_CHANNEL_BUFFER); - let router = Arc::>>>::default(); let server_task = runtime.spawn(run_server( endpoint.clone(), - remote_request_sender.clone(), + server_name, + sender.clone(), bank_forks.clone(), prune_cache_pending.clone(), router.clone(), @@ -156,8 +267,9 @@ pub(crate) fn new_quic_endpoint( )); let client_task = runtime.spawn(run_client( endpoint.clone(), + client_name, client_receiver, - remote_request_sender, + sender, bank_forks, prune_cache_pending, router, @@ -174,36 +286,29 @@ pub(crate) fn close_quic_endpoint(endpoint: &Endpoint) { ); } -fn new_server_config( - cert: CertificateDer<'static>, - key: PrivateKeyDer<'static>, -) -> Result { +fn new_server_config(cert: Certificate, key: PrivateKey) -> Result { let mut config = rustls::ServerConfig::builder() - .with_client_cert_verifier(SkipClientVerification::new()) + .with_safe_defaults() + .with_client_cert_verifier(Arc::new(SkipClientVerification {})) .with_single_cert(vec![cert], key)?; config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()]; config.key_log = Arc::new(KeyLogFile::new()); - let quic_server_config = QuicServerConfig::try_from(config) - .map_err(|_err| rustls::Error::InvalidCertificate(CertificateError::BadSignature))?; - - let mut config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + let mut config = ServerConfig::with_crypto(Arc::new(config)); config .transport_config(Arc::new(new_transport_config())) + .use_retry(true) .migration(false); Ok(config) } -fn new_client_config( - cert: CertificateDer<'static>, - key: PrivateKeyDer<'static>, -) -> Result { +fn new_client_config(cert: Certificate, key: PrivateKey) -> Result { let mut config = rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_safe_defaults() + .with_custom_certificate_verifier(Arc::new(SkipServerVerification {})) .with_client_auth_cert(vec![cert], key)?; config.enable_early_data = true; config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()]; - let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(config).unwrap())); + let mut config = ClientConfig::new(Arc::new(config)); config.transport_config(Arc::new(new_transport_config())); Ok(config) } @@ -211,82 +316,80 @@ fn new_client_config( fn new_transport_config() -> TransportConfig { let max_idle_timeout = IdleTimeout::try_from(MAX_IDLE_TIMEOUT).unwrap(); let mut config = TransportConfig::default(); - // Disable datagrams and uni streams. config - .datagram_receive_buffer_size(None) + .datagram_receive_buffer_size(Some(DATAGRAM_RECEIVE_BUFFER_SIZE)) + .datagram_send_buffer_size(DATAGRAM_SEND_BUFFER_SIZE) + .initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT) .keep_alive_interval(Some(KEEP_ALIVE_INTERVAL)) - .max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS) + .max_concurrent_bidi_streams(VarInt::from(0u8)) .max_concurrent_uni_streams(VarInt::from(0u8)) - .max_idle_timeout(Some(max_idle_timeout)); + .max_idle_timeout(Some(max_idle_timeout)) + .min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT) + .mtu_discovery_config(None); config } -async fn run_server( +async fn run_server( endpoint: Endpoint, - remote_request_sender: Sender, + server_name: &'static str, + sender: Sender, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, -) { +) where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ let stats = Arc::::default(); - let report_metrics_task = - tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone())); - while let Some(incoming) = endpoint.accept().await { - let remote_addr: SocketAddr = incoming.remote_address(); - let connecting = incoming.accept(); - match connecting { - Ok(connecting) => { - tokio::task::spawn(handle_connecting_task( - endpoint.clone(), - connecting, - remote_request_sender.clone(), - bank_forks.clone(), - prune_cache_pending.clone(), - router.clone(), - cache.clone(), - stats.clone(), - )); - } - Err(error) => { - debug!("Error while accepting incoming connection: {error:?} from {remote_addr}"); - } - } + let report_metrics_task = tokio::task::spawn(report_metrics_task(server_name, stats.clone())); + while let Some(connecting) = endpoint.accept().await { + tokio::task::spawn(handle_connecting_task( + endpoint.clone(), + connecting, + sender.clone(), + bank_forks.clone(), + prune_cache_pending.clone(), + router.clone(), + cache.clone(), + stats.clone(), + )); } report_metrics_task.abort(); } -async fn run_client( +async fn run_client( endpoint: Endpoint, - mut receiver: AsyncReceiver, - remote_request_sender: Sender, + client_name: &'static str, + mut receiver: AsyncReceiver<(SocketAddr, Bytes)>, + sender: Sender, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, -) { +) where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ let stats = Arc::::default(); - let report_metrics_task = - tokio::task::spawn(report_metrics_task("repair_quic_client", stats.clone())); - while let Some(request) = receiver.recv().await { - let Some(request) = try_route_request(request, &*router.read().await, &stats) else { + let report_metrics_task = tokio::task::spawn(report_metrics_task(client_name, stats.clone())); + while let Some((remote_address, bytes)) = receiver.recv().await { + let Some(bytes) = try_route_bytes(&remote_address, bytes, &*router.read().await, &stats) + else { continue; }; - let remote_address = request.remote_address; let receiver = { let mut router = router.write().await; - let Some(request) = try_route_request(request, &router, &stats) else { + let Some(bytes) = try_route_bytes(&remote_address, bytes, &router, &stats) else { continue; }; let (sender, receiver) = tokio::sync::mpsc::channel(ROUTER_CHANNEL_BUFFER); - sender.try_send(request).unwrap(); + sender.try_send(bytes).unwrap(); router.insert(remote_address, sender); receiver }; tokio::task::spawn(make_connection_task( endpoint.clone(), remote_address, - remote_request_sender.clone(), + sender.clone(), receiver, bank_forks.clone(), prune_cache_pending.clone(), @@ -301,42 +404,42 @@ async fn run_client( report_metrics_task.abort(); } -// Routes the local request to respective channel. Drops the request if the -// channel is full. Bounces the request back if the channel is closed or does -// not exist. -fn try_route_request( - request: LocalRequest, - router: &HashMap>, +fn try_route_bytes( + remote_address: &SocketAddr, + bytes: Bytes, + router: &HashMap>, stats: &RepairQuicStats, -) -> Option { - match router.get(&request.remote_address) { - None => Some(request), - Some(sender) => match sender.try_send(request) { +) -> Option { + match router.get(remote_address) { + None => Some(bytes), + Some(sender) => match sender.try_send(bytes) { Ok(()) => None, - Err(TrySendError::Full(request)) => { - debug!("TrySendError::Full {}", request.remote_address); + Err(TrySendError::Full(_)) => { + debug!("TrySendError::Full {remote_address}"); add_metric!(stats.router_try_send_error_full); None } - Err(TrySendError::Closed(request)) => Some(request), + Err(TrySendError::Closed(bytes)) => Some(bytes), }, } } -async fn handle_connecting_task( +async fn handle_connecting_task( endpoint: Endpoint, connecting: Connecting, - remote_request_sender: Sender, + sender: Sender, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, stats: Arc, -) { +) where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ if let Err(err) = handle_connecting( endpoint, connecting, - remote_request_sender, + sender, bank_forks, prune_cache_pending, router, @@ -350,16 +453,19 @@ async fn handle_connecting_task( } } -async fn handle_connecting( +async fn handle_connecting( endpoint: Endpoint, connecting: Connecting, - remote_request_sender: Sender, + sender: Sender, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, stats: Arc, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ let connection = connecting.await?; let remote_address = connection.remote_address(); let remote_pubkey = get_remote_pubkey(&connection)?; @@ -373,7 +479,7 @@ async fn handle_connecting( remote_address, remote_pubkey, connection, - remote_request_sender, + sender, receiver, bank_forks, prune_cache_pending, @@ -386,19 +492,21 @@ async fn handle_connecting( } #[allow(clippy::too_many_arguments)] -async fn handle_connection( +async fn handle_connection( endpoint: Endpoint, remote_address: SocketAddr, remote_pubkey: Pubkey, connection: Connection, - remote_request_sender: Sender, - receiver: AsyncReceiver, + sender: Sender, + receiver: AsyncReceiver, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, stats: Arc, -) { +) where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ cache_connection( remote_pubkey, connection.clone(), @@ -408,30 +516,24 @@ async fn handle_connection( cache.clone(), ) .await; - let send_requests_task = tokio::task::spawn(send_requests_task( - endpoint.clone(), - remote_address, - connection.clone(), - receiver, - stats.clone(), - )); - let recv_requests_task = tokio::task::spawn(recv_requests_task( + let send_datagram_task = tokio::task::spawn(send_datagram_task(connection.clone(), receiver)); + let read_datagram_task = tokio::task::spawn(read_datagram_task( endpoint, remote_address, remote_pubkey, connection.clone(), - remote_request_sender, + sender, stats.clone(), )); - match futures::future::try_join(send_requests_task, recv_requests_task).await { + match futures::future::try_join(send_datagram_task, read_datagram_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), Ok(out) => { if let (Err(ref err), _) = out { - debug!("send_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + debug!("send_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); record_error(err, &stats); } if let (_, Err(ref err)) = out { - debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + debug!("read_datagram_task: {remote_pubkey}, {remote_address}, {err:?}"); record_error(err, &stats); } } @@ -444,97 +546,42 @@ async fn handle_connection( } } -async fn recv_requests_task( +async fn read_datagram_task( endpoint: Endpoint, remote_address: SocketAddr, remote_pubkey: Pubkey, connection: Connection, - remote_request_sender: Sender, - stats: Arc, -) -> Result<(), Error> { - loop { - let (send_stream, recv_stream) = connection.accept_bi().await?; - tokio::task::spawn(handle_streams_task( - endpoint.clone(), - remote_address, - remote_pubkey, - send_stream, - recv_stream, - remote_request_sender.clone(), - stats.clone(), - )); - } -} - -async fn handle_streams_task( - endpoint: Endpoint, - remote_address: SocketAddr, - remote_pubkey: Pubkey, - send_stream: SendStream, - recv_stream: RecvStream, - remote_request_sender: Sender, + sender: Sender, stats: Arc, -) { - if let Err(err) = handle_streams( - &endpoint, - remote_address, - remote_pubkey, - send_stream, - recv_stream, - &remote_request_sender, - ) - .await - { - debug!("handle_stream: {remote_address}, {remote_pubkey}, {err:?}"); - record_error(&err, &stats); - } -} - -async fn handle_streams( - endpoint: &Endpoint, - remote_address: SocketAddr, - remote_pubkey: Pubkey, - mut send_stream: SendStream, - mut recv_stream: RecvStream, - remote_request_sender: &Sender, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: From<(Pubkey, SocketAddr, Bytes)>, +{ // Assert that send won't block. - debug_assert_eq!(remote_request_sender.capacity(), None); - const READ_TIMEOUT_DURATION: Duration = Duration::from_secs(2); - let bytes = tokio::time::timeout( - READ_TIMEOUT_DURATION, - recv_stream.read_to_end(PACKET_DATA_SIZE), - ) - .await - .map_err(|_| Error::ReadToEndTimeout)??; - let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); - let remote_request = RemoteRequest { - remote_pubkey: Some(remote_pubkey), - remote_address, - bytes, - response_sender: Some(response_sender), - }; - if let Err(err) = remote_request_sender.send(remote_request) { - close_quic_endpoint(endpoint); - return Err(Error::from(err)); - } - let Ok(response) = response_receiver.await else { - return Err(Error::NoResponseReceived); - }; - for chunk in response { - let size = chunk.len() as u64; - send_stream.write_all(&size.to_le_bytes()).await?; - send_stream.write_all(&chunk).await?; + debug_assert_eq!(sender.capacity(), None); + loop { + match connection.read_datagram().await { + Ok(bytes) => { + let value = T::from((remote_pubkey, remote_address, bytes)); + if let Err(err) = sender.send(value) { + close_quic_endpoint(&endpoint); + return Err(Error::from(err)); + } + } + Err(err) => { + if let Some(err) = connection.close_reason() { + return Err(Error::from(err)); + } + debug!("connection.read_datagram: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(&Error::from(err), &stats); + } + }; } - send_stream.finish().map_err(Error::from) } -async fn send_requests_task( - endpoint: Endpoint, - remote_address: SocketAddr, +async fn send_datagram_task( connection: Connection, - mut receiver: AsyncReceiver, - stats: Arc, + mut receiver: AsyncReceiver, ) -> Result<(), Error> { tokio::pin! { let connection_closed = connection.closed(); @@ -542,97 +589,34 @@ async fn send_requests_task( loop { tokio::select! { biased; - request = receiver.recv() => { - match request { + bytes = receiver.recv() => { + match bytes { None => return Ok(()), - Some(request) => tokio::task::spawn(send_request_task( - endpoint.clone(), - remote_address, - connection.clone(), - request, - stats.clone(), - )), - }; + Some(bytes) => connection.send_datagram(bytes)?, + } } err = &mut connection_closed => return Err(Error::from(err)), } } } -async fn send_request_task( - endpoint: Endpoint, - remote_address: SocketAddr, - connection: Connection, - request: LocalRequest, - stats: Arc, -) { - if let Err(err) = send_request(endpoint, connection, request).await { - debug!("send_request: {remote_address}, {err:?}"); - record_error(&err, &stats); - } -} - -async fn send_request( - endpoint: Endpoint, - connection: Connection, - LocalRequest { - remote_address: _, - bytes, - num_expected_responses, - response_sender, - }: LocalRequest, -) -> Result<(), Error> { - // Assert that send won't block. - debug_assert_eq!(response_sender.capacity(), None); - const READ_TIMEOUT_DURATION: Duration = Duration::from_secs(10); - let (mut send_stream, mut recv_stream) = connection.open_bi().await?; - send_stream.write_all(&bytes).await?; - send_stream.finish()?; - // Each response is at most PACKET_DATA_SIZE bytes and requires - // an additional 8 bytes to encode its length. - let size = PACKET_DATA_SIZE - .saturating_add(8) - .saturating_mul(num_expected_responses); - let response = tokio::time::timeout(READ_TIMEOUT_DURATION, recv_stream.read_to_end(size)) - .await - .map_err(|_| Error::ReadToEndTimeout)??; - let remote_address = connection.remote_address(); - let mut cursor = Cursor::new(&response[..]); - std::iter::repeat_with(|| { - bincode::options() - .with_limit(response.len() as u64) - .with_fixint_encoding() - .allow_trailing_bytes() - .deserialize_from::<_, ByteBuf>(&mut cursor) - .map(ByteBuf::into_vec) - .ok() - }) - .while_some() - .try_for_each(|chunk| { - response_sender - .send((remote_address, chunk)) - .map_err(|err| { - close_quic_endpoint(&endpoint); - Error::from(err) - }) - }) -} - -async fn make_connection_task( +async fn make_connection_task( endpoint: Endpoint, remote_address: SocketAddr, - remote_request_sender: Sender, - receiver: AsyncReceiver, + sender: Sender, + receiver: AsyncReceiver, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, stats: Arc, -) { +) where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ if let Err(err) = make_connection( endpoint, remote_address, - remote_request_sender, + sender, receiver, bank_forks, prune_cache_pending, @@ -647,17 +631,20 @@ async fn make_connection_task( } } -async fn make_connection( +async fn make_connection( endpoint: Endpoint, remote_address: SocketAddr, - remote_request_sender: Sender, - receiver: AsyncReceiver, + sender: Sender, + receiver: AsyncReceiver, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, stats: Arc, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, +{ let connection = endpoint .connect(remote_address, CONNECT_SERVER_NAME)? .await?; @@ -666,7 +653,7 @@ async fn make_connection( connection.remote_address(), get_remote_pubkey(&connection)?, connection, - remote_request_sender, + sender, receiver, bank_forks, prune_cache_pending, @@ -696,7 +683,7 @@ async fn cache_connection( connection: Connection, bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, ) { let (old, should_prune_cache) = { @@ -741,7 +728,7 @@ async fn drop_connection( async fn prune_connection_cache( bank_forks: Arc>, prune_cache_pending: Arc, - router: Arc>>>, + router: Arc>>>, cache: Arc>>, ) { debug_assert!(prune_cache_pending.load(Ordering::Relaxed)); @@ -782,6 +769,39 @@ async fn prune_connection_cache( router.write().await.retain(|_, sender| !sender.is_closed()); } +impl RemoteRequest { + #[inline] + pub(crate) fn protocol(&self) -> Protocol { + // remote_pubkey is only available with QUIC. + if self.remote_pubkey.is_some() { + Protocol::QUIC + } else { + Protocol::UDP + } + } +} + +impl From<(Pubkey, SocketAddr, Bytes)> for RemoteRequest { + #[inline] + fn from((pubkey, remote_address, bytes): (Pubkey, SocketAddr, Bytes)) -> Self { + Self { + remote_pubkey: Some(pubkey), + remote_address, + bytes, + } + } +} + +impl RepairQuicAsyncSenders { + pub(crate) fn new_dummy() -> Self { + Self { + repair_response_quic_sender: tokio::sync::mpsc::channel(1).0, + repair_request_quic_sender: tokio::sync::mpsc::channel(1).0, + ancestor_hashes_request_quic_sender: tokio::sync::mpsc::channel(1).0, + } + } +} + impl From> for Error { fn from(_: crossbeam_channel::SendError) -> Self { Error::ChannelSendError @@ -800,33 +820,16 @@ struct RepairQuicStats { connection_error_timed_out: AtomicU64, connection_error_transport_error: AtomicU64, connection_error_version_mismatch: AtomicU64, - connection_error_connection_limit_exceeded: AtomicU64, invalid_identity: AtomicU64, - no_response_received: AtomicU64, - read_to_end_error_connection_lost: AtomicU64, - read_to_end_error_illegal_ordered_read: AtomicU64, - read_to_end_error_reset: AtomicU64, - read_to_end_error_too_long: AtomicU64, - read_to_end_error_unknown_stream: AtomicU64, - read_to_end_error_zero_rtt_rejected: AtomicU64, - read_to_end_timeout: AtomicU64, router_try_send_error_full: AtomicU64, - write_error_connection_lost: AtomicU64, - write_error_stopped: AtomicU64, - write_error_unknown_stream: AtomicU64, - write_error_zero_rtt_rejected: AtomicU64, - connect_error_cids_exhausted: AtomicU64, - connect_error_invalid_server_name: AtomicU64, - connection_error_cids_exhausted: AtomicU64, - closed_streams: AtomicU64, - read_to_end_error_closed_stream: AtomicU64, - write_error_closed_stream: AtomicU64, + send_datagram_error_connection_lost: AtomicU64, + send_datagram_error_too_large: AtomicU64, + send_datagram_error_unsupported_by_peer: AtomicU64, } async fn report_metrics_task(name: &'static str, stats: Arc) { - const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2); loop { - tokio::time::sleep(METRICS_SUBMIT_CADENCE).await; + tokio::time::sleep(Duration::from_secs(2)).await; report_metrics(name, &stats); } } @@ -837,6 +840,12 @@ fn record_error(err: &Error, stats: &RepairQuicStats) { Error::ConnectError(ConnectError::EndpointStopping) => { add_metric!(stats.connect_error_other) } + Error::ConnectError(ConnectError::TooManyConnections) => { + add_metric!(stats.connect_error_too_many_connections) + } + Error::ConnectError(ConnectError::InvalidDnsName(_)) => { + add_metric!(stats.connect_error_other) + } Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => { add_metric!(stats.connect_error_invalid_remote_address) } @@ -867,49 +876,17 @@ fn record_error(err: &Error, stats: &RepairQuicStats) { } Error::InvalidIdentity(_) => add_metric!(stats.invalid_identity), Error::IoError(_) => (), - Error::NoResponseReceived => add_metric!(stats.no_response_received), - Error::ReadToEndError(ReadToEndError::Read(ReadError::Reset(_))) => { - add_metric!(stats.read_to_end_error_reset) - } - Error::ReadToEndError(ReadToEndError::Read(ReadError::ConnectionLost(_))) => { - add_metric!(stats.read_to_end_error_connection_lost) - } - Error::ReadToEndError(ReadToEndError::Read(ReadError::IllegalOrderedRead)) => { - add_metric!(stats.read_to_end_error_illegal_ordered_read) + Error::SendDatagramError(SendDatagramError::UnsupportedByPeer) => { + add_metric!(stats.send_datagram_error_unsupported_by_peer) } - Error::ReadToEndError(ReadToEndError::Read(ReadError::ZeroRttRejected)) => { - add_metric!(stats.read_to_end_error_zero_rtt_rejected) + Error::SendDatagramError(SendDatagramError::Disabled) => (), + Error::SendDatagramError(SendDatagramError::TooLarge) => { + add_metric!(stats.send_datagram_error_too_large) } - Error::ReadToEndError(ReadToEndError::TooLong) => { - add_metric!(stats.read_to_end_error_too_long) + Error::SendDatagramError(SendDatagramError::ConnectionLost(_)) => { + add_metric!(stats.send_datagram_error_connection_lost) } - Error::ReadToEndTimeout => add_metric!(stats.read_to_end_timeout), Error::TlsError(_) => (), - Error::WriteError(WriteError::Stopped(_)) => add_metric!(stats.write_error_stopped), - Error::WriteError(WriteError::ConnectionLost(_)) => { - add_metric!(stats.write_error_connection_lost) - } - Error::WriteError(WriteError::ZeroRttRejected) => { - add_metric!(stats.write_error_zero_rtt_rejected) - } - Error::ConnectError(ConnectError::CidsExhausted) => { - add_metric!(stats.connect_error_cids_exhausted) - } - Error::ConnectError(ConnectError::InvalidServerName(_)) => { - add_metric!(stats.connect_error_invalid_server_name) - } - Error::ConnectionError(ConnectionError::CidsExhausted) => { - add_metric!(stats.connection_error_cids_exhausted) - } - Error::ClosedStream(_) => { - add_metric!(stats.closed_streams) - } - Error::ReadToEndError(ReadToEndError::Read(ReadError::ClosedStream)) => { - add_metric!(stats.read_to_end_error_closed_stream) - } - Error::WriteError(WriteError::ClosedStream) => { - add_metric!(stats.write_error_closed_stream) - } } } @@ -971,79 +948,29 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) { reset_metric!(stats.connection_error_version_mismatch), i64 ), - ( - "connection_error_connection_limit_exceeded", - reset_metric!(stats.connection_error_connection_limit_exceeded), - i64 - ), ( "invalid_identity", reset_metric!(stats.invalid_identity), i64 ), - ( - "no_response_received", - reset_metric!(stats.no_response_received), - i64 - ), - ( - "read_to_end_error_connection_lost", - reset_metric!(stats.read_to_end_error_connection_lost), - i64 - ), - ( - "read_to_end_error_illegal_ordered_read", - reset_metric!(stats.read_to_end_error_illegal_ordered_read), - i64 - ), - ( - "read_to_end_error_reset", - reset_metric!(stats.read_to_end_error_reset), - i64 - ), - ( - "read_to_end_error_too_long", - reset_metric!(stats.read_to_end_error_too_long), - i64 - ), - ( - "read_to_end_error_unknown_stream", - reset_metric!(stats.read_to_end_error_unknown_stream), - i64 - ), - ( - "read_to_end_error_zero_rtt_rejected", - reset_metric!(stats.read_to_end_error_zero_rtt_rejected), - i64 - ), - ( - "read_to_end_timeout", - reset_metric!(stats.read_to_end_timeout), - i64 - ), ( "router_try_send_error_full", reset_metric!(stats.router_try_send_error_full), i64 ), ( - "write_error_connection_lost", - reset_metric!(stats.write_error_connection_lost), + "send_datagram_error_connection_lost", + reset_metric!(stats.send_datagram_error_connection_lost), i64 ), ( - "write_error_stopped", - reset_metric!(stats.write_error_stopped), + "send_datagram_error_too_large", + reset_metric!(stats.send_datagram_error_too_large), i64 ), ( - "write_error_unknown_stream", - reset_metric!(stats.write_error_unknown_stream), - i64 - ), - ( - "write_error_zero_rtt_rejected", - reset_metric!(stats.write_error_zero_rtt_rejected), + "send_datagram_error_unsupported_by_peer", + reset_metric!(stats.send_datagram_error_unsupported_by_peer), i64 ), ); @@ -1063,7 +990,7 @@ mod tests { #[test] fn test_quic_endpoint() { const NUM_ENDPOINTS: usize = 3; - const RECV_TIMEOUT: Duration = Duration::from_secs(30); + const RECV_TIMEOUT: Duration = Duration::from_secs(60); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_all() @@ -1079,8 +1006,8 @@ mod tests { .map(UdpSocket::local_addr) .collect::>() .unwrap(); - let (remote_request_senders, remote_request_receivers): (Vec<_>, Vec<_>) = - repeat_with(crossbeam_channel::unbounded::) + let (senders, receivers): (Vec<_>, Vec<_>) = + repeat_with(crossbeam_channel::unbounded::<(Pubkey, SocketAddr, Bytes)>) .take(NUM_ENDPOINTS) .unzip(); let bank_forks = { @@ -1089,84 +1016,35 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); BankForks::new_rw_arc(bank) }; - let (endpoints, senders, tasks): (Vec<_>, Vec<_>, Vec<_>) = multiunzip( - keypairs - .iter() - .zip(sockets) - .zip(remote_request_senders) - .map(|((keypair, socket), remote_request_sender)| { + let (endpoints, senders, tasks): (Vec<_>, Vec<_>, Vec<_>) = + multiunzip(keypairs.iter().zip(sockets).zip(senders).map( + |((keypair, socket), sender)| { new_quic_endpoint( runtime.handle(), + "test_quic_client", + "test_quic_server", keypair, socket, - remote_request_sender, + sender, bank_forks.clone(), ) .unwrap() - }), - ); - let (response_senders, response_receivers): (Vec<_>, Vec<_>) = - repeat_with(crossbeam_channel::unbounded::<(SocketAddr, Vec)>) - .take(NUM_ENDPOINTS) - .unzip(); - // Send a unique request from each endpoint to every other endpoint. + }, + )); + // Send a unique message from each endpoint to every other endpoint. for (i, (keypair, &address, sender)) in izip!(&keypairs, &addresses, &senders).enumerate() { - for (j, (&remote_address, response_sender)) in - addresses.iter().zip(&response_senders).enumerate() - { - if i != j { - let mut bytes: Vec = format!("{i}=>{j}").into_bytes(); - bytes.resize(PACKET_DATA_SIZE, 0xa5); - let request = LocalRequest { - remote_address, - bytes, - num_expected_responses: j + 1, - response_sender: response_sender.clone(), - }; - sender.blocking_send(request).unwrap(); - } - } - // Verify all requests are received and respond to each. - for (j, remote_request_receiver) in remote_request_receivers.iter().enumerate() { + for (j, &address) in addresses.iter().enumerate() { if i != j { - let RemoteRequest { - remote_pubkey, - remote_address, - bytes, - response_sender, - } = remote_request_receiver.recv_timeout(RECV_TIMEOUT).unwrap(); - assert_eq!(remote_pubkey, Some(keypair.pubkey())); - assert_eq!(remote_address, address); - assert_eq!(bytes, { - let mut bytes = format!("{i}=>{j}").into_bytes(); - bytes.resize(PACKET_DATA_SIZE, 0xa5); - bytes - }); - let response: Vec> = (0..=j) - .map(|k| { - let mut bytes = format!("{j}=>{i}({k})").into_bytes(); - bytes.resize(PACKET_DATA_SIZE, 0xd5); - bytes - }) - .collect(); - response_sender.unwrap().send(response).unwrap(); + let bytes = Bytes::from(format!("{i}=>{j}")); + sender.blocking_send((address, bytes)).unwrap(); } } - // Verify responses. - for (j, (&remote_address, response_receiver)) in - addresses.iter().zip(&response_receivers).enumerate() - { + // Verify all messages are received. + for (j, receiver) in receivers.iter().enumerate() { if i != j { - for k in 0..=j { - let (address, response) = - response_receiver.recv_timeout(RECV_TIMEOUT).unwrap(); - assert_eq!(address, remote_address); - assert_eq!(response, { - let mut bytes = format!("{j}=>{i}({k})").into_bytes(); - bytes.resize(PACKET_DATA_SIZE, 0xd5); - bytes - }); - } + let bytes = Bytes::from(format!("{i}=>{j}")); + let entry = (keypair.pubkey(), address, bytes); + assert_eq!(receiver.recv_timeout(RECV_TIMEOUT).unwrap(), entry); } } } diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index f69aea596960e2..878d2ab9dfb9f4 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -13,7 +13,6 @@ use { ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, duplicate_repair_status::AncestorDuplicateSlotToRepair, outstanding_requests::OutstandingRequests, - quic_endpoint::LocalRequest, repair_weight::RepairWeight, serve_repair::{ self, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType, @@ -21,6 +20,7 @@ use { }, }, }, + bytes::Bytes, crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}, lru::LruCache, rand::seq::SliceRandom, @@ -254,8 +254,9 @@ impl RepairService { exit: Arc, repair_socket: Arc, ancestor_hashes_socket: Arc, - quic_endpoint_sender: AsyncSender, - quic_endpoint_response_sender: CrossbeamSender<(SocketAddr, Vec)>, + repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_response_quic_receiver: CrossbeamReceiver<(Pubkey, SocketAddr, Bytes)>, repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, @@ -267,7 +268,6 @@ impl RepairService { let blockstore = blockstore.clone(); let exit = exit.clone(); let repair_info = repair_info.clone(); - let quic_endpoint_sender = quic_endpoint_sender.clone(); Builder::new() .name("solRepairSvc".to_string()) .spawn(move || { @@ -275,8 +275,7 @@ impl RepairService { &blockstore, &exit, &repair_socket, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, repair_info, verified_vote_receiver, &outstanding_requests, @@ -291,7 +290,8 @@ impl RepairService { exit, blockstore, ancestor_hashes_socket, - quic_endpoint_sender, + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, repair_info, ancestor_hashes_replay_update_receiver, ); @@ -307,8 +307,7 @@ impl RepairService { blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - quic_endpoint_sender: &AsyncSender, - quic_endpoint_response_sender: &CrossbeamSender<(SocketAddr, Vec)>, + repair_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, @@ -464,8 +463,7 @@ impl RepairService { &repair_info.repair_validators, &mut outstanding_requests, identity_keypair, - quic_endpoint_sender, - quic_endpoint_response_sender, + repair_request_quic_sender, repair_protocol, ) .ok()??; @@ -1124,8 +1122,7 @@ mod test { let remote_request = RemoteRequest { remote_pubkey: None, remote_address: packet.meta().socket_addr(), - bytes, - response_sender: None, + bytes: Bytes::from(bytes), }; // Deserialize and check the request diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index a1887cf689bb09..5ad270928c3ed9 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -3,7 +3,7 @@ use { cluster_slots_service::cluster_slots::ClusterSlots, repair::{ duplicate_repair_status::get_ancestor_hash_repair_sample_size, - quic_endpoint::{LocalRequest, RemoteRequest}, + quic_endpoint::RemoteRequest, repair_response, repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS}, request_response::RequestResponse, @@ -11,7 +11,8 @@ use { }, }, bincode::{serialize, Options}, - crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + bytes::Bytes, + crossbeam_channel::{Receiver, RecvTimeoutError}, lru::LruCache, rand::{ distributions::{Distribution, WeightedError, WeightedIndex}, @@ -59,7 +60,7 @@ use { thread::{Builder, JoinHandle}, time::{Duration, Instant}, }, - tokio::sync::{mpsc::Sender as AsyncSender, oneshot::Sender as OneShotSender}, + tokio::sync::mpsc::Sender as AsyncSender, }; /// the number of slots to respond with when responding to `Orphan` requests @@ -393,9 +394,9 @@ impl RepairPeers { struct RepairRequestWithMeta { request: RepairProtocol, from_addr: SocketAddr, + protocol: Protocol, stake: u64, whitelisted: bool, - response_sender: Option>>>, } impl ServeRepair { @@ -563,9 +564,9 @@ impl ServeRepair { Ok(RepairRequestWithMeta { request, from_addr, + protocol: remote_request.protocol(), stake, whitelisted, - response_sender: remote_request.response_sender, }) } @@ -636,6 +637,7 @@ impl ServeRepair { blockstore: &Blockstore, requests_receiver: &Receiver, response_sender: &PacketBatchSender, + repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, stats: &mut ServeRepairStats, data_budget: &DataBudget, ) -> std::result::Result<(), RecvTimeoutError> { @@ -710,6 +712,7 @@ impl ServeRepair { blockstore, decoded_requests, response_sender, + repair_response_quic_sender, stats, data_budget, ); @@ -806,11 +809,12 @@ impl ServeRepair { *stats = ServeRepairStats::default(); } - pub fn listen( + pub(crate) fn listen( self, blockstore: Arc, requests_receiver: Receiver, response_sender: PacketBatchSender, + repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>, exit: Arc, ) -> JoinHandle<()> { const INTERVAL_MS: u64 = 1000; @@ -840,6 +844,7 @@ impl ServeRepair { &blockstore, &requests_receiver, &response_sender, + &repair_response_quic_sender, &mut stats, &data_budget, ); @@ -967,6 +972,7 @@ impl ServeRepair { blockstore: &Blockstore, requests: Vec, packet_batch_sender: &PacketBatchSender, + repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, stats: &mut ServeRepairStats, data_budget: &DataBudget, ) { @@ -976,9 +982,9 @@ impl ServeRepair { for RepairRequestWithMeta { request, from_addr, + protocol, stake, whitelisted: _, - response_sender, } in requests.into_iter() { if !data_budget.check(request.max_response_bytes()) { @@ -986,7 +992,7 @@ impl ServeRepair { continue; } // Bypass ping/pong check for requests coming from QUIC endpoint. - if !matches!(&request, RepairProtocol::Pong(_)) && response_sender.is_none() { + if !matches!(&request, RepairProtocol::Pong(_)) && protocol == Protocol::UDP { let (check, ping_pkt) = Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair); if let Some(ping_pkt) = ping_pkt { @@ -1006,7 +1012,12 @@ impl ServeRepair { let num_response_packets = rsp.len(); let num_response_bytes = rsp.iter().map(|p| p.meta().size).sum(); if data_budget.take(num_response_bytes) - && send_response(rsp, packet_batch_sender, response_sender) + && send_response( + rsp, + protocol, + packet_batch_sender, + repair_response_quic_sender, + ) { stats.total_response_packets += num_response_packets; match stake > 0 { @@ -1057,8 +1068,7 @@ impl ServeRepair { repair_validators: &Option>, outstanding_requests: &mut OutstandingShredRepairs, identity_keypair: &Keypair, - quic_endpoint_sender: &AsyncSender, - quic_endpoint_response_sender: &Sender<(SocketAddr, Vec)>, + repair_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, repair_protocol: Protocol, ) -> Result)>> { // find a peer that appears to be accepting replication and has the desired slot, as indicated @@ -1092,18 +1102,10 @@ impl ServeRepair { match repair_protocol { Protocol::UDP => Ok(Some((peer.serve_repair, out))), Protocol::QUIC => { - let num_expected_responses = - usize::try_from(repair_request.num_expected_responses()).unwrap(); - let request = LocalRequest { - remote_address: peer.serve_repair_quic, - bytes: out, - num_expected_responses, - response_sender: quic_endpoint_response_sender.clone(), - }; - quic_endpoint_sender - .blocking_send(request) - .map_err(|_| Error::SendError) - .map(|()| None) + repair_request_quic_sender + .blocking_send((peer.serve_repair_quic, Bytes::from(out))) + .map_err(|_| Error::SendError)?; + Ok(None) } } } @@ -1420,19 +1422,19 @@ where // Returns true on success. fn send_response( packets: PacketBatch, + protocol: Protocol, packet_batch_sender: &PacketBatchSender, - response_sender: Option>>>, + repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>, ) -> bool { - match response_sender { - None => packet_batch_sender.send(packets).is_ok(), - Some(response_sender) => { - let response = packets - .iter() - .filter_map(|packet| packet.data(..)) - .map(Vec::from) - .collect(); - response_sender.send(response).is_ok() - } + match protocol { + Protocol::UDP => packet_batch_sender.send(packets).is_ok(), + Protocol::QUIC => packets + .iter() + .filter_map(|packet| { + let bytes = Bytes::from(Vec::from(packet.data(..)?)); + Some((packet.meta().socket_addr(), bytes)) + }) + .all(|packet| repair_response_quic_sender.blocking_send(packet).is_ok()), } } @@ -1507,8 +1509,7 @@ mod tests { RemoteRequest { remote_pubkey: None, remote_address: packet.meta().socket_addr(), - bytes: packet.data(..).map(Vec::from).unwrap(), - response_sender: None, + bytes: Bytes::from(Vec::from(packet.data(..).unwrap())), } } @@ -2002,10 +2003,7 @@ mod tests { ); let identity_keypair = cluster_info.keypair().clone(); let mut outstanding_requests = OutstandingShredRepairs::default(); - let (quic_endpoint_sender, _quic_endpoint_receiver) = - tokio::sync::mpsc::channel(/*buffer:*/ 128); - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = - crossbeam_channel::unbounded(); + let (repair_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); let rv = serve_repair.repair_request( &cluster_slots, ShredRepairType::Shred(0, 0), @@ -2014,8 +2012,7 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ); assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers))); @@ -2047,8 +2044,7 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ) .unwrap() @@ -2087,8 +2083,7 @@ mod tests { &None, &mut outstanding_requests, &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ) .unwrap() @@ -2327,10 +2322,7 @@ mod tests { let cluster_slots = ClusterSlots::default(); let cluster_info = Arc::new(new_test_cluster_info()); let me = cluster_info.my_contact_info(); - let (quic_endpoint_sender, _quic_endpoint_receiver) = - tokio::sync::mpsc::channel(/*buffer:*/ 128); - let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = - crossbeam_channel::unbounded(); + let (repair_request_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128); // Insert two peers on the network let contact_info2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); @@ -2361,8 +2353,7 @@ mod tests { &known_validators, &mut OutstandingShredRepairs::default(), &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ), Err(Error::ClusterInfo(ClusterInfoError::NoPeers)) @@ -2383,8 +2374,7 @@ mod tests { &known_validators, &mut OutstandingShredRepairs::default(), &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ), Ok(Some(_)) @@ -2409,8 +2399,7 @@ mod tests { &None, &mut OutstandingShredRepairs::default(), &identity_keypair, - &quic_endpoint_sender, - &quic_endpoint_response_sender, + &repair_request_quic_sender, Protocol::UDP, // repair_protocol ), Ok(Some(_)) diff --git a/core/src/repair/serve_repair_service.rs b/core/src/repair/serve_repair_service.rs index 2be1a6712045de..8801defbbb8cc2 100644 --- a/core/src/repair/serve_repair_service.rs +++ b/core/src/repair/serve_repair_service.rs @@ -1,5 +1,6 @@ use { crate::repair::{quic_endpoint::RemoteRequest, serve_repair::ServeRepair}, + bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, solana_ledger::blockstore::Blockstore, solana_perf::{packet::PacketBatch, recycler::Recycler}, @@ -8,11 +9,12 @@ use { streamer::{self, StreamerReceiveStats}, }, std::{ - net::UdpSocket, + net::{SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc}, thread::{self, Builder, JoinHandle}, time::Duration, }, + tokio::sync::mpsc::Sender as AsyncSender, }; pub struct ServeRepairService { @@ -20,10 +22,11 @@ pub struct ServeRepairService { } impl ServeRepairService { - pub fn new( + pub(crate) fn new( serve_repair: ServeRepair, remote_request_sender: Sender, remote_request_receiver: Receiver, + repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>, blockstore: Arc, serve_repair_socket: UdpSocket, socket_addr_space: SocketAddrSpace, @@ -61,8 +64,13 @@ impl ServeRepairService { socket_addr_space, Some(stats_reporter_sender), ); - let t_listen = - serve_repair.listen(blockstore, remote_request_receiver, response_sender, exit); + let t_listen = serve_repair.listen( + blockstore, + remote_request_receiver, + response_sender, + repair_response_quic_sender, + exit, + ); let thread_hdls = vec![t_receiver, t_packet_adapter, t_responder, t_listen]; Self { thread_hdls } @@ -86,8 +94,7 @@ pub(crate) fn adapt_repair_requests_packets( let request = RemoteRequest { remote_pubkey: None, remote_address: packet.meta().socket_addr(), - bytes, - response_sender: None, + bytes: Bytes::from(bytes), }; if remote_request_sender.send(request).is_err() { return; // The receiver end of the channel is disconnected. diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 0776674a4748c1..6d4e6172af46b2 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -201,8 +201,8 @@ impl ShredFetchStage { pub(crate) fn new( sockets: Vec>, turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, repair_socket: Arc, - repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec)>, sender: Sender, shred_version: u16, bank_forks: Arc>, @@ -257,8 +257,9 @@ impl ShredFetchStage { Builder::new() .name("solTvuRecvRpr".to_string()) .spawn(|| { - receive_repair_quic_packets( - repair_quic_endpoint_receiver, + receive_quic_datagrams( + repair_response_quic_receiver, + PacketFlags::REPAIR, packet_sender, recycler, exit, @@ -290,6 +291,7 @@ impl ShredFetchStage { .spawn(|| { receive_quic_datagrams( turbine_quic_endpoint_receiver, + PacketFlags::empty(), packet_sender, recycler, exit, @@ -325,15 +327,16 @@ impl ShredFetchStage { } } -fn receive_quic_datagrams( - turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, +pub(crate) fn receive_quic_datagrams( + quic_datagrams_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + flags: PacketFlags, sender: Sender, recycler: PacketBatchRecycler, exit: Arc, ) { const RECV_TIMEOUT: Duration = Duration::from_secs(1); while !exit.load(Ordering::Relaxed) { - let entry = match turbine_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) { + let entry = match quic_datagrams_receiver.recv_timeout(RECV_TIMEOUT) { Ok(entry) => entry, Err(RecvTimeoutError::Timeout) => continue, Err(RecvTimeoutError::Disconnected) => return, @@ -345,7 +348,7 @@ fn receive_quic_datagrams( }; let deadline = Instant::now() + PACKET_COALESCE_DURATION; let entries = std::iter::once(entry).chain( - std::iter::repeat_with(|| turbine_quic_endpoint_receiver.recv_deadline(deadline).ok()) + std::iter::repeat_with(|| quic_datagrams_receiver.recv_deadline(deadline).ok()) .while_some(), ); let size = entries @@ -356,52 +359,7 @@ fn receive_quic_datagrams( size: bytes.len(), addr: addr.ip(), port: addr.port(), - flags: PacketFlags::empty(), - }; - packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); - }) - .count(); - if size > 0 { - packet_batch.truncate(size); - if sender.send(packet_batch).is_err() { - return; - } - } - } -} - -pub(crate) fn receive_repair_quic_packets( - repair_quic_endpoint_receiver: Receiver<(SocketAddr, Vec)>, - sender: Sender, - recycler: PacketBatchRecycler, - exit: Arc, -) { - const RECV_TIMEOUT: Duration = Duration::from_secs(1); - while !exit.load(Ordering::Relaxed) { - let entry = match repair_quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) { - Ok(entry) => entry, - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => return, - }; - let mut packet_batch = - PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams"); - unsafe { - packet_batch.set_len(PACKETS_PER_BATCH); - }; - let deadline = Instant::now() + PACKET_COALESCE_DURATION; - let entries = std::iter::once(entry).chain( - std::iter::repeat_with(|| repair_quic_endpoint_receiver.recv_deadline(deadline).ok()) - .while_some(), - ); - let size = entries - .filter(|(_, bytes)| bytes.len() <= PACKET_DATA_SIZE) - .zip(packet_batch.iter_mut()) - .map(|((addr, bytes), packet)| { - *packet.meta_mut() = Meta { - size: bytes.len(), - addr: addr.ip(), - port: addr.port(), - flags: PacketFlags::REPAIR, + flags, }; packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); }) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 083ff02bbb4abc..9938bcf1ab846a 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -14,10 +14,7 @@ use { consensus::{tower_storage::TowerStorage, Tower}, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, - repair::{ - quic_endpoint::LocalRequest, - repair_service::{OutstandingShredRepairs, RepairInfo}, - }, + repair::repair_service::{OutstandingShredRepairs, RepairInfo}, replay_stage::{ReplayStage, ReplayStageConfig}, rewards_recorder_service::RewardsRecorderSender, shred_fetch_stage::ShredFetchStage, @@ -155,7 +152,10 @@ impl Tvu { banking_tracer: Arc, turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, - repair_quic_endpoint_sender: AsyncSender, + repair_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, + repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, outstanding_repair_requests: Arc>, cluster_slots: Arc, wen_restart_repair_slots: Option>>>, @@ -174,13 +174,11 @@ impl Tvu { let repair_socket = Arc::new(repair_socket); let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket); let fetch_sockets: Vec> = fetch_sockets.into_iter().map(Arc::new).collect(); - let (repair_quic_endpoint_response_sender, repair_quic_endpoint_response_receiver) = - unbounded(); let fetch_stage = ShredFetchStage::new( fetch_sockets, turbine_quic_endpoint_receiver, + repair_response_quic_receiver, repair_socket.clone(), - repair_quic_endpoint_response_receiver, fetch_sender, tvu_config.shred_version, bank_forks.clone(), @@ -240,8 +238,9 @@ impl Tvu { retransmit_sender, repair_socket, ancestor_hashes_socket, - repair_quic_endpoint_sender, - repair_quic_endpoint_response_sender, + repair_request_quic_sender, + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, exit.clone(), repair_info, leader_schedule_cache.clone(), @@ -405,7 +404,10 @@ impl Tvu { pub mod tests { use { super::*, - crate::consensus::tower_storage::FileTowerStorage, + crate::{ + consensus::tower_storage::FileTowerStorage, + repair::quic_endpoint::RepairQuicAsyncSenders, + }, serial_test::serial, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -436,8 +438,9 @@ pub mod tests { let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) = tokio::sync::mpsc::channel(/*capacity:*/ 128); let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded(); - let (repair_quic_endpoint_sender, _repair_quic_endpoint_receiver) = - tokio::sync::mpsc::channel(/*buffer:*/ 128); + let (_, repair_response_quic_receiver) = unbounded(); + let repair_quic_async_senders = RepairQuicAsyncSenders::new_dummy(); + let (_, ancestor_hashes_response_quic_receiver) = unbounded(); //start cluster_info1 let cluster_info1 = ClusterInfo::new( target1.info.clone(), @@ -529,7 +532,10 @@ pub mod tests { BankingTracer::new_disabled(), turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver, - repair_quic_endpoint_sender, + repair_response_quic_receiver, + repair_quic_async_senders.repair_request_quic_sender, + repair_quic_async_senders.ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, outstanding_repair_requests, cluster_slots, wen_restart_repair_slots, diff --git a/core/src/validator.rs b/core/src/validator.rs index 7915c2117f4f8b..fce0f5f83a0e39 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -15,7 +15,12 @@ use { ExternalRootSource, Tower, }, poh_timing_report_service::PohTimingReportService, - repair::{self, serve_repair::ServeRepair, serve_repair_service::ServeRepairService}, + repair::{ + self, + quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets}, + serve_repair::ServeRepair, + serve_repair_service::ServeRepairService, + }, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, sigverify, @@ -499,9 +504,9 @@ pub struct Validator { turbine_quic_endpoint: Option, turbine_quic_endpoint_runtime: Option, turbine_quic_endpoint_join_handle: Option, - repair_quic_endpoint: Option, - repair_quic_endpoint_runtime: Option, - repair_quic_endpoint_join_handle: Option, + repair_quic_endpoints: Option<[Endpoint; 3]>, + repair_quic_endpoints_runtime: Option, + repair_quic_endpoints_join_handle: Option, } impl Validator { @@ -1158,19 +1163,10 @@ impl Validator { bank_forks.clone(), config.repair_whitelist.clone(), ); - let (repair_quic_endpoint_sender, repair_quic_endpoint_receiver) = unbounded(); - let serve_repair_service = ServeRepairService::new( - serve_repair, - // Incoming UDP repair requests are adapted into RemoteRequest - // and also sent through the same channel. - repair_quic_endpoint_sender.clone(), - repair_quic_endpoint_receiver, - blockstore.clone(), - node.sockets.serve_repair, - socket_addr_space, - stats_reporter_sender, - exit.clone(), - ); + let (repair_request_quic_sender, repair_request_quic_receiver) = unbounded(); + let (repair_response_quic_sender, repair_response_quic_receiver) = unbounded(); + let (ancestor_hashes_response_quic_sender, ancestor_hashes_response_quic_receiver) = + unbounded(); let waited_for_supermajority = wait_for_supermajority( config, @@ -1267,7 +1263,7 @@ impl Validator { }; // Repair quic endpoint. - let repair_quic_endpoint_runtime = (current_runtime_handle.is_err() + let repair_quic_endpoints_runtime = (current_runtime_handle.is_err() && genesis_config.cluster_type != ClusterType::MainnetBeta) .then(|| { tokio::runtime::Builder::new_multi_thread() @@ -1276,24 +1272,48 @@ impl Validator { .build() .unwrap() }); - let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) = + let (repair_quic_endpoints, repair_quic_async_senders, repair_quic_endpoints_join_handle) = if genesis_config.cluster_type == ClusterType::MainnetBeta { - let (sender, _receiver) = tokio::sync::mpsc::channel(1); - (None, sender, None) + (None, RepairQuicAsyncSenders::new_dummy(), None) } else { - repair::quic_endpoint::new_quic_endpoint( - repair_quic_endpoint_runtime + let repair_quic_sockets = RepairQuicSockets { + repair_service_quic_socket: node.sockets.serve_repair_quic, + repair_client_quic_socket: node.sockets.repair_quic, + ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic, + }; + let repair_quic_senders = RepairQuicSenders { + repair_request_quic_sender: repair_request_quic_sender.clone(), + repair_response_quic_sender, + ancestor_hashes_response_quic_sender, + }; + repair::quic_endpoint::new_quic_endpoints( + repair_quic_endpoints_runtime .as_ref() .map(TokioRuntime::handle) .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()), &identity_keypair, - node.sockets.serve_repair_quic, - repair_quic_endpoint_sender, + repair_quic_sockets, + repair_quic_senders, bank_forks.clone(), ) - .map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle))) + .map(|(endpoints, senders, join_handle)| { + (Some(endpoints), senders, Some(join_handle)) + }) .unwrap() }; + let serve_repair_service = ServeRepairService::new( + serve_repair, + // Incoming UDP repair requests are adapted into RemoteRequest + // and also sent through the same channel. + repair_request_quic_sender, + repair_request_quic_receiver, + repair_quic_async_senders.repair_response_quic_sender, + blockstore.clone(), + node.sockets.serve_repair, + socket_addr_space, + stats_reporter_sender, + exit.clone(), + ); let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority; let wen_restart_repair_slots = if in_wen_restart { @@ -1373,7 +1393,10 @@ impl Validator { banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), turbine_quic_endpoint_receiver, - repair_quic_endpoint_sender, + repair_response_quic_receiver, + repair_quic_async_senders.repair_request_quic_sender, + repair_quic_async_senders.ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, outstanding_repair_requests.clone(), cluster_slots.clone(), wen_restart_repair_slots.clone(), @@ -1501,9 +1524,9 @@ impl Validator { turbine_quic_endpoint, turbine_quic_endpoint_runtime, turbine_quic_endpoint_join_handle, - repair_quic_endpoint, - repair_quic_endpoint_runtime, - repair_quic_endpoint_join_handle, + repair_quic_endpoints, + repair_quic_endpoints_runtime, + repair_quic_endpoints_join_handle, }) } @@ -1615,18 +1638,19 @@ impl Validator { } self.gossip_service.join().expect("gossip_service"); - if let Some(repair_quic_endpoint) = &self.repair_quic_endpoint { - repair::quic_endpoint::close_quic_endpoint(repair_quic_endpoint); - } + self.repair_quic_endpoints + .iter() + .flatten() + .for_each(repair::quic_endpoint::close_quic_endpoint); self.serve_repair_service .join() .expect("serve_repair_service"); - if let Some(repair_quic_endpoint_join_handle) = self.repair_quic_endpoint_join_handle { - self.repair_quic_endpoint_runtime - .map(|runtime| runtime.block_on(repair_quic_endpoint_join_handle)) + if let Some(repair_quic_endpoints_join_handle) = self.repair_quic_endpoints_join_handle { + self.repair_quic_endpoints_runtime + .map(|runtime| runtime.block_on(repair_quic_endpoints_join_handle)) .transpose() .unwrap(); - }; + } self.stats_reporter_service .join() .expect("stats_reporter_service"); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 3056090cf9ba94..0d2e0b75317597 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -8,7 +8,6 @@ use { completed_data_sets_service::CompletedDataSetsSender, repair::{ ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, - quic_endpoint::LocalRequest, repair_response, repair_service::{ DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo, @@ -17,6 +16,7 @@ use { }, result::{Error, Result}, }, + bytes::Bytes, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, rayon::{prelude::*, ThreadPool}, solana_feature_set as feature_set, @@ -31,7 +31,10 @@ use { solana_perf::packet::{Packet, PacketBatch}, solana_rayon_threadlimit::get_thread_count, solana_runtime::bank_forks::BankForks, - solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, + solana_sdk::{ + clock::{Slot, DEFAULT_MS_PER_SLOT}, + pubkey::Pubkey, + }, solana_turbine::cluster_nodes, std::{ cmp::Reverse, @@ -374,8 +377,9 @@ impl WindowService { retransmit_sender: Sender>, repair_socket: Arc, ancestor_hashes_socket: Arc, - repair_quic_endpoint_sender: AsyncSender, - repair_quic_endpoint_response_sender: Sender<(SocketAddr, Vec)>, + repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>, + ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, exit: Arc, repair_info: RepairInfo, leader_schedule_cache: Arc, @@ -399,8 +403,9 @@ impl WindowService { exit.clone(), repair_socket, ancestor_hashes_socket, - repair_quic_endpoint_sender, - repair_quic_endpoint_response_sender, + repair_request_quic_sender, + ancestor_hashes_request_quic_sender, + ancestor_hashes_response_quic_receiver, repair_info, verified_vote_receiver, outstanding_repair_requests.clone(), diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index f0916970c9cae6..8fa99ad85b49ff 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2874,11 +2874,19 @@ pub struct Sockets { pub tpu_forwards: Vec, pub tpu_vote: Vec, pub broadcast: Vec, + // Socket sending out local repair requests, + // and receiving repair responses from the cluster. pub repair: UdpSocket, + pub repair_quic: UdpSocket, pub retransmit_sockets: Vec, + // Socket receiving remote repair requests from the cluster, + // and sending back repair responses. pub serve_repair: UdpSocket, pub serve_repair_quic: UdpSocket, + // Socket sending out local RepairProtocol::AncestorHashes, + // and receiving AncestorHashesResponse from the cluster. pub ancestor_hashes_requests: UdpSocket, + pub ancestor_hashes_requests_quic: UdpSocket, pub tpu_quic: Vec, pub tpu_forwards_quic: Vec, } @@ -2951,6 +2959,7 @@ impl Node { bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); let rpc_addr = SocketAddr::new(localhost_ip_addr, rpc_port); let rpc_pubsub_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2960,6 +2969,7 @@ impl Node { let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap(); + let ancestor_hashes_requests_quic = UdpSocket::bind(&unspecified_bind_addr).unwrap(); let mut info = ContactInfo::new( *pubkey, @@ -3008,10 +3018,12 @@ impl Node { tpu_vote: vec![tpu_vote], broadcast, repair, + repair_quic, retransmit_sockets: vec![retransmit_socket], serve_repair, serve_repair_quic, ancestor_hashes_requests, + ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, }, @@ -3084,10 +3096,12 @@ impl Node { let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); + let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); let (_, broadcast) = Self::bind(bind_ip_addr, port_range); let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); + let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -3134,10 +3148,12 @@ impl Node { tpu_vote: vec![tpu_vote], broadcast: vec![broadcast], repair, + repair_quic, retransmit_sockets: vec![retransmit_socket], serve_repair, serve_repair_quic, ancestor_hashes_requests, + ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, }, @@ -3199,6 +3215,7 @@ impl Node { multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); let (_, repair) = Self::bind(bind_ip_addr, port_range); + let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); @@ -3206,6 +3223,7 @@ impl Node { multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind"); let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); + let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); let mut info = ContactInfo::new( *pubkey, @@ -3239,11 +3257,13 @@ impl Node { tpu_vote: tpu_vote_sockets, broadcast, repair, + repair_quic, retransmit_sockets, serve_repair, serve_repair_quic, ip_echo: Some(ip_echo), ancestor_hashes_requests, + ancestor_hashes_requests_quic, tpu_quic, tpu_forwards_quic, }, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 50008171eed119..196dda94afa624 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -32,7 +32,7 @@ pub struct UdpSocketPair { pub type PortRange = (u16, u16); pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); -pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14; // VALIDATOR_PORT_RANGE must be at least this wide +pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 16; // VALIDATOR_PORT_RANGE must be at least this wide pub(crate) const HEADER_LENGTH: usize = 4; pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;