Skip to content

Commit

Permalink
Reduce to single blockstore inserter thread in BroadcastStage (#1473)
Browse files Browse the repository at this point in the history
The implementation of Blockstore::insert_shreds() holds a write lock for
the entirety of the function. Thus, there is no point in having more
than one thread in BroadcastStage insert shreds.
  • Loading branch information
steviez authored May 28, 2024
1 parent dfa48eb commit 2a5616f
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use {
},
std::{
collections::{HashMap, HashSet},
iter::repeat_with,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -56,7 +55,6 @@ mod standard_broadcast_run;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);

pub(crate) const NUM_INSERT_THREADS: usize = 2;
pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;

Expand Down Expand Up @@ -281,7 +279,7 @@ impl BroadcastStage {
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
mut broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
) -> Self {
let (socket_sender, socket_receiver) = unbounded();
let (blockstore_sender, blockstore_receiver) = unbounded();
Expand Down Expand Up @@ -331,25 +329,24 @@ impl BroadcastStage {
.spawn(run_transmit)
.unwrap()
}));
thread_hdls.extend(
repeat_with(|| {
let blockstore_receiver = blockstore_receiver.clone();
let mut bs_record = broadcast_stage_run.clone();
let btree = blockstore.clone();
let run_record = move || loop {
let res = bs_record.record(&blockstore_receiver, &btree);
let res = Self::handle_error(res, "solana-broadcaster-record");
if let Some(res) = res {
return res;
}
};
Builder::new()
.name("solBroadcastRec".to_string())
.spawn(run_record)
.unwrap()
})
.take(NUM_INSERT_THREADS),
);

// Blockstore::insert_threads() obtains and holds a write lock for the entire function.
// Until this changes, only a single inserter thread is necessary.
thread_hdls.push({
let blockstore = blockstore.clone();
let run_record = move || loop {
let res = broadcast_stage_run.record(&blockstore_receiver, &blockstore);
let res = Self::handle_error(res, "solana-broadcaster-record");
if let Some(res) = res {
return res;
}
};
Builder::new()
.name("solBroadcastRec".to_string())
.spawn(run_record)
.unwrap()
});

let retransmit_thread = Builder::new()
.name("solBroadcastRtx".to_string())
.spawn(move || loop {
Expand Down

0 comments on commit 2a5616f

Please sign in to comment.