Skip to content

Commit

Permalink
removes redundant vector allocations before calling sendmmsg::batch_send
Browse files Browse the repository at this point in the history
streamer::sendmmsg::batch_send only requires an ExactSizeIterator.
Collecting an iterator into a vector before calling batch_send is
unnecessary and only adds overhead.
  • Loading branch information
behzadnouri committed Dec 18, 2024
1 parent 766901a commit d73c5db
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
solana_sdk::{pubkey::Pubkey, transport::TransportError},
solana_streamer::sendmmsg::batch_send,
std::{
iter::repeat,
net::{SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
},
Expand Down Expand Up @@ -281,8 +280,9 @@ impl<T: LikeClusterInfo> Forwarder<T> {
match forward_option {
ForwardOption::ForwardTpuVote => {
// The vote must be forwarded using only UDP.
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
let addrs = itertools::repeat_n(addr, packet_vec.len());
let pkts = packet_vec.iter().zip(addrs);
batch_send(&self.socket, pkts).map_err(TransportError::from)
}
ForwardOption::ForwardTransaction => {
let conn = self.connection_cache.get_connection(addr);
Expand Down
14 changes: 6 additions & 8 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,13 @@ impl RepairService {

let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
if !batch.is_empty() {
match batch_send(repair_socket, &batch) {
let num_pkts = batch.len();
let batch = batch.iter().map(|(bytes, addr)| (&bytes[..], addr));
match batch_send(repair_socket, batch) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
error!(
"{} batch_send failed to send {}/{} packets first error {:?}",
id,
num_failed,
batch.len(),
err
"{id} batch_send failed to send {num_failed}/{num_pkts} packets first error {err:?}"
);
}
}
Expand Down Expand Up @@ -871,10 +869,10 @@ impl RepairService {
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();

// Prepare packet batch to send
let reqs = [(packet_buf, address)];
let reqs = [(&packet_buf, address)];

// Send packet batch
match batch_send(repair_socket, &reqs[..]) {
match batch_send(repair_socket, reqs) {
Ok(()) => {
debug!("successfully sent repair request to {pubkey} / {address}!");
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,14 +1239,13 @@ impl ServeRepair {
}
}
if !pending_pongs.is_empty() {
match batch_send(repair_socket, &pending_pongs) {
let num_pkts = pending_pongs.len();
let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (&bytes[..], addr));
match batch_send(repair_socket, pending_pongs) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
warn!(
"batch_send failed to send {}/{} packets. First error: {:?}",
num_failed,
pending_pongs.len(),
err
"batch_send failed to send {num_failed}/{num_pkts} packets. First error: {err:?}"
);
}
}
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 27 additions & 13 deletions streamer/src/sendmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use {
std::{
borrow::Borrow,
io,
iter::repeat,
net::{SocketAddr, UdpSocket},
},
thiserror::Error,
Expand All @@ -34,11 +33,17 @@ impl From<SendPktsError> for TransportError {
}
}

// The type and lifetime constraints are overspecified to match 'linux' code.
#[cfg(not(target_os = "linux"))]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
pub fn batch_send<'a, I, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: I,
) -> Result<(), SendPktsError>
where
I: IntoIterator<Item = (&'a T, S)>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let mut num_failed = 0;
let mut erropt = None;
Expand Down Expand Up @@ -160,12 +165,20 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts
}
}

// Need &'a to ensure that raw packet pointers obtained
// in mmsghdr_for_packet stay valid.
#[cfg(target_os = "linux")]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
pub fn batch_send<'a, I, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: I,
) -> Result<(), SendPktsError>
where
I: IntoIterator<Item = (&'a T, S)>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let packets = packets.into_iter();
let size = packets.len();
let mut iovs = vec![MaybeUninit::uninit(); size];
let mut addrs = vec![MaybeUninit::zeroed(); size];
Expand Down Expand Up @@ -193,9 +206,10 @@ where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
{
let num_dests = dests.len();
let dests = dests.iter().map(Borrow::borrow);
let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
batch_send(sock, &pkts)
let pkts = itertools::repeat_n(&packet, num_dests).zip(dests);
batch_send(sock, pkts)
}

#[cfg(test)]
Expand Down Expand Up @@ -224,7 +238,7 @@ mod tests {
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect();

let sent = batch_send(&sender, &packet_refs[..]).ok();
let sent = batch_send(&sender, packet_refs).ok();
assert_eq!(sent, Some(()));

let mut packets = vec![Packet::default(); 32];
Expand Down Expand Up @@ -255,7 +269,7 @@ mod tests {
})
.collect();

let sent = batch_send(&sender, &packet_refs[..]).ok();
let sent = batch_send(&sender, packet_refs).ok();
assert_eq!(sent, Some(()));

let mut packets = vec![Packet::default(); 32];
Expand Down Expand Up @@ -323,7 +337,7 @@ mod tests {
let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];

let sender = bind_to_unspecified().expect("bind");
let res = batch_send(&sender, &packet_refs[..]);
let res = batch_send(&sender, packet_refs);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
let res = multi_target_send(&sender, &packets[0], &dest_refs);
assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1)));
Expand All @@ -344,7 +358,7 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand All @@ -360,7 +374,7 @@ mod tests {
(&packets[3][..], &ipv4local),
(&packets[4][..], &ipv4broadcast),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand All @@ -376,7 +390,7 @@ mod tests {
(&packets[3][..], &ipv4broadcast),
(&packets[4][..], &ipv4local),
];
match batch_send(&sender, &packet_refs[..]) {
match batch_send(&sender, packet_refs) {
Ok(()) => panic!(),
Err(SendPktsError::IoError(ioerror, num_failed)) => {
assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ fn recv_send(
let data = pkt.data(..)?;
socket_addr_space.check(&addr).then_some((data, addr))
});
batch_send(sock, &packets.collect::<Vec<_>>())?;
batch_send(sock, packets.collect::<Vec<_>>())?;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions svm/examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,9 @@ pub fn broadcast_shreds(
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();

let num_udp_packets = packets.len();
let mut send_mmsg_time = Measure::start("send_mmsg");
match batch_send(s, &packets[..]) {
match batch_send(s, packets) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
transmit_stats.dropped_packets_udp += num_failed;
Expand All @@ -485,7 +486,7 @@ pub fn broadcast_shreds(
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
transmit_stats.total_packets += packets.len() + quic_packets.len();
transmit_stats.total_packets += num_udp_packets + quic_packets.len();
for (shred, addr) in quic_packets {
let shred = Bytes::from(shred.clone());
if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) {
Expand Down
8 changes: 1 addition & 7 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.flatten()
.collect();

match batch_send(sock, &packets) {
Ok(()) => (),
Err(SendPktsError::IoError(ioerr, _)) => {
return Err(Error::Io(ioerr));
}
}
Ok(())
batch_send(sock, packets).map_err(|SendPktsError::IoError(err, _)| Error::Io(err))
}

fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions udp-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
itertools = { workspace = true }
solana-connection-cache = { workspace = true }
solana-keypair = { workspace = true }
solana-net-utils = { workspace = true }
Expand Down
16 changes: 6 additions & 10 deletions udp-client/src/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! an interface for sending data
use {
core::iter::repeat,
solana_connection_cache::client_connection::ClientConnection,
solana_streamer::sendmmsg::batch_send,
solana_transaction_error::TransportResult,
Expand Down Expand Up @@ -37,18 +36,15 @@ impl ClientConnection for UdpClientConnection {
}

fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
let addrs = itertools::repeat_n(self.server_addr(), buffers.len());
let pkts = buffers.iter().zip(addrs);
Ok(batch_send(&self.socket, pkts)?)
}

fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let pkts: Vec<_> = buffers
.into_iter()
.zip(repeat(self.server_addr()))
.collect();
batch_send(&self.socket, &pkts)?;
Ok(())
let addrs = itertools::repeat_n(self.server_addr(), buffers.len());
let pkts = buffers.iter().zip(addrs);
Ok(batch_send(&self.socket, pkts)?)
}

fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
Expand Down

0 comments on commit d73c5db

Please sign in to comment.