retryable_indexes allocated and dropped by scheduler
apfitzge committed Dec 17, 2024
1 parent 54a73ce commit 0899b46
Showing 4 changed files with 68 additions and 75 deletions.
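In outline: the FinishedConsumeWork wrapper message is removed, and its retryable_indexes vector becomes a field of ConsumeWork itself, so a single message makes the whole scheduler -> worker -> scheduler round trip. The scheduler allocates the vector (empty, with capacity for the batch) before dispatch; the worker only fills it; the scheduler drains and drops it on completion. A minimal sketch of the resulting message shape, with simplified field types (usize standing in for TransactionId; MaxAge and batch id elided):

```rust
// Sketch only; the real definition is in scheduler_messages.rs below.
pub struct ConsumeWork<Tx> {
    pub ids: Vec<usize>, // simplified; the real field is Vec<TransactionId>
    pub transactions: Vec<Tx>,
    // Pre-allocated (empty, with capacity) by the scheduler, filled by the
    // consume worker, and dropped by the scheduler when the batch completes.
    pub retryable_indexes: Vec<usize>,
}
```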
78 changes: 43 additions & 35 deletions core/src/banking_stage/consume_worker.rs
@@ -2,7 +2,7 @@ use {
super::{
consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput},
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
- scheduler_messages::{ConsumeWork, FinishedConsumeWork},
+ scheduler_messages::ConsumeWork,
},
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
solana_measure::measure_us,
@@ -26,13 +26,13 @@ pub enum ConsumeWorkerError<Tx> {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] RecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
- Send(#[from] SendError<FinishedConsumeWork<Tx>>),
+ Send(#[from] SendError<ConsumeWork<Tx>>),
}

pub(crate) struct ConsumeWorker<Tx> {
consume_receiver: Receiver<ConsumeWork<Tx>>,
consumer: Consumer,
- consumed_sender: Sender<FinishedConsumeWork<Tx>>,
+ consumed_sender: Sender<ConsumeWork<Tx>>,

leader_bank_notifier: Arc<LeaderBankNotifier>,
metrics: Arc<ConsumeWorkerMetrics>,
@@ -43,7 +43,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
id: u32,
consume_receiver: Receiver<ConsumeWork<Tx>>,
consumer: Consumer,
- consumed_sender: Sender<FinishedConsumeWork<Tx>>,
+ consumed_sender: Sender<ConsumeWork<Tx>>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
) -> Self {
Self {
@@ -107,23 +107,26 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
fn consume(
&self,
bank: &Arc<Bank>,
- work: ConsumeWork<Tx>,
+ mut work: ConsumeWork<Tx>,
) -> Result<(), ConsumeWorkerError<Tx>> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
&work.max_ages,
);

+ // Copy retryable indexes back to return message
+ work.retryable_indexes.extend(
+ output
+ .execute_and_commit_transactions_output
+ .retryable_transaction_indexes
+ .iter()
+ .cloned(),
+ );

self.metrics.update_for_consume(&output);
self.metrics.has_data.store(true, Ordering::Relaxed);

- self.consumed_sender.send(FinishedConsumeWork {
- work,
- retryable_indexes: output
- .execute_and_commit_transactions_output
- .retryable_transaction_indexes,
- })?;
+ self.consumed_sender.send(work)?;
Ok(())
}

@@ -143,9 +146,11 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
}

/// Send transactions back to scheduler as retryable.
- fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
- let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
- let num_retryable = retryable_indexes.len();
+ fn retry(&self, mut work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
+ // Set all as retryable
+ work.retryable_indexes.extend(0..work.transactions.len());
+
+ let num_retryable = work.retryable_indexes.len();
self.metrics
.count_metrics
.retryable_transaction_count
@@ -155,10 +160,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.retryable_expired_bank_count
.fetch_add(num_retryable, Ordering::Relaxed);
self.metrics.has_data.store(true, Ordering::Relaxed);
- self.consumed_sender.send(FinishedConsumeWork {
- work,
- retryable_indexes,
- })?;
+ self.consumed_sender.send(work)?;
Ok(())
}
}
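Taken together, both worker paths now have the same shape: fill the scheduler-provided retryable_indexes in place and send the ConsumeWork back, never allocating on the hot path. A standalone sketch of that pattern under simplified types (u64 standing in for the transaction type; metrics, bank lookup, and the success path's commit results elided):

```rust
use crossbeam_channel::{Receiver, Sender};

struct Work {
    transactions: Vec<u64>,        // stands in for Vec<Tx>
    retryable_indexes: Vec<usize>, // arrives empty, with capacity
}

// Expired-bank path: mark the whole batch retryable, in place.
fn retry(work: &mut Work) {
    work.retryable_indexes.extend(0..work.transactions.len());
}

// The worker never allocates the Vec; it fills the one it received and
// returns the same message to the scheduler.
fn worker_loop(work_rx: Receiver<Work>, done_tx: Sender<Work>) {
    while let Ok(mut work) = work_rx.recv() {
        retry(&mut work); // or extend with just the indexes that failed
        if done_tx.send(work).is_err() {
            break; // scheduler side hung up
        }
    }
}
```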
@@ -809,7 +811,7 @@ mod tests {
_replay_vote_receiver: ReplayVoteReceiver,

consume_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
- consumed_receiver: Receiver<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
+ consumed_receiver: Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
}

fn setup_test_frame() -> (
@@ -915,12 +917,13 @@ mod tests {
ids: vec![id],
transactions,
max_ages: vec![max_age],
+ retryable_indexes: Vec::with_capacity(1),
};
consume_sender.send(work).unwrap();
let consumed = consumed_receiver.recv().unwrap();
- assert_eq!(consumed.work.batch_id, bid);
- assert_eq!(consumed.work.ids, vec![id]);
- assert_eq!(consumed.work.max_ages, vec![max_age]);
+ assert_eq!(consumed.batch_id, bid);
+ assert_eq!(consumed.ids, vec![id]);
+ assert_eq!(consumed.max_ages, vec![max_age]);
+ assert_eq!(consumed.retryable_indexes, vec![0]);

drop(test_frame);
@@ -964,12 +967,13 @@ mod tests {
ids: vec![id],
transactions,
max_ages: vec![max_age],
+ retryable_indexes: Vec::with_capacity(1),
};
consume_sender.send(work).unwrap();
let consumed = consumed_receiver.recv().unwrap();
- assert_eq!(consumed.work.batch_id, bid);
- assert_eq!(consumed.work.ids, vec![id]);
- assert_eq!(consumed.work.max_ages, vec![max_age]);
+ assert_eq!(consumed.batch_id, bid);
+ assert_eq!(consumed.ids, vec![id]);
+ assert_eq!(consumed.max_ages, vec![max_age]);
+ assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());

drop(test_frame);
@@ -1015,13 +1019,14 @@ mod tests {
ids: vec![id1, id2],
transactions: txs,
max_ages: vec![max_age, max_age],
+ retryable_indexes: Vec::with_capacity(2),
})
.unwrap();

let consumed = consumed_receiver.recv().unwrap();
- assert_eq!(consumed.work.batch_id, bid);
- assert_eq!(consumed.work.ids, vec![id1, id2]);
- assert_eq!(consumed.work.max_ages, vec![max_age, max_age]);
+ assert_eq!(consumed.batch_id, bid);
+ assert_eq!(consumed.ids, vec![id1, id2]);
+ assert_eq!(consumed.max_ages, vec![max_age, max_age]);
+ assert_eq!(consumed.retryable_indexes, vec![1]); // id2 is retryable since lock conflict

drop(test_frame);
@@ -1076,6 +1081,7 @@ mod tests {
ids: vec![id1],
transactions: txs1,
max_ages: vec![max_age],
+ retryable_indexes: Vec::with_capacity(1),
})
.unwrap();

@@ -1085,18 +1091,19 @@ mod tests {
ids: vec![id2],
transactions: txs2,
max_ages: vec![max_age],
+ retryable_indexes: Vec::with_capacity(1),
})
.unwrap();
let consumed = consumed_receiver.recv().unwrap();
- assert_eq!(consumed.work.batch_id, bid1);
- assert_eq!(consumed.work.ids, vec![id1]);
- assert_eq!(consumed.work.max_ages, vec![max_age]);
+ assert_eq!(consumed.batch_id, bid1);
+ assert_eq!(consumed.ids, vec![id1]);
+ assert_eq!(consumed.max_ages, vec![max_age]);
+ assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());

let consumed = consumed_receiver.recv().unwrap();
- assert_eq!(consumed.work.batch_id, bid2);
- assert_eq!(consumed.work.ids, vec![id2]);
- assert_eq!(consumed.work.max_ages, vec![max_age]);
+ assert_eq!(consumed.batch_id, bid2);
+ assert_eq!(consumed.ids, vec![id2]);
+ assert_eq!(consumed.max_ages, vec![max_age]);
+ assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());

drop(test_frame);
@@ -1223,6 +1230,7 @@ mod tests {
alt_invalidation_slot: bank.slot() + 1,
},
],
+ retryable_indexes: Vec::with_capacity(6),
})
.unwrap();

8 changes: 3 additions & 5 deletions core/src/banking_stage/scheduler_messages.rs
@@ -41,11 +41,9 @@ pub struct ConsumeWork<Tx> {
pub ids: Vec<TransactionId>,
pub transactions: Vec<Tx>,
pub max_ages: Vec<MaxAge>,
- }
-
- /// Message: [Worker -> Scheduler]
- /// Processed transactions.
- pub struct FinishedConsumeWork<Tx> {
- pub work: ConsumeWork<Tx>,
+ // Only set by consume worker after processing.
+ // This is pre-allocated by the scheduler, and is dropped
+ // by the scheduler upon completion.
pub retryable_indexes: Vec<usize>,
}
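The new doc comment pins down ownership: the vector is born and dies on the scheduler thread, and the worker only ever appends to it. A hypothetical, self-contained round trip illustrating that lifecycle (type and channel names are illustrative, not the real API):

```rust
use crossbeam_channel::unbounded;
use std::thread;

struct WorkMsg {
    ids: Vec<usize>,
    retryable_indexes: Vec<usize>,
}

fn main() {
    let (to_worker, worker_rx) = unbounded::<WorkMsg>();
    let (to_scheduler, scheduler_rx) = unbounded::<WorkMsg>();

    let worker = thread::spawn(move || {
        let mut work = worker_rx.recv().unwrap();
        work.retryable_indexes.push(0); // worker only appends failures
        to_scheduler.send(work).unwrap();
    });

    // Scheduler: allocate once, sized to the batch, before dispatch.
    let ids = vec![7, 8];
    let capacity = ids.len();
    to_worker
        .send(WorkMsg { ids, retryable_indexes: Vec::with_capacity(capacity) })
        .unwrap();

    let done = scheduler_rx.recv().unwrap();
    assert_eq!(done.retryable_indexes, vec![0]);
    worker.join().unwrap();
    // `done` falls out of scope here: the Vec is dropped on the scheduler side.
}
```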
core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -8,9 +8,7 @@ use {
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
read_write_account_set::ReadWriteAccountSet,
- scheduler_messages::{
- ConsumeWork, FinishedConsumeWork, MaxAge, TransactionBatchId, TransactionId,
- },
+ scheduler_messages::{ConsumeWork, MaxAge, TransactionBatchId, TransactionId},
transaction_scheduler::{
transaction_priority_id::TransactionPriorityId, transaction_state::TransactionState,
transaction_state_container::StateContainer,
@@ -63,15 +61,15 @@ pub(crate) struct PrioGraphScheduler<Tx> {
in_flight_tracker: InFlightTracker,
account_locks: ThreadAwareAccountLocks,
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
- finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
+ finished_consume_work_receiver: Receiver<ConsumeWork<Tx>>,
prio_graph: SchedulerPrioGraph,
config: PrioGraphSchedulerConfig,
}

impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
pub(crate) fn new(
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
- finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
+ finished_consume_work_receiver: Receiver<ConsumeWork<Tx>>,
config: PrioGraphSchedulerConfig,
) -> Self {
let num_threads = consume_work_senders.len();
@@ -344,14 +342,11 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
container: &mut impl StateContainer<Tx>,
) -> Result<(usize, usize), SchedulerError> {
match self.finished_consume_work_receiver.try_recv() {
- Ok(FinishedConsumeWork {
- work:
- ConsumeWork {
- batch_id,
- ids,
- transactions,
- max_ages,
- },
+ Ok(ConsumeWork {
+ batch_id,
+ ids,
+ transactions,
+ max_ages,
retryable_indexes,
}) => {
let num_transactions = ids.len();
@@ -436,11 +431,13 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
.track_batch(ids.len(), total_cus, thread_index);

let num_scheduled = ids.len();
+ let retryable_indexes = Vec::with_capacity(transactions.len());
let work = ConsumeWork {
batch_id,
ids,
transactions,
max_ages,
+ retryable_indexes,
};
self.consume_work_senders[thread_index]
.send(work)
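Sizing the vector to the batch up front means the worker's extend cannot reallocate, even in the worst case where every transaction is retryable. A small standalone check of that property (plain Rust, independent of the scheduler types):

```rust
fn main() {
    let batch_len = 64;
    let mut retryable_indexes: Vec<usize> = Vec::with_capacity(batch_len);
    let ptr_before = retryable_indexes.as_ptr();
    retryable_indexes.extend(0..batch_len); // worst case: all retryable
    assert!(retryable_indexes.capacity() >= batch_len);
    assert_eq!(ptr_before, retryable_indexes.as_ptr()); // no reallocation
}
```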
@@ -653,7 +650,7 @@ mod tests {
) -> (
PrioGraphScheduler<RuntimeTransaction<SanitizedTransaction>>,
Vec<Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>>,
- Sender<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
+ Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
) {
let (consume_work_senders, consume_work_receivers) =
(0..num_threads).map(|_| unbounded()).unzip();
@@ -891,10 +888,7 @@ mod tests {

// Complete batch on thread 0. Remaining txs can be scheduled onto thread 1
finished_work_sender
- .send(FinishedConsumeWork {
- work: thread_0_work.into_iter().next().unwrap(),
- retryable_indexes: vec![],
- })
+ .send(thread_0_work.into_iter().next().unwrap())
.unwrap();
scheduler.receive_completed(&mut container).unwrap();
let scheduling_summary = scheduler
core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -439,7 +439,7 @@ mod tests {
banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
packet_deserializer::PacketDeserializer,
- scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
+ scheduler_messages::{ConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
@@ -492,8 +492,7 @@ mod tests {

consume_work_receivers:
Vec<Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>>,
- finished_consume_work_sender:
- Sender<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
+ finished_consume_work_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
}

#[allow(clippy::type_complexity)]
@@ -631,13 +630,11 @@ mod tests {
} = &test_frame;

finished_consume_work_sender
- .send(FinishedConsumeWork {
- work: ConsumeWork {
- batch_id: TransactionBatchId::new(0),
- ids: vec![],
- transactions: vec![],
- max_ages: vec![],
- },
+ .send(ConsumeWork {
+ batch_id: TransactionBatchId::new(0),
+ ids: vec![],
+ transactions: vec![],
+ max_ages: vec![],
+ retryable_indexes: vec![],
})
.unwrap();
@@ -937,7 +934,7 @@ mod tests {
.unwrap();

test_receive_then_schedule(&mut scheduler_controller);
- let consume_work = consume_work_receivers[0].try_recv().unwrap();
+ let mut consume_work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(consume_work.ids.len(), 2);
assert_eq!(consume_work.transactions.len(), 2);
let message_hashes = consume_work
@@ -948,12 +945,8 @@ mod tests {
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);

// Complete the batch - marking the second transaction as retryable
- finished_consume_work_sender
- .send(FinishedConsumeWork {
- work: consume_work,
- retryable_indexes: vec![1],
- })
- .unwrap();
+ consume_work.retryable_indexes.push(1);
+ finished_consume_work_sender.send(consume_work).unwrap();

// Transaction should be rescheduled
test_receive_then_schedule(&mut scheduler_controller);
