diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index cdb2ad2ea2ceed..aea50618a3f50f 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -2,7 +2,7 @@ use { super::{ consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput}, leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, - scheduler_messages::{ConsumeWork, FinishedConsumeWork}, + scheduler_messages::ConsumeWork, }, crossbeam_channel::{Receiver, RecvError, SendError, Sender}, solana_measure::measure_us, @@ -26,13 +26,13 @@ pub enum ConsumeWorkerError<Tx> { #[error("Failed to receive work from scheduler: {0}")] Recv(#[from] RecvError), #[error("Failed to send finalized consume work to scheduler: {0}")] - Send(#[from] SendError<FinishedConsumeWork<Tx>>), + Send(#[from] SendError<ConsumeWork<Tx>>), } pub(crate) struct ConsumeWorker<Tx> { consume_receiver: Receiver<ConsumeWork<Tx>>, consumer: Consumer, - consumed_sender: Sender<FinishedConsumeWork<Tx>>, + consumed_sender: Sender<ConsumeWork<Tx>>, leader_bank_notifier: Arc<LeaderBankNotifier>, metrics: Arc<ConsumeWorkerMetrics>, @@ -43,7 +43,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> { id: u32, consume_receiver: Receiver<ConsumeWork<Tx>>, consumer: Consumer, - consumed_sender: Sender<FinishedConsumeWork<Tx>>, + consumed_sender: Sender<ConsumeWork<Tx>>, leader_bank_notifier: Arc<LeaderBankNotifier>, ) -> Self { Self { @@ -107,7 +107,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> { fn consume( &self, bank: &Arc<Bank>, - work: ConsumeWork<Tx>, + mut work: ConsumeWork<Tx>, ) -> Result<(), ConsumeWorkerError<Tx>> { let output = self.consumer.process_and_record_aged_transactions( bank, @@ -115,15 +115,18 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> { &work.max_ages, ); + // Copy retryable indexes back to return message + work.retryable_indexes.extend( + output + .execute_and_commit_transactions_output + .retryable_transaction_indexes + .iter() + .cloned(), + ); + self.metrics.update_for_consume(&output); self.metrics.has_data.store(true, Ordering::Relaxed); - - self.consumed_sender.send(FinishedConsumeWork { - work, - retryable_indexes: output - .execute_and_commit_transactions_output - .retryable_transaction_indexes, - })?; + self.consumed_sender.send(work)?; Ok(()) } @@ -143,9 +146,11 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> { } /// Send transactions back to scheduler as retryable. - fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> { - let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect(); - let num_retryable = retryable_indexes.len(); + fn retry(&self, mut work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> { + // Set all as retryable + work.retryable_indexes.extend(0..work.transactions.len()); + + let num_retryable = work.retryable_indexes.len(); self.metrics .count_metrics .retryable_transaction_count @@ -155,10 +160,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> { .retryable_expired_bank_count .fetch_add(num_retryable, Ordering::Relaxed); self.metrics.has_data.store(true, Ordering::Relaxed); - self.consumed_sender.send(FinishedConsumeWork { - work, - retryable_indexes, - })?; + self.consumed_sender.send(work)?; Ok(()) } } @@ -809,7 +811,7 @@ mod tests { _replay_vote_receiver: ReplayVoteReceiver, consume_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, - consumed_receiver: Receiver<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, + consumed_receiver: Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, } fn setup_test_frame() -> ( @@ -915,12 +917,13 @@ mod tests { ids: vec![id], transactions, max_ages: vec![max_age], + retryable_indexes: Vec::with_capacity(1), }; consume_sender.send(work).unwrap(); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.work.batch_id, bid); - assert_eq!(consumed.work.ids, vec![id]); - assert_eq!(consumed.work.max_ages, vec![max_age]); + assert_eq!(consumed.batch_id, bid); + assert_eq!(consumed.ids, vec![id]); + assert_eq!(consumed.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, vec![0]); drop(test_frame); @@ -964,12 +967,13 @@ mod tests { ids: vec![id], transactions, max_ages: vec![max_age], + retryable_indexes: Vec::with_capacity(1), }; consume_sender.send(work).unwrap(); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.work.batch_id, bid); - assert_eq!(consumed.work.ids, vec![id]); - assert_eq!(consumed.work.max_ages, vec![max_age]); + assert_eq!(consumed.batch_id, bid); + assert_eq!(consumed.ids, vec![id]); + assert_eq!(consumed.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::<usize>::new()); drop(test_frame); @@ -1015,13 +1019,14 @@ mod tests { ids: vec![id1, id2], transactions: txs, max_ages: vec![max_age, max_age], + retryable_indexes: Vec::with_capacity(2), }) .unwrap(); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.work.batch_id, bid); - assert_eq!(consumed.work.ids, vec![id1, id2]); - assert_eq!(consumed.work.max_ages, vec![max_age, max_age]); + assert_eq!(consumed.batch_id, bid); + assert_eq!(consumed.ids, vec![id1, id2]); + assert_eq!(consumed.max_ages, vec![max_age, max_age]); assert_eq!(consumed.retryable_indexes, vec![1]); // id2 is retryable since lock conflict drop(test_frame); @@ -1076,6 +1081,7 @@ mod tests { ids: vec![id1], transactions: txs1, max_ages: vec![max_age], + retryable_indexes: Vec::with_capacity(1), }) .unwrap(); @@ -1085,18 +1091,19 @@ mod tests { ids: vec![id2], transactions: txs2, max_ages: vec![max_age], + retryable_indexes: Vec::with_capacity(1), }) .unwrap(); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.work.batch_id, bid1); - assert_eq!(consumed.work.ids, vec![id1]); - assert_eq!(consumed.work.max_ages, vec![max_age]); + assert_eq!(consumed.batch_id, bid1); + assert_eq!(consumed.ids, vec![id1]); + assert_eq!(consumed.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::<usize>::new()); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.work.batch_id, bid2); - assert_eq!(consumed.work.ids, vec![id2]); - assert_eq!(consumed.work.max_ages, vec![max_age]); + assert_eq!(consumed.batch_id, bid2); + assert_eq!(consumed.ids, vec![id2]); + assert_eq!(consumed.max_ages, vec![max_age]); assert_eq!(consumed.retryable_indexes, Vec::<usize>::new()); drop(test_frame); @@ -1223,6 +1230,7 @@ mod tests { alt_invalidation_slot: bank.slot() + 1, }, ], + retryable_indexes: Vec::with_capacity(6), }) .unwrap(); diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index 1c7cf31592b791..53b84641f5e9aa 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -41,11 +41,9 @@ pub struct ConsumeWork<Tx> { pub ids: Vec<TransactionId>, pub transactions: Vec<Tx>, pub max_ages: Vec<MaxAge>, -} -/// Message: [Worker -> Scheduler] -/// Processed transactions. -pub struct FinishedConsumeWork<Tx> { - pub work: ConsumeWork<Tx>, + // Only set by consume worker after processing. + // This is pre-allocated by the scheduler, and is dropped + // by the scheduler upon completion. pub retryable_indexes: Vec<usize>, } diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 8edebc1f80c200..e4d4aa4d6aa972 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -8,9 +8,7 @@ use { crate::banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, read_write_account_set::ReadWriteAccountSet, - scheduler_messages::{ - ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId, - }, + scheduler_messages::{ConsumeWork, MaxAge, TransactionBatchId, TransactionId}, transaction_scheduler::{ transaction_priority_id::TransactionPriorityId, transaction_state::TransactionState, transaction_state_container::StateContainer, @@ -63,7 +61,7 @@ pub(crate) struct PrioGraphScheduler<Tx> { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>, - finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>, + finished_consume_work_receiver: Receiver<ConsumeWork<Tx>>, prio_graph: SchedulerPrioGraph, config: PrioGraphSchedulerConfig, } @@ -71,7 +69,7 @@ pub(crate) struct PrioGraphScheduler<Tx> { impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> { pub(crate) fn new( consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>, - finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>, + finished_consume_work_receiver: Receiver<ConsumeWork<Tx>>, config: PrioGraphSchedulerConfig, ) -> Self { let num_threads = consume_work_senders.len(); @@ -344,14 +342,11 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> { container: &mut impl StateContainer<Tx>, ) -> Result<(usize, usize), SchedulerError> { match self.finished_consume_work_receiver.try_recv() { - Ok(FinishedConsumeWork { - work: - ConsumeWork { - batch_id, - ids, - transactions, - max_ages, - }, + Ok(ConsumeWork { + batch_id, + ids, + transactions, + max_ages, retryable_indexes, }) => { let num_transactions = ids.len(); @@ -436,11 +431,13 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> { .track_batch(ids.len(), total_cus, thread_index); let num_scheduled = ids.len(); + let retryable_indexes = Vec::with_capacity(transactions.len()); let work = ConsumeWork { batch_id, ids, transactions, max_ages, + retryable_indexes, }; self.consume_work_senders[thread_index] .send(work) @@ -653,7 +650,7 @@ mod tests { ) -> ( PrioGraphScheduler<RuntimeTransaction<SanitizedTransaction>>, Vec<Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>>, - Sender<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, + Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, ) { let (consume_work_senders, consume_work_receivers) = (0..num_threads).map(|_| unbounded()).unzip(); @@ -891,10 +888,7 @@ mod tests { // Complete batch on thread 0. Remaining txs can be scheduled onto thread 1 finished_work_sender - .send(FinishedConsumeWork { - work: thread_0_work.into_iter().next().unwrap(), - retryable_indexes: vec![], - }) + .send(thread_0_work.into_iter().next().unwrap()) .unwrap(); scheduler.receive_completed(&mut container).unwrap(); let scheduling_summary = scheduler diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 0a7bcf34fc0a01..5550b06e9c2687 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -439,7 +439,7 @@ mod tests { banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, packet_deserializer::PacketDeserializer, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, + scheduler_messages::{ConsumeWork, TransactionBatchId}, tests::create_slow_genesis_config, transaction_scheduler::{ prio_graph_scheduler::PrioGraphSchedulerConfig, @@ -492,8 +492,7 @@ mod tests { consume_work_receivers: Vec<Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>>, - finished_consume_work_sender: - Sender<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, + finished_consume_work_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>, } #[allow(clippy::type_complexity)] @@ -631,13 +630,11 @@ mod tests { } = &test_frame; finished_consume_work_sender - .send(FinishedConsumeWork { - work: ConsumeWork { - batch_id: TransactionBatchId::new(0), - ids: vec![], - transactions: vec![], - max_ages: vec![], - }, + .send(ConsumeWork { + batch_id: TransactionBatchId::new(0), + ids: vec![], + transactions: vec![], + max_ages: vec![], retryable_indexes: vec![], }) .unwrap(); @@ -937,7 +934,7 @@ mod tests { .unwrap(); test_receive_then_schedule(&mut scheduler_controller); - let consume_work = consume_work_receivers[0].try_recv().unwrap(); + let mut consume_work = consume_work_receivers[0].try_recv().unwrap(); assert_eq!(consume_work.ids.len(), 2); assert_eq!(consume_work.transactions.len(), 2); let message_hashes = consume_work @@ -948,12 +945,8 @@ mod tests { assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); // Complete the batch - marking the second transaction as retryable - finished_consume_work_sender - .send(FinishedConsumeWork { - work: consume_work, - retryable_indexes: vec![1], - }) - .unwrap(); + consume_work.retryable_indexes.push(1); + finished_consume_work_sender.send(consume_work).unwrap(); // Transaction should be rescheduled test_receive_then_schedule(&mut scheduler_controller);