diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b3d02d8287b77e..901076da10f9d0 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -30,11 +30,8 @@ use { signature::{Keypair, Signature}, timing, }, - solana_transaction_metrics_tracker::{ - get_signature_from_packet, signature_if_should_track_packet, - }, + solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ - collections::HashMap, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ @@ -633,7 +630,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { - let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -653,11 +650,7 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); - track_streamer_fetch_packet_performance( - &packet_batch, - &mut packet_perf_measure, - stats.clone(), - ); + track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats); if let Err(e) = packet_sender.send(packet_batch) { stats @@ -708,7 +701,7 @@ async fn packet_batch_sender( .ok() .flatten() { - packet_perf_measure.insert(*signature, packet_accumulator.start_time); + packet_perf_measure.push((*signature, packet_accumulator.start_time)); // we set the PERF_TRACK_PACKET on packet_batch[i].meta_mut().set_track_performance(true); } @@ -721,29 +714,23 @@ async fn packet_batch_sender( } fn track_streamer_fetch_packet_performance( - packet_batch: &PacketBatch, - packet_perf_measure: &mut HashMap<[u8; 64], Instant>, - stats: Arc, + packet_perf_measure: &mut Vec<([u8; 64], Instant)>, + stats: &Arc, ) { - for packet in packet_batch.iter() { - if packet.meta().is_perf_track_packet() { - let signature = get_signature_from_packet(packet); - if let Ok(signature) = signature { - if let Some(start_time) = packet_perf_measure.remove(signature) { - let duration = Instant::now().duration_since(start_time); - debug!( - "QUIC streamer fetch stage took {duration:?} for transaction {:?}", - Signature::from(*signature) - ); - stats - .process_sampled_packets_us_hist - .lock() - .unwrap() - .increment(duration.as_micros() as u64) - .unwrap(); - } - } - } + if packet_perf_measure.is_empty() { + return; + } + let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap(); + + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); } }