Support tx poh recording in unified scheduler #4150

Open · wants to merge 9 commits into base: master
Changes from 1 commit
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions ledger/benches/blockstore_processor.rs
@@ -162,6 +162,7 @@ fn bench_execute_batch(
&mut timing,
None,
&prioritization_fee_cache,
None::<fn() -> Option<Option<usize>>>,
);
}
});
155 changes: 132 additions & 23 deletions ledger/src/blockstore_processor.rs
@@ -110,25 +110,30 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
fn get_first_error(
batch: &TransactionBatch<impl SVMTransaction>,
commit_results: &[TransactionCommitResult],
is_block_producing_unified_scheduler: bool,
) -> Option<(Result<()>, Signature)> {
let mut first_err = None;
for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) {
if let Err(err) = commit_result {
if first_err.is_none() {
first_err = Some((Err(err.clone()), *transaction.signature()));
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
// Skip logging with the block-producing unified scheduler, because it's quite
// common to observe transaction errors there...
if !is_block_producing_unified_scheduler {
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
}
}
}
first_err
@@ -150,12 +155,14 @@ pub fn execute_batch(
timings: &mut ExecuteTimings,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
pre_commit_callback: Option<impl FnOnce() -> Option<Option<usize>>>,
) -> Result<()> {
let TransactionBatchWithIndexes {
batch,
transaction_indexes,
} = batch;
let record_token_balances = transaction_status_sender.is_some();
let mut transaction_indexes = transaction_indexes.to_vec();

Reviewer comment:

nit: adding a clone here.

Wondering if we can remove this clone and simplify the callback interface. What if the callback itself did an allocation (only if necessary), returning the transaction index in that case.

I also really don't like the Option<Option<usize>> that is there now, because it hides the meaning: it's not clear from this code alone that the outer option means that recording/pre-commit failed. If I were to do this, I'd take one of two approaches:

  1. Make the outer option of the return value a Result<..., ()>
  2. Add a simple enum type to represent the meanings more clearly

I lean towards 1 since it's simpler, and I'm not sure an additional enum would benefit too much here.

Author (Member) reply:

Nice catch, done: 08536f0. I think Cow is enough; I'd like to keep the closure agnostic of allocation entirely, for separation of concerns.

As for the Option<Option<usize>> return value, this is done: 3b852f6.
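The return-type refactor discussed in the thread above can be sketched in isolation. The function names and bodies below are simplified stand-ins for illustration, not the actual agave signatures:

```rust
// Before: Option<Option<usize>> hides what the outer layer means.
fn pre_commit_before() -> Option<Option<usize>> {
    Some(Some(4)) // succeeded; index tracking reports starting index 4
}

// After (reviewer's option 1): failure becomes an explicit Err, and the
// inner Option keeps its single meaning of "index tracking on or off".
fn pre_commit_after() -> Result<Option<usize>, ()> {
    Ok(Some(4))
}

fn main() {
    assert_eq!(pre_commit_before(), Some(Some(4)));
    // The caller's three cases now read directly off the types.
    match pre_commit_after() {
        Ok(Some(index)) => println!("recorded at starting index {index}"),
        Ok(None) => println!("recorded; index tracking disabled"),
        Err(()) => println!("PoH recording failed; skip the commit"),
    }
}
```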

let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();

@@ -165,14 +172,34 @@ pub fn execute_batch(
vec![]
};

let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
);
let is_block_producing_unified_scheduler = pre_commit_callback.is_some();
let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| {
|| {
wrapped_callback()
.inspect(|&maybe_index| {
if let Some(index) = maybe_index {
assert!(transaction_indexes.is_empty());
transaction_indexes.push(index);
}
})
.is_some()
}
});

let Some((commit_results, balances)) = batch
.bank()
.load_execute_and_commit_transactions_with_pre_commit_callback(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
pre_commit_callback,
)
else {
return Err(TransactionError::CommitFailed);
};

bank_utils::find_and_send_votes(
batch.sanitized_transactions(),
@@ -201,7 +228,7 @@ pub fn execute_batch(
.filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx))
.collect_vec();

let first_err = get_first_error(batch, &commit_results);
let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler);

if let Some(transaction_status_sender) = transaction_status_sender {
let transactions: Vec<SanitizedTransaction> = batch
@@ -224,7 +251,7 @@ pub fn execute_batch(
commit_results,
balances,
token_balances,
transaction_indexes.to_vec(),
transaction_indexes,
);
}
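One subtlety visible at the replay call sites in this diff is the `None::<fn() -> Option<Option<usize>>>` turbofish: with an `Option<impl FnOnce() -> Option<Option<usize>>>` parameter, a bare `None` gives the compiler no concrete closure type to infer, so the type is pinned with a function-pointer type. A self-contained reproduction (the function body here is a hypothetical simplification, not the real `execute_batch`):

```rust
// Minimal stand-in for the new execute_batch parameter shape.
fn execute_batch_like(
    pre_commit_callback: Option<impl FnOnce() -> Option<Option<usize>>>,
) -> bool {
    // Treat "no callback" (the replay path) as an unconditional success.
    pre_commit_callback.map_or(true, |cb| cb().is_some())
}

fn main() {
    // Replay path: `execute_batch_like(None)` alone would not compile
    // ("type annotations needed"), hence the fn-pointer turbofish.
    assert!(execute_batch_like(None::<fn() -> Option<Option<usize>>>));

    // Block-producing path: a closure reporting success with starting index 4.
    assert!(execute_batch_like(Some(|| Some(Some(4)))));

    // Failure path: the callback returns None, which the real code maps
    // to TransactionError::CommitFailed.
    assert!(!execute_batch_like(Some(|| None)));
}
```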

@@ -322,6 +349,7 @@ fn execute_batches_internal(
&mut timings,
log_messages_bytes_limit,
prioritization_fee_cache,
None::<fn() -> Option<Option<usize>>>,
));

let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
@@ -2210,11 +2238,13 @@ pub fn process_single_slot(
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TransactionStatusMessage {
Batch(TransactionStatusBatch),
Freeze(Slot),
}

#[derive(Debug)]
pub struct TransactionStatusBatch {
pub slot: Slot,
pub transactions: Vec<SanitizedTransaction>,
@@ -4433,7 +4463,7 @@ pub mod tests {
&mut ExecuteTimings::default(),
None,
);
let (err, signature) = get_first_error(&batch, &commit_results).unwrap();
let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap();
assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
assert_eq!(signature, account_not_found_sig);
}
@@ -5082,6 +5112,85 @@ pub mod tests {
do_test_schedule_batches_for_execution(false);
}

fn do_test_execute_batch_pre_commit_callback(poh_result: Option<Option<usize>>) {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Bank::new_for_tests(&genesis_config);
let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests();
let bank = Arc::new(bank);
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut batch = TransactionBatch::new(
vec![Ok(()); 1],
&bank,
OwnedOrBorrowed::Borrowed(&txs[0..1]),
);
batch.set_needs_unlock(false);
let poh_with_index = matches!(poh_result, Some(Some(_)));
let transaction_indexes = if poh_with_index { vec![] } else { vec![3] };
let batch = TransactionBatchWithIndexes {
batch,
transaction_indexes,
};
let prioritization_fee_cache = PrioritizationFeeCache::default();
let mut timing = ExecuteTimings::default();
let (sender, receiver) = crossbeam_channel::unbounded();

assert_eq!(bank.transaction_count(), 0);
let result = execute_batch(
&batch,
&bank,
Some(&TransactionStatusSender { sender }),
None,
&mut timing,
None,
&prioritization_fee_cache,
Some(|| poh_result),
);
let should_succeed = poh_result.is_some();
if should_succeed {
assert_matches!(result, Ok(()));
assert_eq!(bank.transaction_count(), 1);
} else {
assert_matches!(result, Err(TransactionError::CommitFailed));
assert_eq!(bank.transaction_count(), 0);
}
if poh_with_index {
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes == vec![4_usize]
);
} else if should_succeed {
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes == vec![3_usize]
);
} else {
assert_matches!(receiver.try_recv(), Err(_));
}
}

#[test]
fn test_execute_batch_pre_commit_callback_success() {
do_test_execute_batch_pre_commit_callback(Some(None));
}

#[test]
fn test_execute_batch_pre_commit_callback_success_with_index() {
do_test_execute_batch_pre_commit_callback(Some(Some(4)));
}

#[test]
fn test_execute_batch_pre_commit_callback_failure() {
do_test_execute_batch_pre_commit_callback(None);
}

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;
38 changes: 35 additions & 3 deletions poh/src/poh_recorder.rs
@@ -140,7 +140,7 @@ pub struct RecordTransactionsSummary {
pub starting_transaction_index: Option<usize>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]

Reviewer comment:

Do we really need Debug on all of this? I can't imagine anyone has ever used it for the massive scheduling and bank structures. I'd sooner remove it from all of that than let it spread further, wdyt?

Author (Member) reply:

Hmm, I think Debug is handy for quick debugging, and it's actually required by assert_matches!()...
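The author's point can be seen in miniature: `assert_matches!`-style macros format the value with `{:?}` when the pattern fails to match, which is what forces the `Debug` bound. A self-contained sketch with a hand-rolled stand-in macro (the real one lives in the `assert_matches` crate) and a hypothetical simplification of the message type:

```rust
// Minimal stand-in for assert_matches!; the {:?} in the failure arm
// is exactly where the Debug requirement comes from.
macro_rules! assert_matches_lite {
    ($val:expr, $pat:pat) => {
        match &$val {
            $pat => (),
            v => panic!("assertion failed, got: {:?}", v), // needs Debug
        }
    };
}

// Hypothetical simplification of the enum from this diff.
#[derive(Debug)] // removing this derive makes the macro uses fail to compile
enum TransactionStatusMessage {
    Batch(Vec<usize>),
    Freeze(u64),
}

fn main() {
    let msg = TransactionStatusMessage::Batch(vec![4]);
    assert_matches_lite!(msg, TransactionStatusMessage::Batch(_));
    assert_matches_lite!(
        TransactionStatusMessage::Freeze(0),
        TransactionStatusMessage::Freeze(_)
    );
    println!("ok");
}
```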

pub struct TransactionRecorder {
// shared by all users of PohRecorder
pub record_sender: Sender<Record>,
@@ -1144,11 +1144,12 @@ impl PohRecorder {
}
}

pub fn create_test_recorder(
fn do_create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
track_transaction_indexes: bool,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
@@ -1174,7 +1175,10 @@
);
let ticks_per_slot = bank.ticks_per_slot();

poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false);
poh_recorder.set_bank(
BankWithScheduler::new_without_scheduler(bank),
track_transaction_indexes,
);
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
@@ -1189,6 +1193,34 @@
(exit, poh_recorder, poh_service, entry_receiver)
}

pub fn create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false)
}

pub fn create_test_recorder_with_index_tracking(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true)
}

#[cfg(test)]
mod tests {
use {
2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock
1 change: 1 addition & 0 deletions runtime/Cargo.toml
@@ -81,6 +81,7 @@ solana-svm-transaction = { workspace = true }
solana-system-program = { workspace = true, optional = true }
solana-timings = { workspace = true }
solana-transaction-status-client-types = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }