Commit

no clone
apfitzge committed Oct 2, 2024
1 parent 38ffc77 commit ff99ec8
Showing 4 changed files with 125 additions and 114 deletions.
core/tests/unified_scheduler.rs (2 changes: 1 addition & 1 deletion)
@@ -108,7 +108,7 @@ fn test_scheduler_waited_by_drop_bank_service() {
// been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank
.schedule_transaction_executions([(&tx, &0)].into_iter())
.schedule_transaction_executions([(tx, 0)].into_iter())
.unwrap();
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
ledger/src/blockstore_processor.rs (144 changes: 75 additions & 69 deletions)
@@ -82,6 +82,14 @@ pub struct TransactionBatchWithIndexes<'a, 'b, Tx: SVMMessage> {
pub transaction_indexes: Vec<usize>,
}

// `TransactionBatchWithIndexes` but without the `Drop` that prevents
// us from nicely unwinding these with manual unlocking.
pub struct LockedTransactionsWithIndexes<Tx: SVMMessage> {
lock_results: Vec<Result<()>>,
transactions: Vec<Tx>,
starting_index: usize,
}
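// Illustrative sketch, not part of this commit: because this struct has no `Drop` impl,
// the caller decides when the bank-level account locks are released, either by unlocking
// explicitly or by handing the owned transactions to the scheduler. The helper name
// `unlock_entry` is hypothetical.
fn unlock_entry(bank: &Bank, entry: LockedTransactionsWithIndexes<SanitizedTransaction>) {
    // Explicit unlock, mirroring what `schedule_batches_for_execution` does before it
    // transfers ownership of the transactions to the scheduler.
    bank.unlock_accounts(entry.transactions.iter().zip(entry.lock_results.iter()));
}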

struct ReplayEntry {
entry: EntryType,
starting_index: usize,
@@ -360,18 +368,14 @@ fn execute_batches_internal(
fn process_batches(
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
batches: &[TransactionBatchWithIndexes<SanitizedTransaction>],
locked_entries: &mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
batch_execution_timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
if bank.has_installed_scheduler() {
debug!(
"process_batches()/schedule_batches_for_execution({} batches)",
batches.len()
);
// Scheduling usually succeeds (immediately returns `Ok(())`) here without being blocked on
// the actual transaction executions.
//
@@ -394,16 +398,16 @@
// a push based one from the unified scheduler to the replay stage to eliminate the current
// overhead: 1 read lock per batch in
// `BankWithScheduler::schedule_transaction_executions()`.
schedule_batches_for_execution(bank, batches)
schedule_batches_for_execution(bank, locked_entries)
} else {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
batches.len()
locked_entries.len()
);
rebatch_and_execute_batches(
bank,
replay_tx_thread_pool,
batches,
locked_entries,
transaction_status_sender,
replay_vote_sender,
batch_execution_timing,
@@ -415,18 +419,22 @@

fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes<SanitizedTransaction>],
locked_entries: &mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
) -> Result<()> {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
} in batches
for LockedTransactionsWithIndexes {
lock_results,
transactions,
starting_index,
} in locked_entries.drain(..)
{
// unlock before sending to scheduler.
bank.unlock_accounts(transactions.iter().zip(lock_results.iter()));
// give ownership to scheduler
bank.schedule_transaction_executions(
batch
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
transactions
.into_iter()
.enumerate()
.map(|(index, tx)| (tx, index + starting_index)),
)?;
}
Ok(())
@@ -444,7 +452,7 @@ fn rebatch_transactions<'a>(
let results = &lock_results[start..=end];
let mut tx_batch =
TransactionBatch::new(results.to_vec(), bank, OwnedOrBorrowed::Borrowed(txs));
tx_batch.set_needs_unlock(false);
tx_batch.set_needs_unlock(true); // unlock on drop for easier clean up
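// Added note for illustration, not from the diff: with the incoming
// `LockedTransactionsWithIndexes` consumed during flattening, the rebatched borrowed
// batches take over the unlock responsibility, hence `set_needs_unlock(true)` so the
// account locks are released when each batch is dropped.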

let transaction_indexes = transaction_indexes[start..=end].to_vec();
TransactionBatchWithIndexes {
@@ -456,29 +464,29 @@
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
replay_tx_thread_pool: &ThreadPool,
batches: &[TransactionBatchWithIndexes<SanitizedTransaction>],
locked_entries: &mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
if batches.is_empty() {
if locked_entries.is_empty() {
return Ok(());
}

let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = batches
.iter()
.flat_map(|batch| {
batch
.batch
.lock_results()
.iter()
.cloned()
.zip(batch.batch.sanitized_transactions().to_vec())
.zip(batch.transaction_indexes.to_vec())
})
.unzip();
// Flatten the locked entries.
let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) =
locked_entries
.drain(..)
.flat_map(|locked_entry| {
locked_entry
.lock_results
.into_iter()
.zip(locked_entry.transactions)
.zip(locked_entry.starting_index..)
})
.unzip();
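// Worked example for illustration, not from the diff: a locked entry with
// starting_index = 10 and three transactions flattens to transaction_indexes
// [10, 11, 12], since zipping with the open-ended range `10..` stops after the
// three zipped pairs.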

let mut minimal_tx_cost = u64::MAX;
let mut total_cost: u64 = 0;
@@ -496,7 +504,7 @@
let target_batch_count = get_thread_count() as u64;

let mut tx_batches: Vec<TransactionBatchWithIndexes<SanitizedTransaction>> = vec![];
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let rebatched_txs = {
let target_batch_cost = total_cost / target_batch_count;
let mut batch_cost: u64 = 0;
let mut slice_start = 0;
@@ -518,8 +526,6 @@
}
});
&tx_batches[..]
} else {
batches
};
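// Added note for illustration, not from the diff: the old
// `total_cost > target_batch_count.saturating_mul(minimal_tx_cost)` guard is gone, so
// the flattened transactions are always re-split by cost. For example, with
// get_thread_count() = 8 and total_cost = 8_000 cost units, each rebatched slice
// targets roughly target_batch_cost = 1_000.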

let execute_batches_internal_metrics = execute_batches_internal(
@@ -561,7 +567,7 @@ pub fn process_entries_for_tests(

let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut batch_timing = BatchExecutionTiming::default();
let mut replay_entries: Vec<_> = entry::verify_transactions(
let replay_entries: Vec<_> = entry::verify_transactions(
entries,
&replay_tx_thread_pool,
Arc::new(verify_transaction),
@@ -583,7 +589,7 @@
let result = process_entries(
bank,
&replay_tx_thread_pool,
&mut replay_entries,
replay_entries,
transaction_status_sender,
replay_vote_sender,
&mut batch_timing,
@@ -598,7 +604,7 @@
fn process_entries(
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
entries: &mut [ReplayEntry],
entries: Vec<ReplayEntry>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
batch_timing: &mut BatchExecutionTiming,
@@ -624,7 +630,7 @@
process_batches(
bank,
replay_tx_thread_pool,
&batches,
&mut batches,
transaction_status_sender,
replay_vote_sender,
batch_timing,
@@ -639,7 +645,6 @@
}
}
EntryType::Transactions(transactions) => {
let starting_index = *starting_index;
queue_batches_with_lock_retry(
bank,
starting_index,
@@ -664,15 +669,15 @@
process_batches(
bank,
replay_tx_thread_pool,
&batches,
&mut batches,
transaction_status_sender,
replay_vote_sender,
batch_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
for hash in tick_hashes {
bank.register_tick(hash);
bank.register_tick(&hash);
}
Ok(())
}
@@ -683,30 +688,30 @@
/// The locking process is retried, and if it fails again the block is marked
/// as dead.
/// If the lock retry succeeds, then the batch is pushed into `batches`.
fn queue_batches_with_lock_retry<'a>(
bank: &'a Bank,
fn queue_batches_with_lock_retry(
bank: &Bank,
starting_index: usize,
txs: &'a [SanitizedTransaction],
batches: &mut Vec<TransactionBatchWithIndexes<'a, 'a, SanitizedTransaction>>,
transactions: Vec<SanitizedTransaction>,
batches: &mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
mut process_batches: impl FnMut(
&mut Vec<TransactionBatchWithIndexes<'a, 'a, SanitizedTransaction>>,
&mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
) -> Result<()>,
) -> Result<()> {
// try to lock the accounts
let lock_results = bank.try_lock_accounts(txs);
let lock_results = bank.try_lock_accounts(&transactions);
let first_lock_err = first_err(&lock_results);
if first_lock_err.is_ok() {
batches.push(TransactionBatchWithIndexes {
batch: TransactionBatch::new(lock_results, bank, OwnedOrBorrowed::Borrowed(txs)),
transaction_indexes: (starting_index..starting_index.saturating_add(txs.len()))
.collect(),
batches.push(LockedTransactionsWithIndexes {
lock_results,
transactions,
starting_index,
});
return Ok(());
}

// We need to unlock the transactions that succeeded to lock before the
// retry.
bank.unlock_accounts(txs.iter().zip(lock_results.iter()));
bank.unlock_accounts(transactions.iter().zip(lock_results.iter()));

// We failed to lock, there are 2 possible reasons:
// 1. A batch already in `batches` holds the lock.
@@ -715,16 +720,15 @@ fn queue_batches_with_lock_retry<'a>(
// Use the callback to process batches, and clear them.
// Clearing the batches will `Drop` the batches which will unlock the accounts.
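// For illustration, not part of this commit: if an earlier batch queued in `batches`
// already write-locks an account this entry also needs, flushing the queued batches
// through the callback releases that lock, so the retry below has a chance to succeed.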
process_batches(batches)?;
batches.clear();

// Retry the lock
let lock_results = bank.try_lock_accounts(txs);
let lock_results = bank.try_lock_accounts(&transactions);
match first_err(&lock_results) {
Ok(_) => {
batches.push(TransactionBatchWithIndexes {
batch: TransactionBatch::new(lock_results, bank, OwnedOrBorrowed::Borrowed(txs)),
transaction_indexes: (starting_index..starting_index.saturating_add(txs.len()))
.collect(),
batches.push(LockedTransactionsWithIndexes {
lock_results,
transactions,
starting_index,
});
Ok(())
}
@@ -735,7 +739,9 @@
"validator_process_entry_error",
(
"error",
format!("Lock accounts error, entry conflicts with itself, txs: {txs:?}"),
format!(
"Lock accounts error, entry conflicts with itself, txs: {transactions:?}"
),
String
)
);
@@ -1644,7 +1650,7 @@ fn confirm_slot_entries(
.expect("Transaction verification generates entries");

let mut replay_timer = Measure::start("replay_elapsed");
let mut replay_entries: Vec<_> = entries
let replay_entries: Vec<_> = entries
.into_iter()
.zip(entry_tx_starting_indexes)
.map(|(entry, tx_starting_index)| ReplayEntry {
@@ -1655,7 +1661,7 @@
let process_result = process_entries(
bank,
replay_tx_thread_pool,
&mut replay_entries,
replay_entries,
transaction_status_sender,
replay_vote_sender,
batch_execute_timing,
@@ -4915,14 +4921,14 @@ pub mod tests {
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|(_, _)| Ok(()));
.returning(|_, _| Ok(()));
} else {
// mocked_scheduler isn't async, so the short-circuiting behavior is quite visible in that
// .times(1) is called instead of .times(txs.len()), unlike the succeeding case
mocked_scheduler
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| Err(SchedulerAborted));
.returning(|_, _| Err(SchedulerAborted));
mocked_scheduler
.expect_recover_error_after_abort()
.times(1)
@@ -4947,10 +4953,10 @@
});
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));

let batch = bank.prepare_sanitized_batch(&txs);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
transaction_indexes: (0..txs.len()).collect(),
let locked_entry = LockedTransactionsWithIndexes {
lock_results: vec![Ok(()); txs.len()],
transactions: txs,
starting_index: 0,
};

let replay_tx_thread_pool = create_thread_pool(1);
@@ -4959,7 +4965,7 @@
let result = process_batches(
&bank,
&replay_tx_thread_pool,
&[batch_with_indexes],
&mut vec![locked_entry],
None,
None,
&mut batch_execution_timing,