From 30c284af9bc4f3df72a0cd381b62b171c7f5177f Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Fri, 20 Dec 2024 08:13:31 +1100 Subject: [PATCH] window_service: use the service thread as a rayon worker thread (#3876) This reduces latency in processing a small number of shreds, since they'll be processed directly on the current thread avoiding the rayon fork/join overhead. On current mnb traffic this improves replay-loop-timing.wait_receive_elapsed_us ~2.5x. --- core/src/window_service.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 0d2e0b75317597..ebd56f02864b62 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -313,6 +313,7 @@ where let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .par_iter() + .with_min_len(32) .flat_map_iter(|packets| packets.iter().filter_map(handle_packet)) .unzip() }); @@ -489,15 +490,19 @@ impl WindowService { let handle_error = || { inc_new_counter_error!("solana-window-insert-error", 1, 1); }; - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count().min(8)) - .thread_name(|i| format!("solWinInsert{i:02}")) - .build() - .unwrap(); let reed_solomon_cache = ReedSolomonCache::default(); Builder::new() .name("solWinInsert".to_string()) .spawn(move || { + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(8)) + // Use the current thread as one of the workers. This reduces overhead when the + // pool is used to process a small number of shreds, since they'll be processed + // directly on the current thread. + .use_current_thread() + .thread_name(|i| format!("solWinInsert{i:02}")) + .build() + .unwrap(); let handle_duplicate = |possible_duplicate_shred| { let _ = check_duplicate_sender.send(possible_duplicate_shred); };