From 2a5616f16aee5589d69f180c8fc9a14681f9ace7 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 28 May 2024 15:57:10 -0400 Subject: [PATCH] Reduce to single blockstore inserter thread in BroadcastStage (#1473) The implementation of Blockstore::insert_shreds() holds a write lock for the entirety of the function. Thus, there is no point in having more than one thread in BroadcastStage insert shreds. --- turbine/src/broadcast_stage.rs | 41 ++++++++++++++++------------------ 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index a63ad586242cae..c13d72ee46159a 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -33,7 +33,6 @@ use { }, std::{ collections::{HashMap, HashSet}, - iter::repeat_with, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -56,7 +55,6 @@ mod standard_broadcast_run; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); -pub(crate) const NUM_INSERT_THREADS: usize = 2; pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; pub(crate) type TransmitReceiver = Receiver<(Arc>, Option)>; @@ -281,7 +279,7 @@ impl BroadcastStage { blockstore: Arc, bank_forks: Arc>, quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, - broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, + mut broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, ) -> Self { let (socket_sender, socket_receiver) = unbounded(); let (blockstore_sender, blockstore_receiver) = unbounded(); @@ -331,25 +329,24 @@ impl BroadcastStage { .spawn(run_transmit) .unwrap() })); - thread_hdls.extend( - repeat_with(|| { - let blockstore_receiver = blockstore_receiver.clone(); - let mut bs_record = broadcast_stage_run.clone(); - let btree = blockstore.clone(); - let run_record = move || loop { - let res = bs_record.record(&blockstore_receiver, &btree); - let res = Self::handle_error(res, "solana-broadcaster-record"); - if let Some(res) = res { - return res; - } - }; - Builder::new() - .name("solBroadcastRec".to_string()) - .spawn(run_record) - .unwrap() - }) - .take(NUM_INSERT_THREADS), - ); + + // Blockstore::insert_threads() obtains and holds a write lock for the entire function. + // Until this changes, only a single inserter thread is necessary. + thread_hdls.push({ + let blockstore = blockstore.clone(); + let run_record = move || loop { + let res = broadcast_stage_run.record(&blockstore_receiver, &blockstore); + let res = Self::handle_error(res, "solana-broadcaster-record"); + if let Some(res) = res { + return res; + } + }; + Builder::new() + .name("solBroadcastRec".to_string()) + .spawn(run_record) + .unwrap() + }); + let retransmit_thread = Builder::new() .name("solBroadcastRtx".to_string()) .spawn(move || loop {