Skip to content

Commit

Permalink
adds lifetime constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Dec 18, 2024
1 parent 746b1f5 commit 63583bd
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl<T: LikeClusterInfo> Forwarder<T> {
ForwardOption::ForwardTpuVote => {
// The vote must be forwarded using only UDP.
let addrs = itertools::repeat_n(addr, packet_vec.len());
let pkts = packet_vec.into_iter().zip(addrs);
let pkts = packet_vec.iter().zip(addrs);
batch_send(&self.socket, pkts).map_err(TransportError::from)
}
ForwardOption::ForwardTransaction => {
Expand Down
3 changes: 2 additions & 1 deletion core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ impl RepairService {
let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
if !batch.is_empty() {
let num_pkts = batch.len();
let batch = batch.iter().map(|(bytes, addr)| (&bytes[..], addr));
match batch_send(repair_socket, batch) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
Expand Down Expand Up @@ -868,7 +869,7 @@ impl RepairService {
ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();

// Prepare packet batch to send
let reqs = [(packet_buf, address)];
let reqs = [(&packet_buf, address)];

// Send packet batch
match batch_send(repair_socket, reqs) {
Expand Down
1 change: 1 addition & 0 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ impl ServeRepair {
}
if !pending_pongs.is_empty() {
let num_pkts = pending_pongs.len();
let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (&bytes[..], addr));
match batch_send(repair_socket, pending_pongs) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
Expand Down
18 changes: 12 additions & 6 deletions streamer/src/sendmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ impl From<SendPktsError> for TransportError {
}

#[cfg(not(target_os = "linux"))]
pub fn batch_send<I, S, T>(sock: &UdpSocket, packets: I) -> Result<(), SendPktsError>
pub fn batch_send<'a, I, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: I,
) -> Result<(), SendPktsError>
where
I: IntoIterator<Item = (T, S)>,
I: IntoIterator<Item = (&'a T, S)>,
// ExactSizeIterator is unnecessary but is included
// for signature consistency with the 'linux' code.
<I as IntoIterator>::IntoIter: ExactSizeIterator,
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let mut num_failed = 0;
let mut erropt = None;
Expand Down Expand Up @@ -164,12 +167,15 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts
}

#[cfg(target_os = "linux")]
pub fn batch_send<I, S, T>(sock: &UdpSocket, packets: I) -> Result<(), SendPktsError>
pub fn batch_send<'a, I, S, T: 'a + ?Sized>(
sock: &UdpSocket,
packets: I,
) -> Result<(), SendPktsError>
where
I: IntoIterator<Item = (T, S)>,
I: IntoIterator<Item = (&'a T, S)>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
&'a T: AsRef<[u8]>,
{
let packets = packets.into_iter();
let size = packets.len();
Expand Down
2 changes: 1 addition & 1 deletion udp-client/src/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ClientConnection for UdpClientConnection {

fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let addrs = itertools::repeat_n(self.server_addr(), buffers.len());
let pkts = buffers.into_iter().zip(addrs);
let pkts = buffers.iter().zip(addrs);
Ok(batch_send(&self.socket, pkts)?)
}

Expand Down

0 comments on commit 63583bd

Please sign in to comment.