Skip to content

Commit

Permalink
Do not use binary_search, do simple compare in one loop
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Mar 1, 2024
1 parent 4d1e1cc commit b12a19e
Showing 1 changed file with 20 additions and 33 deletions.
53 changes: 20 additions & 33 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ use {
signature::{Keypair, Signature},
timing,
},
solana_transaction_metrics_tracker::{
get_signature_from_packet, signature_if_should_track_packet,
},
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
collections::HashMap,
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
sync::{
Expand Down Expand Up @@ -633,7 +630,7 @@ async fn packet_batch_sender(
trace!("enter packet_batch_sender");
let mut batch_start_time = Instant::now();
loop {
let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default();
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0;

Expand All @@ -653,11 +650,7 @@ async fn packet_batch_sender(
|| (!packet_batch.is_empty() && elapsed >= coalesce)
{
let len = packet_batch.len();
track_streamer_fetch_packet_performance(
&packet_batch,
&mut packet_perf_measure,
stats.clone(),
);
track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats);

if let Err(e) = packet_sender.send(packet_batch) {
stats
Expand Down Expand Up @@ -708,7 +701,7 @@ async fn packet_batch_sender(
.ok()
.flatten()
{
packet_perf_measure.insert(*signature, packet_accumulator.start_time);
packet_perf_measure.push((*signature, packet_accumulator.start_time));
// we set the PERF_TRACK_PACKET on
packet_batch[i].meta_mut().set_track_performance(true);
}
Expand All @@ -721,29 +714,23 @@ async fn packet_batch_sender(
}

fn track_streamer_fetch_packet_performance(
packet_batch: &PacketBatch,
packet_perf_measure: &mut HashMap<[u8; 64], Instant>,
stats: Arc<StreamStats>,
packet_perf_measure: &mut Vec<([u8; 64], Instant)>,
stats: &Arc<StreamStats>,
) {
for packet in packet_batch.iter() {
if packet.meta().is_perf_track_packet() {
let signature = get_signature_from_packet(packet);
if let Ok(signature) = signature {
if let Some(start_time) = packet_perf_measure.remove(signature) {
let duration = Instant::now().duration_since(start_time);
debug!(
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
stats
.process_sampled_packets_us_hist
.lock()
.unwrap()
.increment(duration.as_micros() as u64)
.unwrap();
}
}
}
if packet_perf_measure.is_empty() {
return;
}
let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap();

for (signature, start_time) in packet_perf_measure.iter() {
let duration = Instant::now().duration_since(*start_time);
debug!(
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
process_sampled_packets_us_hist
.increment(duration.as_micros() as u64)
.unwrap();
}
}

Expand Down

0 comments on commit b12a19e

Please sign in to comment.