diff --git a/Cargo.lock b/Cargo.lock index 23c429edd446de..49bb864924a587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9856,6 +9856,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0a3cfd35b2a9e5..0d377421befcdf 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -23,7 +23,6 @@ use { solana_sdk::{pubkey::Pubkey, transport::TransportError}, solana_streamer::sendmmsg::batch_send, std::{ - iter::repeat, net::{SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc, RwLock}, }, @@ -281,8 +280,9 @@ impl Forwarder { match forward_option { ForwardOption::ForwardTpuVote => { // The vote must be forwarded using only UDP. - let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect(); - batch_send(&self.socket, &pkts).map_err(|err| err.into()) + let addrs = itertools::repeat_n(addr, packet_vec.len()); + let pkts = packet_vec.into_iter().zip(addrs); + batch_send(&self.socket, pkts).map_err(TransportError::from) } ForwardOption::ForwardTransaction => { let conn = self.connection_cache.get_connection(addr); diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index ebefeb54f8e0f9..e8acc47e6e2603 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -476,15 +476,12 @@ impl RepairService { let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); if !batch.is_empty() { - match batch_send(repair_socket, &batch) { + let num_pkts = batch.len(); + match batch_send(repair_socket, batch) { Ok(()) => (), Err(SendPktsError::IoError(err, num_failed)) => { error!( - "{} batch_send failed to send {}/{} packets first error {:?}", - id, - num_failed, - batch.len(), - err + "{id} batch_send failed to send {num_failed}/{num_pkts} packets first error {err:?}" ); } } @@ -874,7 +871,7 @@ impl RepairService { let reqs = [(packet_buf, address)]; // Send packet batch - match batch_send(repair_socket, &reqs[..]) { + match batch_send(repair_socket, reqs) { Ok(()) => { debug!("successfully sent repair request to {pubkey} / {address}!"); } diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 8271e670d9ace0..6e9dd045d86edc 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -1239,14 +1239,12 @@ impl ServeRepair { } } if !pending_pongs.is_empty() { - match batch_send(repair_socket, &pending_pongs) { + let num_pkts = pending_pongs.len(); + match batch_send(repair_socket, pending_pongs) { Ok(()) => (), Err(SendPktsError::IoError(err, num_failed)) => { warn!( - "batch_send failed to send {}/{} packets. First error: {:?}", - num_failed, - pending_pongs.len(), - err + "batch_send failed to send {num_failed}/{num_pkts} packets. First error: {err:?}" ); } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b5ae6a5cfd16b9..b78ee1999a33ef 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -8231,6 +8231,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 1b2257f2f00172..9aa16b6665fdb4 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -15,7 +15,6 @@ use { std::{ borrow::Borrow, io, - iter::repeat, net::{SocketAddr, UdpSocket}, }, thiserror::Error, @@ -35,8 +34,12 @@ impl From for TransportError { } #[cfg(not(target_os = "linux"))] -pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +pub fn batch_send(sock: &UdpSocket, packets: I) -> Result<(), SendPktsError> where + I: IntoIterator, + // ExactSizeIterator is unnecessary but is included + // for signature consistency with the 'linux' code. + ::IntoIter: ExactSizeIterator, S: Borrow, T: AsRef<[u8]>, { @@ -161,11 +164,14 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts } #[cfg(target_os = "linux")] -pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +pub fn batch_send(sock: &UdpSocket, packets: I) -> Result<(), SendPktsError> where + I: IntoIterator, + ::IntoIter: ExactSizeIterator, S: Borrow, T: AsRef<[u8]>, { + let packets = packets.into_iter(); let size = packets.len(); let mut iovs = vec![MaybeUninit::uninit(); size]; let mut addrs = vec![MaybeUninit::zeroed(); size]; @@ -193,9 +199,10 @@ where S: Borrow, T: AsRef<[u8]>, { + let num_dests = dests.len(); let dests = dests.iter().map(Borrow::borrow); - let pkts: Vec<_> = repeat(&packet).zip(dests).collect(); - batch_send(sock, &pkts) + let pkts = itertools::repeat_n(&packet, num_dests).zip(dests); + batch_send(sock, pkts) } #[cfg(test)] @@ -224,7 +231,7 @@ mod tests { let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); - let sent = batch_send(&sender, &packet_refs[..]).ok(); + let sent = batch_send(&sender, packet_refs).ok(); assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; @@ -255,7 +262,7 @@ mod tests { }) .collect(); - let sent = batch_send(&sender, &packet_refs[..]).ok(); + let sent = batch_send(&sender, packet_refs).ok(); assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; @@ -323,7 +330,7 @@ mod tests { let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; let sender = bind_to_unspecified().expect("bind"); - let res = batch_send(&sender, &packet_refs[..]); + let res = batch_send(&sender, packet_refs); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); @@ -344,7 +351,7 @@ mod tests { (&packets[3][..], &ipv4broadcast), (&packets[4][..], &ipv4local), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); @@ -360,7 +367,7 @@ mod tests { (&packets[3][..], &ipv4local), (&packets[4][..], &ipv4broadcast), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); @@ -376,7 +383,7 @@ mod tests { (&packets[3][..], &ipv4broadcast), (&packets[4][..], &ipv4local), ]; - match batch_send(&sender, &packet_refs[..]) { + match batch_send(&sender, packet_refs) { Ok(()) => panic!(), Err(SendPktsError::IoError(ioerror, num_failed)) => { assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied); diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index b7ce3f6e2756f3..b8dc0eb1699b30 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -362,7 +362,7 @@ fn recv_send( let data = pkt.data(..)?; socket_addr_space.check(&addr).then_some((data, addr)) }); - batch_send(sock, &packets.collect::>())?; + batch_send(sock, packets.collect::>())?; Ok(()) } diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 10e72b4ec732ff..18a624323662d4 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -7567,6 +7567,7 @@ name = "solana-udp-client" version = "2.2.0" dependencies = [ "async-trait", + "itertools 0.12.1", "solana-connection-cache", "solana-keypair", "solana-net-utils", diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index 2f9d21980372e3..9f7037f091617d 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -475,8 +475,9 @@ pub fn broadcast_shreds( shred_select.stop(); transmit_stats.shred_select += shred_select.as_us(); + let num_udp_packets = packets.len(); let mut send_mmsg_time = Measure::start("send_mmsg"); - match batch_send(s, &packets[..]) { + match batch_send(s, packets) { Ok(()) => (), Err(SendPktsError::IoError(ioerr, num_failed)) => { transmit_stats.dropped_packets_udp += num_failed; @@ -485,7 +486,7 @@ pub fn broadcast_shreds( } send_mmsg_time.stop(); transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); - transmit_stats.total_packets += packets.len() + quic_packets.len(); + transmit_stats.total_packets += num_udp_packets + quic_packets.len(); for (shred, addr) in quic_packets { let shred = Bytes::from(shred.clone()); if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) { diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 3060fd27c820bf..277926acae9919 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -393,13 +393,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .flatten() .collect(); - match batch_send(sock, &packets) { - Ok(()) => (), - Err(SendPktsError::IoError(ioerr, _)) => { - return Err(Error::Io(ioerr)); - } - } - Ok(()) + batch_send(sock, packets).map_err(|SendPktsError::IoError(err, _)| Error::Io(err)) } fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { diff --git a/udp-client/Cargo.toml b/udp-client/Cargo.toml index 3bf92468ec7abc..da583e8cf38940 100644 --- a/udp-client/Cargo.toml +++ b/udp-client/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] async-trait = { workspace = true } +itertools = { workspace = true } solana-connection-cache = { workspace = true } solana-keypair = { workspace = true } solana-net-utils = { workspace = true } diff --git a/udp-client/src/udp_client.rs b/udp-client/src/udp_client.rs index 4a257a2d748092..945e5e7fe70196 100644 --- a/udp-client/src/udp_client.rs +++ b/udp-client/src/udp_client.rs @@ -2,7 +2,6 @@ //! an interface for sending data use { - core::iter::repeat, solana_connection_cache::client_connection::ClientConnection, solana_streamer::sendmmsg::batch_send, solana_transaction_error::TransportResult, @@ -37,18 +36,15 @@ impl ClientConnection for UdpClientConnection { } fn send_data_batch(&self, buffers: &[Vec]) -> TransportResult<()> { - let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) + let addrs = itertools::repeat_n(self.server_addr(), buffers.len()); + let pkts = buffers.iter().zip(addrs); + Ok(batch_send(&self.socket, pkts)?) } fn send_data_batch_async(&self, buffers: Vec>) -> TransportResult<()> { - let pkts: Vec<_> = buffers - .into_iter() - .zip(repeat(self.server_addr())) - .collect(); - batch_send(&self.socket, &pkts)?; - Ok(()) + let addrs = itertools::repeat_n(self.server_addr(), buffers.len()); + let pkts = buffers.into_iter().zip(addrs); + Ok(batch_send(&self.socket, pkts)?) } fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {