From c6d13c224c21b6fafde242ccab2b279785f51c7a Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Tue, 3 Dec 2024 00:41:08 +0000 Subject: [PATCH] window_service: use the service thread as a rayon worker thread This reduces latency in processing a small number of shreds, since they'll be processed directly on the current thread avoiding the rayon fork/join overhead. On current mnb traffic this improves replay-loop-timing.wait_receive_elapsed_us ~2.5x. --- core/src/window_service.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 0d2e0b75317597..ebd56f02864b62 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -313,6 +313,7 @@ where let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .par_iter() + .with_min_len(32) .flat_map_iter(|packets| packets.iter().filter_map(handle_packet)) .unzip() }); @@ -489,15 +490,19 @@ impl WindowService { let handle_error = || { inc_new_counter_error!("solana-window-insert-error", 1, 1); }; - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count().min(8)) - .thread_name(|i| format!("solWinInsert{i:02}")) - .build() - .unwrap(); let reed_solomon_cache = ReedSolomonCache::default(); Builder::new() .name("solWinInsert".to_string()) .spawn(move || { + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count().min(8)) + // Use the current thread as one of the workers. This reduces overhead when the + // pool is used to process a small number of shreds, since they'll be processed + // directly on the current thread. + .use_current_thread() + .thread_name(|i| format!("solWinInsert{i:02}")) + .build() + .unwrap(); let handle_duplicate = |possible_duplicate_shred| { let _ = check_duplicate_sender.send(possible_duplicate_shred); };