diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index a63ad586242cae..c13d72ee46159a 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -33,7 +33,6 @@ use { }, std::{ collections::{HashMap, HashSet}, - iter::repeat_with, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -56,7 +55,6 @@ mod standard_broadcast_run; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); -pub(crate) const NUM_INSERT_THREADS: usize = 2; pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; pub(crate) type TransmitReceiver = Receiver<(Arc>, Option)>; @@ -281,7 +279,7 @@ impl BroadcastStage { blockstore: Arc, bank_forks: Arc>, quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, - broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, + mut broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, ) -> Self { let (socket_sender, socket_receiver) = unbounded(); let (blockstore_sender, blockstore_receiver) = unbounded(); @@ -331,25 +329,24 @@ impl BroadcastStage { .spawn(run_transmit) .unwrap() })); - thread_hdls.extend( - repeat_with(|| { - let blockstore_receiver = blockstore_receiver.clone(); - let mut bs_record = broadcast_stage_run.clone(); - let btree = blockstore.clone(); - let run_record = move || loop { - let res = bs_record.record(&blockstore_receiver, &btree); - let res = Self::handle_error(res, "solana-broadcaster-record"); - if let Some(res) = res { - return res; - } - }; - Builder::new() - .name("solBroadcastRec".to_string()) - .spawn(run_record) - .unwrap() - }) - .take(NUM_INSERT_THREADS), - ); + + // Blockstore::insert_threads() obtains and holds a write lock for the entire function. + // Until this changes, only a single inserter thread is necessary. + thread_hdls.push({ + let blockstore = blockstore.clone(); + let run_record = move || loop { + let res = broadcast_stage_run.record(&blockstore_receiver, &blockstore); + let res = Self::handle_error(res, "solana-broadcaster-record"); + if let Some(res) = res { + return res; + } + }; + Builder::new() + .name("solBroadcastRec".to_string()) + .spawn(run_record) + .unwrap() + }); + let retransmit_thread = Builder::new() .name("solBroadcastRtx".to_string()) .spawn(move || loop {