From 014279907088f19687102c3c6df0f2337d9d5065 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 15 Dec 2024 11:32:06 -0600 Subject: [PATCH] removes redundant vector allocations before calling sendmmsg::batch_send streamer::sendmmsg::batch_send only requires an ExactSizeIterator: https://github.com/anza-xyz/agave/blob/82347779f/streamer/src/sendmmsg.rs#L169-L175 Collecting an iterator into a vector before calling batch_send is unnecessary and only adds overhead. In particular multi_target_send used in retransmitting shreds can be use without doing an additional vector allocation: https://github.com/anza-xyz/agave/blob/82347779f/streamer/src/sendmmsg.rs#L197 --- Cargo.lock | 1 + core/src/banking_stage/forwarder.rs | 6 +-- core/src/repair/repair_service.rs | 14 +++---- core/src/repair/serve_repair.rs | 9 ++--- programs/sbf/Cargo.lock | 1 + streamer/src/sendmmsg.rs | 40 +++++++++++++------ streamer/src/streamer.rs | 2 +- svm/examples/Cargo.lock | 1 + turbine/src/broadcast_stage.rs | 5 ++- .../broadcast_duplicates_run.rs | 8 +--- udp-client/Cargo.toml | 1 + udp-client/src/udp_client.rs | 16 +++----- 12 files changed, 55 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8df165733c5c87..2bc090e6fbef1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9856,6 +9856,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0a3cfd35b2a9e5..b08ca868ff366b 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -23,7 +23,6 @@ use { solana_sdk::{pubkey::Pubkey, transport::TransportError}, solana_streamer::sendmmsg::batch_send, std::{ - iter::repeat, net::{SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc, RwLock}, }, @@ -281,8 +280,9 @@ impl Forwarder { match forward_option { ForwardOption::ForwardTpuVote => { // The vote must be forwarded using only UDP. - let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect(); - batch_send(&self.socket, &pkts).map_err(|err| err.into()) + let addrs = itertools::repeat_n(addr, packet_vec.len()); + let pkts = packet_vec.iter().zip(addrs); + batch_send(&self.socket, pkts).map_err(TransportError::from) } ForwardOption::ForwardTransaction => { let conn = self.connection_cache.get_connection(addr); diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index ebefeb54f8e0f9..ffd0d9d183a08b 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -476,15 +476,13 @@ impl RepairService { let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); if !batch.is_empty() { - match batch_send(repair_socket, &batch) { + let num_pkts = batch.len(); + let batch = batch.iter().map(|(bytes, addr)| (&bytes[..], addr)); + match batch_send(repair_socket, batch) { Ok(()) => (), Err(SendPktsError::IoError(err, num_failed)) => { error!( - "{} batch_send failed to send {}/{} packets first error {:?}", - id, - num_failed, - batch.len(), - err + "{id} batch_send failed to send {num_failed}/{num_pkts} packets first error {err:?}" ); } } @@ -871,10 +869,10 @@ impl RepairService { ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap(); // Prepare packet batch to send - let reqs = [(packet_buf, address)]; + let reqs = [(&packet_buf, address)]; // Send packet batch - match batch_send(repair_socket, &reqs[..]) { + match batch_send(repair_socket, reqs) { Ok(()) => { debug!("successfully sent repair request to {pubkey} / {address}!"); } diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 8271e670d9ace0..14de57e7d799f6 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -1239,14 +1239,13 @@ impl ServeRepair { } } if !pending_pongs.is_empty() { - match batch_send(repair_socket, &pending_pongs) { + let num_pkts = pending_pongs.len(); + let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (&bytes[..], addr)); + match batch_send(repair_socket, pending_pongs) { Ok(()) => (), Err(SendPktsError::IoError(err, num_failed)) => { warn!( - "batch_send failed to send {}/{} packets. First error: {:?}", - num_failed, - pending_pongs.len(), - err + "batch_send failed to send {num_failed}/{num_pkts} packets. First error: {err:?}" ); } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index d30186c24c18d6..85db40d5d2a243 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -8231,6 +8231,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 1b2257f2f00172..011a6d6cc36931 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -15,7 +15,6 @@ use { std::{ borrow::Borrow, io, - iter::repeat, net::{SocketAddr, UdpSocket}, }, thiserror::Error, @@ -34,11 +33,17 @@ impl From for TransportError { } } +// The type and lifetime constraints are overspecified to match 'linux' code. #[cfg(not(target_os = "linux"))] -pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +pub fn batch_send<'a, I, S, T: 'a + ?Sized>( + sock: &UdpSocket, + packets: I, +) -> Result<(), SendPktsError> where + I: IntoIterator, + ::IntoIter: ExactSizeIterator, S: Borrow, - T: AsRef<[u8]>, + &'a T: AsRef<[u8]>, { let mut num_failed = 0; let mut erropt = None; @@ -160,12 +165,20 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts } } +// Need &'a to ensure that raw packet pointers obtained +// in mmsghdr_for_packet stay valid. #[cfg(target_os = "linux")] -pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +pub fn batch_send<'a, I, S, T: 'a + ?Sized>( + sock: &UdpSocket, + packets: I, +) -> Result<(), SendPktsError> where + I: IntoIterator, + ::IntoIter: ExactSizeIterator, S: Borrow, - T: AsRef<[u8]>, + &'a T: AsRef<[u8]>, { + let packets = packets.into_iter(); let size = packets.len(); let mut iovs = vec![MaybeUninit::uninit(); size]; let mut addrs = vec![MaybeUninit::zeroed(); size]; @@ -193,9 +206,10 @@ where S: Borrow, T: AsRef<[u8]>, { + let num_dests = dests.len(); let dests = dests.iter().map(Borrow::borrow); - let pkts: Vec<_> = repeat(&packet).zip(dests).collect(); - batch_send(sock, &pkts) + let pkts = itertools::repeat_n(&packet, num_dests).zip(dests); + batch_send(sock, pkts) } #[cfg(test)] @@ -224,7 +238,7 @@ mod tests { let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); - let sent = batch_send(&sender, &packet_refs[..]).ok(); + let sent = batch_send(&sender, packet_refs).ok(); assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; @@ -255,7 +269,7 @@ mod tests { }) .collect(); - let sent = batch_send(&sender, &packet_refs[..]).ok(); + let sent = batch_send(&sender, packet_refs).ok(); assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; @@ -323,7 +337,7 @@ mod tests { let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; let sender = bind_to_unspecified().expect("bind"); - let res = batch_send(&sender, &packet_refs[..]); + let res = batch_send(&sender, packet_refs); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); @@ -344,7 +358,7 @@ mod tests { (&packets[3][..], &ipv4broadcast), (&packets[4][..], &ipv4local), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); @@ -360,7 +374,7 @@ mod tests { (&packets[3][..], &ipv4local), (&packets[4][..], &ipv4broadcast), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); @@ -376,7 +390,7 @@ mod tests { (&packets[3][..], &ipv4broadcast), (&packets[4][..], &ipv4local), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index b7ce3f6e2756f3..b8dc0eb1699b30 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -362,7 +362,7 @@ fn recv_send( let data = pkt.data(..)?; socket_addr_space.check(&addr).then_some((data, addr)) }); - batch_send(sock, &packets.collect::>())?; + batch_send(sock, packets.collect::>())?; Ok(()) } diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 1b7291092c71ac..4799fd39740b90 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -7567,6 +7567,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index 2f9d21980372e3..9f7037f091617d 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -475,8 +475,9 @@ pub fn broadcast_shreds( shred_select.stop(); transmit_stats.shred_select += shred_select.as_us(); + let num_udp_packets = packets.len(); let mut send_mmsg_time = Measure::start("send_mmsg"); - match batch_send(s, &packets[..]) { + match batch_send(s, packets) { Ok(()) => (), Err(SendPktsError::IoError(ioerr, num_failed)) => { transmit_stats.dropped_packets_udp += num_failed; @@ -485,7 +486,7 @@ pub fn broadcast_shreds( } send_mmsg_time.stop(); transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); - transmit_stats.total_packets += packets.len() + quic_packets.len(); + transmit_stats.total_packets += num_udp_packets + quic_packets.len(); for (shred, addr) in quic_packets { let shred = Bytes::from(shred.clone()); if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) { diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 3060fd27c820bf..277926acae9919 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -393,13 +393,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .flatten() .collect(); - match batch_send(sock, &packets) { - Ok(()) => (), - Err(SendPktsError::IoError(ioerr, _)) => { - return Err(Error::Io(ioerr)); - } - } - Ok(()) + batch_send(sock, packets).map_err(|SendPktsError::IoError(err, _)| Error::Io(err)) } fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { diff --git a/udp-client/Cargo.toml b/udp-client/Cargo.toml index 3bf92468ec7abc..da583e8cf38940 100644 --- a/udp-client/Cargo.toml +++ b/udp-client/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] async-trait = { workspace = true } +itertools = { workspace = true } solana-connection-cache = { workspace = true } solana-keypair = { workspace = true } solana-net-utils = { workspace = true } diff --git a/udp-client/src/udp_client.rs b/udp-client/src/udp_client.rs index 4a257a2d748092..9019786bc8fc42 100644 --- a/udp-client/src/udp_client.rs +++ b/udp-client/src/udp_client.rs @@ -2,7 +2,6 @@ //! an interface for sending data use { - core::iter::repeat, solana_connection_cache::client_connection::ClientConnection, solana_streamer::sendmmsg::batch_send, solana_transaction_error::TransportResult, @@ -37,18 +36,15 @@ impl ClientConnection for UdpClientConnection { } fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) + let addrs = itertools::repeat_n(self.server_addr(), buffers.len()); + let pkts = buffers.iter().zip(addrs); + Ok(batch_send(&self.socket, pkts)?) } fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers - .into_iter() - .zip(repeat(self.server_addr())) - .collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) + let addrs = itertools::repeat_n(self.server_addr(), buffers.len()); + let pkts = buffers.iter().zip(addrs); + Ok(batch_send(&self.socket, pkts)?) } fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {