From 2add55ed7bc66abbbe18e5df185c5b631e6aa053 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 16 Dec 2024 13:47:05 +0000 Subject: [PATCH] Support tx poh recording in unified scheduler --- Cargo.lock | 3 + ledger/benches/blockstore_processor.rs | 1 + ledger/src/blockstore_processor.rs | 153 +++++++++++++++--- poh/src/poh_recorder.rs | 38 ++++- programs/sbf/Cargo.lock | 2 + runtime/Cargo.toml | 1 + runtime/src/bank.rs | 38 ++++- runtime/src/installed_scheduler_pool.rs | 21 ++- sdk/transaction-error/src/lib.rs | 5 + storage-proto/proto/transaction_by_addr.proto | 1 + storage-proto/src/convert.rs | 4 + svm/examples/Cargo.lock | 2 + transaction-status/src/token_balances.rs | 1 + unified-scheduler-logic/src/lib.rs | 6 + unified-scheduler-pool/Cargo.toml | 2 + unified-scheduler-pool/src/lib.rs | 144 ++++++++++++++++- 16 files changed, 388 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b68e5bf9a0f345..d5847c31ce0403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8683,6 +8683,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -9847,8 +9848,10 @@ dependencies = [ "log", "qualifier_attr", "scopeguard", + "solana-entry", "solana-ledger", "solana-logger", + "solana-poh", "solana-runtime", "solana-runtime-transaction", "solana-sdk", diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 44f65db1d54fd4..711c5381b63b8c 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -162,6 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, + None:: Option>>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 19cafec5adbbe1..fe660a996b85f9 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -110,6 +110,7 @@ fn first_err(results: &[Result<()>]) -> Result<()> { fn get_first_error( batch: &TransactionBatch, commit_results: &[TransactionCommitResult], + is_block_producing_unified_scheduler: bool, ) -> Option<(Result<()>, Signature)> { let mut first_err = None; for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) { @@ -117,18 +118,20 @@ fn get_first_error( if first_err.is_none() { first_err = Some((Err(err.clone()), *transaction.signature())); } - warn!( - "Unexpected validator error: {:?}, transaction: {:?}", - err, transaction - ); - datapoint_error!( - "validator_process_entry_error", - ( - "error", - format!("error: {err:?}, transaction: {transaction:?}"), - String - ) - ); + if !is_block_producing_unified_scheduler { + warn!( + "Unexpected validator error: {:?}, transaction: {:?}", + err, transaction + ); + datapoint_error!( + "validator_process_entry_error", + ( + "error", + format!("error: {err:?}, transaction: {transaction:?}"), + String + ) + ); + } } } first_err @@ -150,12 +153,14 @@ pub fn execute_batch( timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, + pre_commit_callback: Option Option>>, ) -> Result<()> { let TransactionBatchWithIndexes { batch, transaction_indexes, } = batch; let record_token_balances = transaction_status_sender.is_some(); + let mut transaction_indexes = transaction_indexes.to_vec(); let mut mint_decimals: HashMap = HashMap::new(); @@ -165,14 +170,34 @@ pub fn execute_batch( vec![] }; - let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), - timings, - log_messages_bytes_limit, - ); + let is_block_producing_unified_scheduler = pre_commit_callback.is_some(); + let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| { + || { + wrapped_callback() + .inspect(|&maybe_index| { + if let Some(index) = maybe_index { + assert!(transaction_indexes.is_empty()); + transaction_indexes.push(index); + } + }) + .is_some() + } + }); + + let Some((commit_results, balances)) = batch + .bank() + .load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), + timings, + log_messages_bytes_limit, + pre_commit_callback, + ) + else { + return Err(TransactionError::CommitFailed); + }; bank_utils::find_and_send_votes( batch.sanitized_transactions(), @@ -201,7 +226,7 @@ pub fn execute_batch( .filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx)) .collect_vec(); - let first_err = get_first_error(batch, &commit_results); + let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler); if let Some(transaction_status_sender) = transaction_status_sender { let transactions: Vec = batch @@ -224,7 +249,7 @@ pub fn execute_batch( commit_results, balances, token_balances, - transaction_indexes.to_vec(), + transaction_indexes, ); } @@ -322,6 +347,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, prioritization_fee_cache, + None:: Option>>, )); let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); @@ -2210,11 +2236,13 @@ pub fn process_single_slot( } #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum TransactionStatusMessage { Batch(TransactionStatusBatch), Freeze(Slot), } +#[derive(Debug)] pub struct TransactionStatusBatch { pub slot: Slot, pub transactions: Vec, @@ -4433,7 +4461,7 @@ pub mod tests { &mut ExecuteTimings::default(), None, ); - let (err, signature) = get_first_error(&batch, &commit_results).unwrap(); + let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap(); assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound); assert_eq!(signature, account_not_found_sig); } @@ -5082,6 +5110,85 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } + fn do_test_execute_batch_pre_commit_callback(poh_result: Option>) { + solana_logger::setup(); + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests(); + let bank = Arc::new(bank); + let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); + let mut batch = TransactionBatch::new( + vec![Ok(()); 1], + &bank, + OwnedOrBorrowed::Borrowed(&txs[0..1]), + ); + batch.set_needs_unlock(false); + let poh_with_index = matches!(poh_result, Some(Some(_))); + let transaction_indexes = if poh_with_index { vec![] } else { vec![3] }; + let batch = TransactionBatchWithIndexes { + batch, + transaction_indexes, + }; + let prioritization_fee_cache = PrioritizationFeeCache::default(); + let mut timing = ExecuteTimings::default(); + let (sender, receiver) = crossbeam_channel::unbounded(); + + assert_eq!(bank.transaction_count(), 0); + let result = execute_batch( + &batch, + &bank, + Some(&TransactionStatusSender { sender }), + None, + &mut timing, + None, + &prioritization_fee_cache, + Some(|| poh_result), + ); + let should_succeed = poh_result.is_some(); + if should_succeed { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + } else { + assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_eq!(bank.transaction_count(), 0); + } + if poh_with_index { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![4_usize] + ); + } else if should_succeed { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![3_usize] + ); + } else { + assert_matches!(receiver.try_recv(), Err(_)); + } + } + + #[test] + fn test_execute_batch_pre_commit_callback_success() { + do_test_execute_batch_pre_commit_callback(Some(None)); + } + + #[test] + fn test_execute_batch_pre_commit_callback_success_with_index() { + do_test_execute_batch_pre_commit_callback(Some(Some(4))); + } + + #[test] + fn test_execute_batch_pre_commit_callback_failure() { + do_test_execute_batch_pre_commit_callback(None); + } + #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 8b95ecec039d64..6b6906fe1626d0 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -140,7 +140,7 @@ pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, @@ -1144,11 +1144,12 @@ impl PohRecorder { } } -pub fn create_test_recorder( +fn do_create_test_recorder( bank: Arc, blockstore: Arc, poh_config: Option, leader_schedule_cache: Option>, + track_transaction_indexes: bool, ) -> ( Arc, Arc>, @@ -1174,7 +1175,10 @@ pub fn create_test_recorder( ); let ticks_per_slot = bank.ticks_per_slot(); - poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false); + poh_recorder.set_bank( + BankWithScheduler::new_without_scheduler(bank), + track_transaction_indexes, + ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), @@ -1189,6 +1193,34 @@ pub fn create_test_recorder( (exit, poh_recorder, poh_service, entry_receiver) } +pub fn create_test_recorder( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false) +} + +pub fn create_test_recorder_with_index_tracking( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true) +} + #[cfg(test)] mod tests { use { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f133980765273f..c1f99241475e1e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6849,6 +6849,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -8239,6 +8240,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-runtime", "solana-runtime-transaction", "solana-sdk", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 0bec4f1ea64660..5745b5a2441908 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -81,6 +81,7 @@ solana-svm-transaction = { workspace = true } solana-system-program = { workspace = true, optional = true } solana-timings = { workspace = true } solana-transaction-status-client-types = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 5d2a1535d2386d..509ccb39b6e435 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -350,7 +350,7 @@ pub struct TransactionSimulationResult { pub inner_instructions: Option>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, @@ -5034,6 +5034,29 @@ impl Bank { timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, ) -> (Vec, TransactionBalancesSet) { + self.load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + None:: bool>, + ) + .unwrap() + } + + #[must_use] + pub fn load_execute_and_commit_transactions_with_pre_commit_callback( + &self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: Option bool>, + ) -> Option<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -5059,6 +5082,15 @@ impl Bank { }, ); + if let Some(pre_commit_callback) = pre_commit_callback { + if let Some(e) = processing_results.first() { + assert_eq!(processing_results.len(), 1); + if e.is_ok() && !pre_commit_callback() { + return None; + } + } + } + let commit_results = self.commit_transactions( batch.sanitized_transactions(), processing_results, @@ -5070,10 +5102,10 @@ impl Bank { } else { vec![] }; - ( + Some(( commit_results, TransactionBalancesSet::new(pre_balances, post_balances), - ) + )) } /// Process a Transaction. This is used for unit tests and simply calls the vector diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 9aa4a20e09c558..ac1d47f4474959 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -30,6 +30,7 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_timings::ExecuteTimings, + solana_unified_scheduler_logic::SchedulingMode, std::{ fmt::{self, Debug}, mem, @@ -227,13 +228,29 @@ pub type SchedulerId = u64; /// `SchedulingContext`s. #[derive(Clone, Debug)] pub struct SchedulingContext { - // mode: SchedulingMode, // this will be added later. + mode: SchedulingMode, bank: Arc, } impl SchedulingContext { pub fn new(bank: Arc) -> Self { - Self { bank } + // mode will be configurable later + Self { + mode: SchedulingMode::BlockVerification, + bank, + } + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn for_production(bank: Arc) -> Self { + Self { + mode: SchedulingMode::BlockProduction, + bank, + } + } + + pub fn mode(&self) -> SchedulingMode { + self.mode } pub fn bank(&self) -> &Arc { diff --git a/sdk/transaction-error/src/lib.rs b/sdk/transaction-error/src/lib.rs index 433a48b0122e31..db08f4fe6ed11f 100644 --- a/sdk/transaction-error/src/lib.rs +++ b/sdk/transaction-error/src/lib.rs @@ -137,6 +137,9 @@ pub enum TransactionError { /// Program cache hit max limit. ProgramCacheHitMaxLimit, + + /// Commit failed internally. + CommitFailed, } impl std::error::Error for TransactionError {} @@ -220,6 +223,8 @@ impl fmt::Display for TransactionError { => f.write_str("Sum of account balances before and after transaction do not match"), Self::ProgramCacheHitMaxLimit => f.write_str("Program cache hit max limit"), + Self::CommitFailed + => f.write_str("CommitFailed"), } } } diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index d0fa74a2104707..c4025dbafe8922 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -63,6 +63,7 @@ enum TransactionErrorType { PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35; UNBALANCED_TRANSACTION = 36; PROGRAM_CACHE_HIT_MAX_LIMIT = 37; + COMMIT_FAILED = 38; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 6a6e451b4858f1..55a54c3d06d54c 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -852,6 +852,7 @@ impl TryFrom for TransactionError { 34 => TransactionError::ResanitizationNeeded, 36 => TransactionError::UnbalancedTransaction, 37 => TransactionError::ProgramCacheHitMaxLimit, + 38 => TransactionError::CommitFailed, _ => return Err("Invalid TransactionError"), }) } @@ -973,6 +974,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::ProgramCacheHitMaxLimit => { tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit } + TransactionError::CommitFailed => { + tx_by_addr::TransactionErrorType::CommitFailed + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => { diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 35404e8d55abb4..b43c2b080788c5 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6668,6 +6668,7 @@ dependencies = [ "solana-svm-transaction", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -7575,6 +7576,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-runtime", "solana-runtime-transaction", "solana-sdk", diff --git a/transaction-status/src/token_balances.rs b/transaction-status/src/token_balances.rs index 85a85a053f910f..36b46552cc687f 100644 --- a/transaction-status/src/token_balances.rs +++ b/transaction-status/src/token_balances.rs @@ -2,6 +2,7 @@ use crate::TransactionTokenBalance; pub type TransactionTokenBalances = Vec>; +#[derive(Debug)] pub struct TransactionTokenBalancesSet { pub pre_token_balances: TransactionTokenBalances, pub post_token_balances: TransactionTokenBalances, diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 2e8caca3b85b8b..b28e4b0c7dc854 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -104,6 +104,12 @@ use { std::{collections::VecDeque, mem, sync::Arc}, }; +#[derive(Clone, Copy, Debug)] +pub enum SchedulingMode { + BlockVerification, + BlockProduction, +} + /// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`]. mod utils { use std::{ diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index e1b17308633798..8b9dd5b7f2bbf3 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -18,6 +18,7 @@ log = { workspace = true } qualifier_attr = { workspace = true } scopeguard = { workspace = true } solana-ledger = { workspace = true } +solana-poh = { workspace = true } solana-runtime = { workspace = true } solana-runtime-transaction = { workspace = true } solana-sdk = { workspace = true } @@ -29,6 +30,7 @@ vec_extract_if_polyfill = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } lazy_static = { workspace = true } +solana-entry = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index f22cb404d1fe38..747ff0e4cbe2bf 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -20,6 +20,7 @@ use { solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, + solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_runtime::{ installed_scheduler_pool::{ initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox, @@ -36,7 +37,10 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_timings::ExecuteTimings, - solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, + solana_unified_scheduler_logic::{ + SchedulingMode::{BlockProduction, BlockVerification}, + SchedulingStateMachine, Task, UsageQueue, + }, static_assertions::const_assert_eq, std::{ fmt::Debug, @@ -101,6 +105,7 @@ pub struct HandlerContext { transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + transaction_recorder: Option, } pub type DefaultSchedulerPool = @@ -177,6 +182,8 @@ where transaction_status_sender, replay_vote_sender, prioritization_fee_cache, + // will be configurable later + transaction_recorder: None, }, weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), @@ -437,9 +444,38 @@ impl TaskHandler for DefaultTaskHandler { let index = task.task_index(); let batch = bank.prepare_unlocked_batch_from_single_tx(transaction); + let transaction_indexes = match scheduling_context.mode() { + BlockVerification => vec![index], + BlockProduction => { + let mut vec = vec![]; + if handler_context.transaction_status_sender.is_some() { + // Create empty vec with the exact needed capacity, which will be filled inside + // `execute_batch()` below. Otherwise, excess cap would be reserved on + // `.push(transaction_index)` in it. + vec.reserve_exact(1); + } + vec + } + }; let batch_with_indexes = TransactionBatchWithIndexes { batch, - transaction_indexes: vec![index], + transaction_indexes, + }; + + let pre_commit_callback = match scheduling_context.mode() { + BlockVerification => None, + BlockProduction => Some(|| { + let RecordTransactionsSummary { + result, + starting_transaction_index, + .. + } = handler_context + .transaction_recorder + .as_ref() + .unwrap() + .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + result.ok().map(|()| starting_transaction_index) + }), }; *result = execute_batch( @@ -450,6 +486,7 @@ impl TaskHandler for DefaultTaskHandler { timings, handler_context.log_messages_bytes_limit, &handler_context.prioritization_fee_cache, + pre_commit_callback, ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } @@ -1473,6 +1510,13 @@ mod tests { super::*, crate::sleepless_testing, assert_matches::assert_matches, + solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, + create_new_tmp_ledger_auto_delete, + leader_schedule_cache::LeaderScheduleCache, + }, + solana_poh::poh_recorder::create_test_recorder_with_index_tracking, solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -1489,7 +1533,7 @@ mod tests { }, solana_timings::ExecuteTimingType, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, thread::JoinHandle, }, }; @@ -2939,10 +2983,104 @@ mod tests { transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, + transaction_recorder: None, }; let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default()); DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } + + fn do_test_task_handler_poh_recording(should_succeed: bool) { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + ref mint_keypair, + .. + } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank); + let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + + let tx = system_transaction::transfer( + mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + ); + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + let result = &mut Ok(()); + let timings = &mut ExecuteTimings::default(); + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let scheduling_context = &SchedulingContext::for_production(bank.clone()); + let (sender, receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let handler_context = &HandlerContext { + log_messages_bytes_limit: None, + transaction_status_sender: Some(TransactionStatusSender { sender }), + replay_vote_sender: None, + prioritization_fee_cache, + transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), + }; + + let task = + SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default()); + + // wait until the poh's working bank is cleared. + // also flush signal_receiver after that. + if !should_succeed { + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + while signal_receiver.try_recv().is_ok() {} + } + + assert_eq!(bank.transaction_count(), 0); + DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); + + if should_succeed { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch( + TransactionStatusBatch { .. } + )) + ); + assert_matches!( + signal_receiver.try_recv(), + Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) + if transactions == vec![tx.to_versioned_transaction()] + ); + } else { + assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_eq!(bank.transaction_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_task_handler_poh_recording_success() { + do_test_task_handler_poh_recording(true); + } + + #[test] + fn test_task_handler_poh_recording_failure() { + do_test_task_handler_poh_recording(false); + } }