Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tx poh recording in unified scheduler #4150

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ledger/benches/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_runtime::{
bank::Bank,
bank::{Bank, PreCommitCallbackResult},
bank_forks::BankForks,
prioritization_fee_cache::PrioritizationFeeCache,
transaction_batch::{OwnedOrBorrowed, TransactionBatch},
Expand Down Expand Up @@ -162,6 +162,7 @@ fn bench_execute_batch(
&mut timing,
None,
&prioritization_fee_cache,
None::<fn() -> PreCommitCallbackResult<Option<usize>>>,
);
}
});
Expand Down
163 changes: 138 additions & 25 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use {
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_runtime::{
accounts_background_service::{AbsRequestSender, SnapshotRequestKind},
bank::{Bank, KeyedRewardsAndNumPartitions, TransactionBalancesSet},
bank::{
Bank, KeyedRewardsAndNumPartitions, PreCommitCallbackResult, TransactionBalancesSet,
},
bank_forks::{BankForks, SetRootError},
bank_utils,
commitment::VOTE_THRESHOLD_SIZE,
Expand Down Expand Up @@ -61,6 +63,7 @@ use {
solana_transaction_status::token_balances::TransactionTokenBalancesSet,
solana_vote::vote_account::VoteAccountsHashMap,
std::{
borrow::Cow,
collections::{HashMap, HashSet},
num::Saturating,
ops::{Index, Range},
Expand Down Expand Up @@ -110,25 +113,30 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
fn get_first_error(
batch: &TransactionBatch<impl SVMTransaction>,
commit_results: &[TransactionCommitResult],
is_block_producing_unified_scheduler: bool,
) -> Option<(Result<()>, Signature)> {
let mut first_err = None;
for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) {
if let Err(err) = commit_result {
if first_err.is_none() {
first_err = Some((Err(err.clone()), *transaction.signature()));
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
// Skip with block producing unified scheduler because it's quite common to observe
// transaction errors...
if !is_block_producing_unified_scheduler {
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
}
}
}
first_err
Expand All @@ -150,12 +158,16 @@ pub fn execute_batch(
timings: &mut ExecuteTimings,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
// None is meaningfully used to detect this is called from the block producing unified
// scheduler. If so, suppress too verbose logging for the code path.
pre_commit_callback: Option<impl FnOnce() -> PreCommitCallbackResult<Option<usize>>>,
) -> Result<()> {
let TransactionBatchWithIndexes {
batch,
transaction_indexes,
} = batch;
let record_token_balances = transaction_status_sender.is_some();
let mut transaction_indexes = Cow::from(transaction_indexes);

let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();

Expand All @@ -165,14 +177,32 @@ pub fn execute_batch(
vec![]
};

let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
);
let is_block_producing_unified_scheduler = pre_commit_callback.is_some();
let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| {
|| {
wrapped_callback().map(|maybe_index| {
assert!(transaction_indexes.is_empty());
transaction_indexes.to_mut().extend(maybe_index);
// Strip the index away by implicitly returning (), now that we're done with it
// here (= `solana-ledger`), to make `solana-runtime` not bothered with it.
})
}
});

let Ok((commit_results, balances)) = batch
.bank()
.load_execute_and_commit_transactions_with_pre_commit_callback(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
pre_commit_callback,
)
else {
return Err(TransactionError::CommitCancelled);
};

bank_utils::find_and_send_votes(
batch.sanitized_transactions(),
Expand Down Expand Up @@ -201,7 +231,7 @@ pub fn execute_batch(
.filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx))
.collect_vec();

let first_err = get_first_error(batch, &commit_results);
let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler);

if let Some(transaction_status_sender) = transaction_status_sender {
let transactions: Vec<SanitizedTransaction> = batch
Expand All @@ -224,7 +254,7 @@ pub fn execute_batch(
commit_results,
balances,
token_balances,
transaction_indexes.to_vec(),
transaction_indexes.into_owned(),
);
}

Expand Down Expand Up @@ -322,6 +352,7 @@ fn execute_batches_internal(
&mut timings,
log_messages_bytes_limit,
prioritization_fee_cache,
None::<fn() -> PreCommitCallbackResult<Option<usize>>>,
));

let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
Expand Down Expand Up @@ -2210,11 +2241,13 @@ pub fn process_single_slot(
}

#[allow(clippy::large_enum_variant)]
// `Debug` is derived here (new in this PR) so the type can be used with
// `assert_matches!` in tests; see the test additions in this file.
#[derive(Debug)]
// Message sent to the transaction-status consumer: either a batch of
// processed transactions, or a notification that a slot was frozen
// (presumably so the consumer can finalize per-slot state — confirm with
// the receiver's implementation).
pub enum TransactionStatusMessage {
Batch(TransactionStatusBatch),
Freeze(Slot),
}

#[derive(Debug)]
pub struct TransactionStatusBatch {
pub slot: Slot,
pub transactions: Vec<SanitizedTransaction>,
Expand Down Expand Up @@ -2360,7 +2393,7 @@ pub mod tests {
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
solana_program_runtime::declare_process_instruction,
solana_runtime::{
bank::bank_hash_details::SlotDetails,
bank::{bank_hash_details::SlotDetails, PreCommitCallbackFailed},
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
Expand Down Expand Up @@ -4433,7 +4466,7 @@ pub mod tests {
&mut ExecuteTimings::default(),
None,
);
let (err, signature) = get_first_error(&batch, &commit_results).unwrap();
let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap();
assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
assert_eq!(signature, account_not_found_sig);
}
Expand Down Expand Up @@ -5082,6 +5115,86 @@ pub mod tests {
do_test_schedule_batches_for_execution(false);
}

// Shared driver for the `execute_batch` pre-commit-callback tests.
//
// Runs a single-transaction batch through `execute_batch` with a
// pre-commit callback returning `poh_result`, then checks three things:
// 1. the overall result (`Ok` vs `Err(TransactionError::CommitCancelled)`),
// 2. whether the transaction was actually committed (bank tx count),
// 3. what (if anything) was forwarded to the transaction-status channel,
//    including the transaction index injected by the callback.
fn do_test_execute_batch_pre_commit_callback(
poh_result: PreCommitCallbackResult<Option<usize>>,
) {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Bank::new_for_tests(&genesis_config);
let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests();
let bank = Arc::new(bank);
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
// Batch holds exactly one transaction (`&txs[0..1]`) so the expected
// committed-transaction count below is exactly 1.
let mut batch = TransactionBatch::new(
vec![Ok(()); 1],
&bank,
OwnedOrBorrowed::Borrowed(&txs[0..1]),
);
batch.set_needs_unlock(false);
// True only for the `Ok(Some(index))` case, where the callback supplies
// a starting transaction index to be forwarded to the status sender.
let poh_with_index = matches!(poh_result, Ok(Some(_)));
// Start with no indexes; `execute_batch` is expected to populate them
// from the callback's return value.
let batch = TransactionBatchWithIndexes {
batch,
transaction_indexes: vec![],
};
let prioritization_fee_cache = PrioritizationFeeCache::default();
let mut timing = ExecuteTimings::default();
let (sender, receiver) = crossbeam_channel::unbounded();

// Sanity check: nothing committed before execution.
assert_eq!(bank.transaction_count(), 0);
let result = execute_batch(
&batch,
&bank,
Some(&TransactionStatusSender { sender }),
None,
&mut timing,
None,
&prioritization_fee_cache,
// Passing `Some(...)` marks this call as coming from the block
// producing unified scheduler code path.
Some(|| poh_result),
);
let should_succeed = poh_result.is_ok();
if should_succeed {
// Callback succeeded: the batch commits and the tx count advances.
assert_matches!(result, Ok(()));
assert_eq!(bank.transaction_count(), 1);
} else {
// Callback failed: commit is cancelled and nothing lands in the bank.
assert_matches!(result, Err(TransactionError::CommitCancelled));
assert_eq!(bank.transaction_count(), 0);
}
if poh_with_index {
// The index returned by the callback (4 in the test) must be
// forwarded verbatim to the status batch.
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes == vec![4_usize]
);
} else if should_succeed {
// Successful commit without an index: a batch is still sent, but
// with no transaction indexes.
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes.is_empty()
);
} else {
// Cancelled commit: nothing should have been sent at all.
assert_matches!(receiver.try_recv(), Err(_));
}
}

// Callback succeeds without providing a transaction index.
#[test]
fn test_execute_batch_pre_commit_callback_success() {
do_test_execute_batch_pre_commit_callback(Ok(None));
}

// Callback succeeds and supplies a starting transaction index (4), which
// must be forwarded to the transaction-status batch.
#[test]
fn test_execute_batch_pre_commit_callback_success_with_index() {
do_test_execute_batch_pre_commit_callback(Ok(Some(4)));
}

// Callback fails: the commit must be cancelled and no status message sent.
#[test]
fn test_execute_batch_pre_commit_callback_failure() {
do_test_execute_batch_pre_commit_callback(Err(PreCommitCallbackFailed));
}

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;
Expand Down
38 changes: 35 additions & 3 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub struct RecordTransactionsSummary {
pub starting_transaction_index: Option<usize>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need `Debug` on all of this? I can't imagine anyone has ever used it for the massive scheduling and bank structures. I'd sooner remove it from all of that than let it spread further — what do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think `Debug` is handy for quick debugging, and it is actually required by `assert_matches!()`...

pub struct TransactionRecorder {
// shared by all users of PohRecorder
pub record_sender: Sender<Record>,
Expand Down Expand Up @@ -1144,11 +1144,12 @@ impl PohRecorder {
}
}

pub fn create_test_recorder(
fn do_create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
track_transaction_indexes: bool,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
Expand All @@ -1174,7 +1175,10 @@ pub fn create_test_recorder(
);
let ticks_per_slot = bank.ticks_per_slot();

poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false);
poh_recorder.set_bank(
BankWithScheduler::new_without_scheduler(bank),
track_transaction_indexes,
);
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
Expand All @@ -1189,6 +1193,34 @@ pub fn create_test_recorder(
(exit, poh_recorder, poh_service, entry_receiver)
}

/// Sets up a PoH recorder/service pair for tests and returns the exit flag,
/// the shared recorder, the running service, and the entry receiver.
/// Transaction-index tracking is disabled; use
/// `create_test_recorder_with_index_tracking` when indexes are needed.
pub fn create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
// Named constant makes the boolean argument's meaning explicit at the
// call site of the shared helper.
const TRACK_TRANSACTION_INDEXES: bool = false;
do_create_test_recorder(
bank,
blockstore,
poh_config,
leader_schedule_cache,
TRACK_TRANSACTION_INDEXES,
)
}

/// Same as `create_test_recorder`, but with transaction-index tracking
/// enabled on the recorder's working bank, so recorded transactions report
/// their starting index.
pub fn create_test_recorder_with_index_tracking(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
// Named constant makes the boolean argument's meaning explicit at the
// call site of the shared helper.
const TRACK_TRANSACTION_INDEXES: bool = true;
do_create_test_recorder(
bank,
blockstore,
poh_config,
leader_schedule_cache,
TRACK_TRANSACTION_INDEXES,
)
}

#[cfg(test)]
mod tests {
use {
Expand Down
2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ solana-svm-transaction = { workspace = true }
solana-system-program = { workspace = true, optional = true }
solana-timings = { workspace = true }
solana-transaction-status-client-types = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
Expand Down
Loading
Loading