diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 34ec825e665a18..4dfc6bc8e5c733 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -51,6 +51,7 @@ use { time::{Duration, Instant}, }, transaction_scheduler::{ + prio_graph_scheduler::PrioGraphSchedulerConfig, receive_and_buffer::SanitizedTransactionReceiveAndBuffer, transaction_state_container::TransactionStateContainer, }, @@ -618,7 +619,11 @@ impl BankingStage { bank_forks.clone(), forwarder.is_some(), ); - let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver); + let scheduler = PrioGraphScheduler::new( + work_senders, + finished_work_receiver, + PrioGraphSchedulerConfig::default(), + ); let scheduler_controller = SchedulerController::new( decision_maker.clone(), receive_and_buffer, diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 950f506fd51af4..8edebc1f80c200 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -41,19 +41,38 @@ type SchedulerPrioGraph = PrioGraph< fn(&TransactionPriorityId, &GraphNode) -> TransactionPriorityId, >; +pub(crate) struct PrioGraphSchedulerConfig { + pub max_scheduled_cus: u64, + pub max_transactions_per_scheduling_pass: usize, + pub look_ahead_window_size: usize, + pub target_transactions_per_batch: usize, +} + +impl Default for PrioGraphSchedulerConfig { + fn default() -> Self { + Self { + max_scheduled_cus: MAX_BLOCK_UNITS, + max_transactions_per_scheduling_pass: 100_000, + look_ahead_window_size: 2048, + target_transactions_per_batch: TARGET_NUM_TRANSACTIONS_PER_BATCH, + } + } +} + pub(crate) struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>>, finished_consume_work_receiver: Receiver>, - look_ahead_window_size: usize, prio_graph: SchedulerPrioGraph, + config: PrioGraphSchedulerConfig, } impl PrioGraphScheduler { pub(crate) fn new( consume_work_senders: Vec>>, finished_consume_work_receiver: Receiver>, + config: PrioGraphSchedulerConfig, ) -> Self { let num_threads = consume_work_senders.len(); Self { @@ -61,8 +80,8 @@ impl PrioGraphScheduler { account_locks: ThreadAwareAccountLocks::new(num_threads), consume_work_senders, finished_consume_work_receiver, - look_ahead_window_size: 2048, prio_graph: PrioGraph::new(passthrough_priority), + config, } } @@ -89,7 +108,7 @@ impl PrioGraphScheduler { pre_lock_filter: impl Fn(&Tx) -> bool, ) -> Result { let num_threads = self.consume_work_senders.len(); - let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64; + let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64; let mut schedulable_threads = ThreadSet::any(num_threads); for thread_id in 0..num_threads { @@ -106,7 +125,7 @@ impl PrioGraphScheduler { }); } - let mut batches = Batches::new(num_threads); + let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch); // Some transactions may be unschedulable due to multi-thread conflicts. // These transactions cannot be scheduled until some conflicting work is completed. // However, the scheduler should not allow other transactions that conflict with @@ -118,7 +137,7 @@ impl PrioGraphScheduler { let mut num_filtered_out: usize = 0; let mut total_filter_time_us: u64 = 0; - let mut window_budget = self.look_ahead_window_size; + let mut window_budget = self.config.look_ahead_window_size; let mut chunked_pops = |container: &mut S, prio_graph: &mut PrioGraph<_, _, _, _>, window_budget: &mut usize| { @@ -170,13 +189,13 @@ impl PrioGraphScheduler { // Check transactions against filter, remove from container if it fails. chunked_pops(container, &mut self.prio_graph, &mut window_budget); - let mut unblock_this_batch = - Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); - const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000; + let mut unblock_this_batch = Vec::with_capacity( + self.consume_work_senders.len() * self.config.target_transactions_per_batch, + ); let mut num_scheduled: usize = 0; let mut num_sent: usize = 0; let mut num_unschedulable: usize = 0; - while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + while num_scheduled < self.config.max_transactions_per_scheduling_pass { // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. if self.prio_graph.is_empty() { break; @@ -229,7 +248,8 @@ impl PrioGraphScheduler { saturating_add_assign!(batches.total_cus[thread_id], cost); // If target batch size is reached, send only this batch. - if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH { + if batches.ids[thread_id].len() >= self.config.target_transactions_per_batch + { saturating_add_assign!( num_sent, self.send_batch(&mut batches, thread_id)? @@ -248,7 +268,7 @@ impl PrioGraphScheduler { } } - if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + if num_scheduled >= self.config.max_transactions_per_scheduling_pass { break; } } @@ -408,7 +428,8 @@ impl PrioGraphScheduler { return Ok(0); } - let (ids, transactions, max_ages, total_cus) = batches.take_batch(thread_index); + let (ids, transactions, max_ages, total_cus) = + batches.take_batch(thread_index, self.config.target_transactions_per_batch); let batch_id = self .in_flight_tracker @@ -498,14 +519,14 @@ struct Batches { } impl Batches { - fn new(num_threads: usize) -> Self { + fn new(num_threads: usize, target_num_transactions_per_batch: usize) -> Self { Self { - ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + ids: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads], transactions: (0..num_threads) - .map(|_| Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH)) + .map(|_| Vec::with_capacity(target_num_transactions_per_batch)) .collect(), - max_ages: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + max_ages: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads], total_cus: vec![0; num_threads], } } @@ -513,19 +534,20 @@ impl Batches { fn take_batch( &mut self, thread_id: ThreadId, + target_num_transactions_per_batch: usize, ) -> (Vec, Vec, Vec, u64) { ( core::mem::replace( &mut self.ids[thread_id], - Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + Vec::with_capacity(target_num_transactions_per_batch), ), core::mem::replace( &mut self.transactions[thread_id], - Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + Vec::with_capacity(target_num_transactions_per_batch), ), core::mem::replace( &mut self.max_ages[thread_id], - Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + Vec::with_capacity(target_num_transactions_per_batch), ), core::mem::replace(&mut self.total_cus[thread_id], 0), ) @@ -605,7 +627,6 @@ mod tests { use { super::*, crate::banking_stage::{ - consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, immutable_deserialized_packet::ImmutableDeserializedPacket, transaction_scheduler::transaction_state_container::TransactionStateContainer, }, @@ -637,8 +658,11 @@ mod tests { let (consume_work_senders, consume_work_receivers) = (0..num_threads).map(|_| unbounded()).unzip(); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); - let scheduler = - PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver); + let scheduler = PrioGraphScheduler::new( + consume_work_senders, + finished_consume_work_receiver, + PrioGraphSchedulerConfig::default(), + ); ( scheduler, consume_work_receivers, @@ -821,7 +845,7 @@ mod tests { fn test_schedule_priority_guard() { let (mut scheduler, work_receivers, finished_work_sender) = create_test_frame(2); // intentionally shorten the look-ahead window to cause unschedulable conflicts - scheduler.look_ahead_window_size = 2; + scheduler.config.look_ahead_window_size = 2; let accounts = (0..8).map(|_| Keypair::new()).collect_vec(); let mut container = create_container([ diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 17012db79257a2..0a7bcf34fc0a01 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -441,7 +441,10 @@ mod tests { packet_deserializer::PacketDeserializer, scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, tests::create_slow_genesis_config, - transaction_scheduler::receive_and_buffer::SanitizedTransactionReceiveAndBuffer, + transaction_scheduler::{ + prio_graph_scheduler::PrioGraphSchedulerConfig, + receive_and_buffer::SanitizedTransactionReceiveAndBuffer, + }, }, banking_trace::BankingPacketBatch, }, @@ -549,11 +552,16 @@ mod tests { false, ); + let scheduler = PrioGraphScheduler::new( + consume_work_senders, + finished_consume_work_receiver, + PrioGraphSchedulerConfig::default(), + ); let scheduler_controller = SchedulerController::new( decision_maker, receive_and_buffer, bank_forks, - PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver), + scheduler, vec![], // no actual workers with metrics to report, this can be empty None, );