From 36bf1d86588cba93dd05d724bed16e9f643a64ae Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 10:37:06 -0600 Subject: [PATCH 1/8] Committer: optional prioritization_fee_cache --- core/benches/banking_stage.rs | 2 +- core/benches/consumer.rs | 2 +- core/src/banking_stage.rs | 4 +- core/src/banking_stage/committer.rs | 10 ++-- core/src/banking_stage/consume_worker.rs | 11 +--- core/src/banking_stage/consumer.rs | 72 +++++------------------- 6 files changed, 26 insertions(+), 75 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 5d764f4b7c1d44..59f861e8a5c7d2 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -109,7 +109,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { ThreadType::Transactions, ); let (s, _r) = unbounded(); - let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64))); + let committer = Committer::new(None, s, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. diff --git a/core/benches/consumer.rs b/core/benches/consumer.rs index 3a89cdfd39d0dd..cea1f63208ecd8 100644 --- a/core/benches/consumer.rs +++ b/core/benches/consumer.rs @@ -83,7 +83,7 @@ fn create_transactions(bank: &Bank, num: usize) -> Vec) -> Consumer { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender, Arc::default()); + let committer = Committer::new(None, replay_vote_sender, None); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); Consumer::new(committer, transaction_recorder, QosService::new(0), None) } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4dfc6bc8e5c733..63b85543188aa4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -449,7 +449,7 @@ impl BankingStage { let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), - prioritization_fee_cache.clone(), + Some(prioritization_fee_cache.clone()), ); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); @@ -534,7 +534,7 @@ impl BankingStage { let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), - prioritization_fee_cache.clone(), + Some(prioritization_fee_cache.clone()), ); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs index 3722134294896c..6c63ebf9436d45 100644 --- a/core/src/banking_stage/committer.rs +++ b/core/src/banking_stage/committer.rs @@ -46,14 +46,14 @@ pub(super) struct PreBalanceInfo { pub struct Committer { transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, } impl Committer { pub fn new( transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, ) -> Self { Self { transaction_status_sender, @@ -119,8 +119,10 @@ impl Committer { pre_balance_info, starting_transaction_index, ); - self.prioritization_fee_cache - .update(bank, processed_transactions.into_iter()); + + if let Some(prioritization_fee_cache) = self.prioritization_fee_cache.as_ref() { + prioritization_fee_cache.update(bank, processed_transactions.into_iter()); + } }); execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us; (commit_time_us, commit_transaction_statuses) diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index cdb2ad2ea2ceed..411de7465a6032 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -764,10 +764,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, - solana_runtime::{ - bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, - vote_sender_types::ReplayVoteReceiver, - }, + solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteReceiver}, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ address_lookup_table::AddressLookupTableAccount, @@ -848,11 +845,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let (consume_sender, consume_receiver) = unbounded(); diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 7f8d7f1d4baee9..d8de8c0a0cd7c0 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -866,7 +866,7 @@ mod tests { solana_perf::packet::Packet, solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, solana_rpc::transaction_status_service::TransactionStatusService, - solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, + solana_runtime::bank_forks::BankForks, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ account::AccountSharedData, @@ -939,11 +939,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_summary = consumer.process_transactions(&bank, &Instant::now(), &transactions); @@ -1114,11 +1110,7 @@ mod tests { .unwrap() .set_bank_for_test(bank.clone()); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1304,11 +1296,7 @@ mod tests { } let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1408,11 +1396,7 @@ mod tests { .unwrap() .set_bank_for_test(bank.clone()); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1487,11 +1471,7 @@ mod tests { .unwrap() .set_bank_for_test(bank.clone()); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost(); @@ -1650,11 +1630,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1855,11 +1831,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None); let process_transactions_summary = @@ -1987,7 +1959,7 @@ mod tests { sender: transaction_status_sender, }), replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), + None, ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); @@ -2132,7 +2104,7 @@ mod tests { sender: transaction_status_sender, }), replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), + None, ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); @@ -2189,11 +2161,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) @@ -2274,11 +2242,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed @@ -2326,11 +2290,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) @@ -2458,11 +2418,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new( - None, - replay_vote_sender, - Arc::new(PrioritizationFeeCache::new(0u64)), - ); + let committer = Committer::new(None, replay_vote_sender, None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) From b0df821e58b58e70e4ba08979126efd968e05df0 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 10:41:39 -0600 Subject: [PATCH 2/8] BankingStage: optional priortization_fee_cache --- banking-bench/src/main.rs | 6 ++---- core/benches/banking_stage.rs | 6 ++---- core/src/banking_simulation.rs | 4 +--- core/src/banking_stage.rs | 22 +++++++++++----------- core/src/tpu.rs | 2 +- 5 files changed, 17 insertions(+), 23 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 99722c7cf6082e..6400ec24d59f49 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -23,9 +23,7 @@ use { solana_measure::measure::Measure, solana_perf::packet::{to_packet_batches, PacketBatch}, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, - solana_runtime::{ - bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, - }, + solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ compute_budget::ComputeBudgetInstruction, hash::Hash, @@ -481,7 +479,7 @@ fn main() { None, Arc::new(connection_cache), bank_forks.clone(), - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 59f861e8a5c7d2..0b8b953109bf70 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -39,9 +39,7 @@ use { test_tx::test_tx, }, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, - solana_runtime::{ - bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, - }, + solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ genesis_config::GenesisConfig, hash::Hash, @@ -307,7 +305,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 82a4e5e94a3e76..59e469928176b8 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -31,7 +31,6 @@ use { bank::{Bank, HashOverrides}, bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler, - prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET}, @@ -800,7 +799,6 @@ impl BankingSimulator { let cluster_info = Arc::new(DummyClusterInfo { id: simulated_leader.into(), }); - let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64)); let banking_stage = BankingStage::new_num_threads( block_production_method.clone(), &cluster_info, @@ -814,7 +812,7 @@ impl BankingSimulator { None, connection_cache, bank_forks.clone(), - prioritization_fee_cache, + None, false, ); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 63b85543188aa4..6d097a5f5e9503 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -360,7 +360,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, enable_forwarding: bool, ) -> Self { Self::new_num_threads( @@ -395,7 +395,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, enable_forwarding: bool, ) -> Self { match block_production_method { @@ -430,7 +430,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -449,7 +449,7 @@ impl BankingStage { let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache, ); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); @@ -516,7 +516,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, enable_forwarding: bool, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); @@ -534,7 +534,7 @@ impl BankingStage { let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache, ); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); @@ -900,7 +900,7 @@ mod tests { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); drop(non_vote_sender); @@ -960,7 +960,7 @@ mod tests { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); trace!("sending bank"); @@ -1044,7 +1044,7 @@ mod tests { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks.clone(), // keep a local-copy of bank-forks so worker threads do not lose weak access to bank-forks - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); @@ -1214,7 +1214,7 @@ mod tests { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, ); // wait for banking_stage to eat the packets @@ -1409,7 +1409,7 @@ mod tests { None, Arc::new(ConnectionCache::new("connection_cache_test")), bank_forks, - &Arc::new(PrioritizationFeeCache::new(0u64)), + None, false, ); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d715bb5c7b0534..2d6ea70623f4ba 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -279,7 +279,7 @@ impl Tpu { log_messages_bytes_limit, connection_cache.clone(), bank_forks.clone(), - prioritization_fee_cache, + Some(prioritization_fee_cache.clone()), enable_block_production_forwarding, ); From cac7edd4683650fcf444a70a9c2ea9942b0a77a7 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 10:47:55 -0600 Subject: [PATCH 3/8] Tpu: optional priortiization_fee_cache --- core/src/tpu.rs | 4 ++-- core/src/validator.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 2d6ea70623f4ba..687996b3c7dd3e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -116,7 +116,7 @@ impl Tpu { tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, tpu_max_connections_per_ipaddr_per_minute: u64, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, block_production_method: BlockProductionMethod, enable_block_production_forwarding: bool, _generator_config: Option, /* vestigial code for replay invalidator */ @@ -279,7 +279,7 @@ impl Tpu { log_messages_bytes_limit, connection_cache.clone(), bank_forks.clone(), - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache, enable_block_production_forwarding, ); diff --git a/core/src/validator.rs b/core/src/validator.rs index c3318ee070f2bc..931394bb2cc713 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1534,7 +1534,7 @@ impl Validator { tracer_thread, tpu_enable_udp, tpu_max_connections_per_ipaddr_per_minute, - &prioritization_fee_cache, + Some(prioritization_fee_cache), config.block_production_method.clone(), config.enable_block_production_forwarding, config.generator_config.clone(), From dca61769edf5cd91051a68caf6c21ce9e2619c56 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 11:01:16 -0600 Subject: [PATCH 4/8] replay processing: optional prioritization_fee_cache --- core/src/replay_stage.rs | 12 +++++---- ledger/benches/blockstore_processor.rs | 20 ++------------- ledger/src/blockstore_processor.rs | 34 ++++++++++++-------------- unified-scheduler-pool/src/lib.rs | 2 +- 4 files changed, 26 insertions(+), 42 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0e0125a300594f..f53b462af52f1c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -563,6 +563,8 @@ impl ReplayStage { banking_tracer, } = config; + let prioritization_fee_cache = Some(prioritization_fee_cache); + let ReplaySenders { rpc_subscriptions, slot_status_notifier, @@ -2244,7 +2246,7 @@ impl ReplayStage { replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> result::Result { let mut w_replay_stats = replay_stats.write().unwrap(); let mut w_replay_progress = replay_progress.write().unwrap(); @@ -2850,7 +2852,7 @@ impl ReplayStage { replay_timing: &mut ReplayLoopTiming, log_messages_bytes_limit: Option, active_bank_slots: &[Slot], - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Vec { // Make mutable shared structures thread safe. let progress = RwLock::new(progress); @@ -2965,7 +2967,7 @@ impl ReplayStage { replay_timing: &mut ReplayLoopTiming, log_messages_bytes_limit: Option, bank_slot: Slot, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> ReplaySlotFromBlockstore { let mut replay_result = ReplaySlotFromBlockstore { is_slot_dead: false, @@ -3362,7 +3364,7 @@ impl ReplayStage { log_messages_bytes_limit: Option, replay_mode: &ForkReplayMode, replay_tx_thread_pool: &ThreadPool, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, purge_repair_slot_counter: &mut PurgeRepairSlotCounter, ) -> bool /* completed a bank */ { let active_bank_slots = bank_forks.read().unwrap().active_bank_slots(); @@ -5000,7 +5002,7 @@ pub(crate) mod tests { &replay_vote_sender, &VerifyRecyclers::default(), None, - &PrioritizationFeeCache::new(0u64), + &None, ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 44f65db1d54fd4..49f979f93b17dd 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -14,7 +14,6 @@ use { solana_runtime::{ bank::Bank, bank_forks::BankForks, - prioritization_fee_cache::PrioritizationFeeCache, transaction_batch::{OwnedOrBorrowed, TransactionBatch}, }, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, @@ -78,7 +77,6 @@ fn create_transactions(bank: &Bank, num: usize) -> Vec, _bank_forks: Arc>, - prioritization_fee_cache: PrioritizationFeeCache, } fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { @@ -105,11 +103,9 @@ fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { .unwrap() .set_limits(u64::MAX, u64::MAX, u64::MAX); let (bank, bank_forks) = bank.wrap_with_bank_forks_for_tests(); - let prioritization_fee_cache = PrioritizationFeeCache::default(); BenchFrame { bank, _bank_forks: bank_forks, - prioritization_fee_cache, } } @@ -127,11 +123,7 @@ fn bench_execute_batch( ); let batches_per_iteration = TRANSACTIONS_PER_ITERATION / batch_size; - let BenchFrame { - bank, - _bank_forks, - prioritization_fee_cache, - } = setup(apply_cost_tracker_during_replay); + let BenchFrame { bank, _bank_forks } = setup(apply_cost_tracker_during_replay); let transactions = create_transactions(&bank, 2_usize.pow(20)); let batches: Vec<_> = transactions .chunks(batch_size) @@ -154,15 +146,7 @@ fn bench_execute_batch( bencher.iter(|| { for _ in 0..batches_per_iteration { let batch = batches_iter.next().unwrap(); - let _ = execute_batch( - batch, - &bank, - None, - None, - &mut timing, - None, - &prioritization_fee_cache, - ); + let _ = execute_batch(batch, &bank, None, None, &mut timing, None, &None); } }); // drop batches here so dropping is not included in the benchmark diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 19cafec5adbbe1..a863b5c78dbec8 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -149,7 +149,7 @@ pub fn execute_batch( replay_vote_sender: Option<&ReplayVoteSender>, timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result<()> { let TransactionBatchWithIndexes { batch, @@ -228,7 +228,9 @@ pub fn execute_batch( ); } - prioritization_fee_cache.update(bank, committed_transactions.into_iter()); + if let Some(prioritization_fee_cache) = prioritization_fee_cache { + prioritization_fee_cache.update(bank, committed_transactions.into_iter()); + } first_err.map(|(result, _)| result).unwrap_or(Ok(())) } @@ -300,7 +302,7 @@ fn execute_batches_internal( transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result { assert!(!batches.is_empty()); let execution_timings_per_thread: Mutex> = @@ -379,7 +381,7 @@ fn process_batches( replay_vote_sender: Option<&ReplayVoteSender>, batch_execution_timing: &mut BatchExecutionTiming, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result<()> { if bank.has_installed_scheduler() { debug!( @@ -482,7 +484,7 @@ fn rebatch_and_execute_batches( replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut BatchExecutionTiming, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result<()> { if locked_entries.len() == 0 { return Ok(()); @@ -624,7 +626,6 @@ pub fn process_entries_for_tests( }) .collect(); - let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( bank, &replay_tx_thread_pool, @@ -633,7 +634,7 @@ pub fn process_entries_for_tests( replay_vote_sender, &mut batch_timing, None, - &ignored_prioritization_fee_cache, + &None, ); debug!("process_entries: {:?}", batch_timing); @@ -648,7 +649,7 @@ fn process_entries( replay_vote_sender: Option<&ReplayVoteSender>, batch_timing: &mut BatchExecutionTiming, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -1163,8 +1164,6 @@ fn confirm_full_slot( ) -> result::Result<(), BlockstoreProcessorError> { let mut confirmation_timing = ConfirmationTiming::default(); let skip_verification = !opts.run_verification; - let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); - confirm_slot( blockstore, bank, @@ -1178,7 +1177,7 @@ fn confirm_full_slot( recyclers, opts.allow_dead_slots, opts.runtime_config.log_messages_bytes_limit, - &ignored_prioritization_fee_cache, + &None, )?; timing.accumulate(&confirmation_timing.batch_execute.totals); @@ -1513,7 +1512,7 @@ pub fn confirm_slot( recyclers: &VerifyRecyclers, allow_dead_slots: bool, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); @@ -1560,7 +1559,7 @@ fn confirm_slot_entries( replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> result::Result<(), BlockstoreProcessorError> { let ConfirmationTiming { confirmation_elapsed, @@ -4813,7 +4812,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, - &PrioritizationFeeCache::new(0u64), + &None, ) } @@ -4906,7 +4905,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, - &PrioritizationFeeCache::new(0u64), + &None, ) .unwrap(); assert_eq!(progress.num_txs, 2); @@ -4951,7 +4950,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, - &PrioritizationFeeCache::new(0u64), + &None, ) .unwrap(); assert_eq!(progress.num_txs, 5); @@ -5054,7 +5053,6 @@ pub mod tests { let replay_tx_thread_pool = create_thread_pool(1); let mut batch_execution_timing = BatchExecutionTiming::default(); - let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_batches( &bank, &replay_tx_thread_pool, @@ -5063,7 +5061,7 @@ pub mod tests { None, &mut batch_execution_timing, None, - &ignored_prioritization_fee_cache, + &None, ); if should_succeed { assert_matches!(result, Ok(())); diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 89f3fb4d4f2103..4e728983b7dafa 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -446,7 +446,7 @@ impl TaskHandler for DefaultTaskHandler { handler_context.replay_vote_sender.as_ref(), timings, handler_context.log_messages_bytes_limit, - &handler_context.prioritization_fee_cache, + &Some(handler_context.prioritization_fee_cache.clone()), ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } From 0f97a2b8a306c40e1a7467ea07f1de8623996f91 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 11:07:05 -0600 Subject: [PATCH 5/8] HandlerContext: optional prioritization_fee_cache --- core/src/validator.rs | 2 +- core/tests/unified_scheduler.rs | 11 +-- ledger-tool/src/ledger_utils.rs | 4 +- unified-scheduler-pool/src/lib.rs | 114 ++++++++---------------------- 4 files changed, 34 insertions(+), 97 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 931394bb2cc713..f96e9742f41a00 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -901,7 +901,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), Some(replay_vote_sender.clone()), - prioritization_fee_cache.clone(), + Some(prioritization_fee_cache.clone()), ); bank_forks .write() diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index a6a1e1e19582f0..4e040f4284cb02 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -18,7 +18,6 @@ use { solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext, - prioritization_fee_cache::PrioritizationFeeCache, }, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction::Result}, @@ -68,14 +67,8 @@ fn test_scheduler_waited_by_drop_bank_service() { // Setup bankforks with unified scheduler enabled let genesis_bank = Bank::new_for_tests(&genesis_config); let bank_forks = BankForks::new_rw_arc(genesis_bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = SchedulerPool::, _>::new( - None, - None, - None, - None, - ignored_prioritization_fee_cache, - ); + let pool_raw = + SchedulerPool::, _>::new(None, None, None, None, None); let pool = pool_raw.clone(); bank_forks.write().unwrap().install_scheduler_pool(pool); let genesis = 0; diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 58c2a54f725055..bb43b203befeee 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -36,7 +36,6 @@ use { PrunedBanksRequestHandler, SnapshotRequestHandler, }, bank_forks::BankForks, - prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, snapshot_utils::{self, clean_orphaned_account_snapshot_dirs}, @@ -376,7 +375,6 @@ pub fn load_and_process_ledger( } BlockVerificationMethod::UnifiedScheduler => { let no_replay_vote_sender = None; - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); bank_forks .write() .unwrap() @@ -385,7 +383,7 @@ pub fn load_and_process_ledger( process_options.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), no_replay_vote_sender, - ignored_prioritization_fee_cache, + None, )); } } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 4e728983b7dafa..bbe6a5c9b9ecdd 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -100,7 +100,7 @@ pub struct HandlerContext { log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, } pub type DefaultSchedulerPool = @@ -138,7 +138,7 @@ where log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, ) -> Arc { Self::do_new( handler_count, @@ -158,7 +158,7 @@ where log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, pool_cleaner_interval: Duration, max_pooling_duration: Duration, max_usage_queue_count: usize, @@ -284,7 +284,7 @@ where log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, ) -> InstalledSchedulerPoolArc { Self::new( handler_count, @@ -446,7 +446,7 @@ impl TaskHandler for DefaultTaskHandler { handler_context.replay_vote_sender.as_ref(), timings, handler_context.log_messages_bytes_limit, - &Some(handler_context.prioritization_fee_cache.clone()), + &handler_context.prioritization_fee_cache, ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } @@ -1472,7 +1472,6 @@ mod tests { bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, installed_scheduler_pool::{BankWithScheduler, SchedulingContext}, - prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ clock::{Slot, MAX_PROCESSING_AGE}, @@ -1507,9 +1506,7 @@ mod tests { fn test_scheduler_pool_new() { solana_logger::setup(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, None); // this indirectly proves that there should be circular link because there's only one Arc // at this moment now @@ -1523,9 +1520,7 @@ mod tests { fn test_scheduler_spawn() { solana_logger::setup(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, None); let bank = Arc::new(Bank::default_for_tests()); let context = SchedulingContext::new(bank); let scheduler = pool.take_scheduler(context); @@ -1548,13 +1543,12 @@ mod tests { &TestCheckPoint::AfterIdleSchedulerCleaned, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = DefaultSchedulerPool::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, SHORTENED_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -1612,14 +1606,13 @@ mod tests { &TestCheckPoint::AfterTrashedSchedulerCleaned, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 1; let pool_raw = DefaultSchedulerPool::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, REDUCED_MAX_USAGE_QUEUE_COUNT, @@ -1688,13 +1681,12 @@ mod tests { &TestCheckPoint::AfterIdleSchedulerCleaned, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = DefaultSchedulerPool::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, SHORTENED_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -1736,13 +1728,12 @@ mod tests { &TestCheckPoint::AfterTimeoutListenerTriggered, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = SchedulerPool::, _>::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -1822,13 +1813,12 @@ mod tests { &TestCheckPoint::AfterTimeoutListenerTriggered, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = DefaultSchedulerPool::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -1869,13 +1859,12 @@ mod tests { &TestCheckPoint::AfterTimeoutListenerTriggered, ]); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = SchedulerPool::, _>::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -1971,14 +1960,8 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( - None, - None, - None, - None, - ignored_prioritization_fee_cache, - ); + let pool = + SchedulerPool::, _>::new(None, None, None, None, None); let context = SchedulingContext::new(bank.clone()); let scheduler = pool.do_take_scheduler(context); scheduler.schedule_execution(tx, 0).unwrap(); @@ -2063,14 +2046,8 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( - None, - None, - None, - None, - ignored_prioritization_fee_cache, - ); + let pool = + SchedulerPool::, _>::new(None, None, None, None, None); let context = SchedulingContext::new(bank.clone()); let scheduler = pool.do_take_scheduler(context); @@ -2103,9 +2080,7 @@ mod tests { fn test_scheduler_pool_filo() { solana_logger::setup(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new(None, None, None, None, None); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); @@ -2132,9 +2107,7 @@ mod tests { fn test_scheduler_pool_context_drop_unless_reinitialized() { solana_logger::setup(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new(None, None, None, None, None); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); let mut scheduler = pool.do_take_scheduler(context.clone()); @@ -2151,9 +2124,7 @@ mod tests { fn test_scheduler_pool_context_replace() { solana_logger::setup(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new(None, None, None, None, None); let old_bank = &Arc::new(Bank::default_for_tests()); let new_bank = &Arc::new(Bank::default_for_tests()); assert!(!Arc::ptr_eq(old_bank, new_bank)); @@ -2177,9 +2148,7 @@ mod tests { let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let mut bank_forks = bank_forks.write().unwrap(); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, None); bank_forks.install_scheduler_pool(pool); } @@ -2191,9 +2160,7 @@ mod tests { let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let child_bank = Bank::new_from_parent(bank, &Pubkey::default(), 1); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, None); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); @@ -2241,9 +2208,7 @@ mod tests { )); let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, None); let context = SchedulingContext::new(bank.clone()); assert_eq!(bank.transaction_count(), 0); @@ -2276,13 +2241,12 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool_raw = DefaultSchedulerPool::do_new( None, None, None, None, - ignored_prioritization_fee_cache, + None, SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, @@ -2409,13 +2373,12 @@ mod tests { // scheduler thread has been aborted already or not. const TX_COUNT: usize = 2; - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::, _>::new_dyn( Some(TX_COUNT), // fix to use exactly 2 handlers None, None, None, - ignored_prioritization_fee_cache, + None, ); let context = SchedulingContext::new(bank.clone()); @@ -2484,13 +2447,8 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::, _>::new( - None, - None, - None, - None, - ignored_prioritization_fee_cache, + None, None, None, None, None, ); let context = SchedulingContext::new(bank.clone()); let scheduler = pool.do_take_scheduler(context); @@ -2568,13 +2526,8 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::, _>::new_dyn( - None, - None, - None, - None, - ignored_prioritization_fee_cache, + None, None, None, None, None, ); let context = SchedulingContext::new(bank.clone()); @@ -2634,13 +2587,12 @@ mod tests { bank0.slot().checked_add(1).unwrap(), )); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::, _>::new( Some(4), // spawn 4 threads None, None, None, - ignored_prioritization_fee_cache, + None, ); // Create a dummy tx and two contexts @@ -2833,14 +2785,9 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let context = SchedulingContext::new(bank.clone()); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = SchedulerPool::, DefaultTaskHandler>::new_dyn( - None, - None, - None, - None, - ignored_prioritization_fee_cache, + None, None, None, None, None, ); let scheduler = pool.take_scheduler(context); @@ -2916,13 +2863,12 @@ mod tests { // this internally should call SanitizedTransaction::get_account_locks(). let result = &mut Ok(()); let timings = &mut ExecuteTimings::default(); - let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let scheduling_context = &SchedulingContext::new(bank.clone()); let handler_context = &HandlerContext { log_messages_bytes_limit: None, transaction_status_sender: None, replay_vote_sender: None, - prioritization_fee_cache, + prioritization_fee_cache: None, }; let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default()); From f3ad10d9abcd9358201583c3aa3a5336fd1bc098 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 11:13:24 -0600 Subject: [PATCH 6/8] OptimisticallyConfirmedBankTracker: optional prioritization_fee_cache --- core/src/validator.rs | 2 +- .../optimistically_confirmed_bank_tracker.rs | 34 ++++++++++--------- rpc/src/rpc.rs | 8 ++--- rpc/src/rpc_subscriptions.rs | 17 +++++----- 4 files changed, 31 insertions(+), 30 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index f96e9742f41a00..73cdce933345cc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1171,7 +1171,7 @@ impl Validator { optimistically_confirmed_bank, rpc_subscriptions.clone(), confirmed_bank_subscribers, - prioritization_fee_cache.clone(), + Some(prioritization_fee_cache.clone()), )); let bank_notification_sender_config = Some(BankNotificationSenderConfig { sender: bank_notification_sender, diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 608b0b27071a84..068da80180323c 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -94,7 +94,7 @@ impl OptimisticallyConfirmedBankTracker { optimistically_confirmed_bank: Arc>, subscriptions: Arc, slot_notification_subscribers: Option>>>, - prioritization_fee_cache: Arc, + prioritization_fee_cache: Option>, ) -> Self { let mut pending_optimistically_confirmed_banks = HashSet::new(); let mut last_notified_confirmed_slot: Slot = 0; @@ -137,7 +137,7 @@ impl OptimisticallyConfirmedBankTracker { highest_confirmed_slot: &mut Slot, newest_root_slot: &mut Slot, slot_notification_subscribers: &Option>>>, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -181,7 +181,7 @@ impl OptimisticallyConfirmedBankTracker { last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, slot_notification_subscribers: &Option>>>, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -197,7 +197,9 @@ impl OptimisticallyConfirmedBankTracker { ); // finalize block's minimum prioritization fee cache for this bank - prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id()); + if let Some(prioritization_fee_cache) = prioritization_fee_cache { + prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id()); + } } } else if bank.slot() > bank_forks.read().unwrap().root() { pending_optimistically_confirmed_banks.insert(bank.slot()); @@ -213,7 +215,7 @@ impl OptimisticallyConfirmedBankTracker { last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, slot_notification_subscribers: &Option>>>, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) { for confirmed_bank in bank.parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -273,7 +275,7 @@ impl OptimisticallyConfirmedBankTracker { highest_confirmed_slot: &mut Slot, newest_root_slot: &mut Slot, slot_notification_subscribers: &Option>>>, - prioritization_fee_cache: &PrioritizationFeeCache, + prioritization_fee_cache: &Option>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -467,7 +469,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -483,7 +485,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -499,7 +501,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -520,7 +522,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(highest_confirmed_slot, 3); @@ -540,7 +542,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -569,7 +571,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &subscribers, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -588,7 +590,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &subscribers, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(newest_root_slot, 5); @@ -619,7 +621,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -640,7 +642,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &subscribers, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 7); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -658,7 +660,7 @@ mod tests { &mut highest_confirmed_slot, &mut newest_root_slot, &subscribers, - &PrioritizationFeeCache::default(), + &None, ); assert_eq!(newest_root_slot, 7); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index dfe856ee0b6a1a..d3a8dabd3382e0 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8444,7 +8444,7 @@ pub mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -8464,7 +8464,7 @@ pub mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -8484,7 +8484,7 @@ pub mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -8505,7 +8505,7 @@ pub mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 5aefa09c5b6603..3432774abfb49c 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1245,7 +1245,6 @@ pub(crate) mod tests { solana_runtime::{ commitment::BlockCommitment, genesis_utils::{create_genesis_config, GenesisConfigInfo}, - prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ commitment_config::CommitmentConfig, @@ -2029,7 +2028,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); // a closure to reduce code duplications in building expected responses: @@ -2082,7 +2081,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let response = receiver.recv(); @@ -2204,7 +2203,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); // The following should panic @@ -2322,7 +2321,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); // a closure to reduce code duplications in building expected responses: @@ -2377,7 +2376,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let response = receiver.recv(); @@ -2816,7 +2815,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); // Now, notify the frozen bank and ensure its notifications are processed @@ -2831,7 +2830,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let response = receiver0.recv(); @@ -2886,7 +2885,7 @@ pub(crate) mod tests { &mut highest_confirmed_slot, &mut highest_root_slot, &None, - &PrioritizationFeeCache::default(), + &None, ); let response = receiver1.recv(); let expected = json!({ From b65ae9c35a174b546422d2f8f077900f9765f04a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 11:16:23 -0600 Subject: [PATCH 7/8] replay stage config: optional prioritization_fee_cache --- core/src/replay_stage.rs | 4 +--- core/src/tvu.rs | 7 +++---- core/src/validator.rs | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f53b462af52f1c..9f7795083a8906 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -267,7 +267,7 @@ pub struct ReplayStageConfig { pub vote_tracker: Arc, pub cluster_slots: Arc, pub log_messages_bytes_limit: Option, - pub prioritization_fee_cache: Arc, + pub prioritization_fee_cache: Option>, pub banking_tracer: Arc, } @@ -563,8 +563,6 @@ impl ReplayStage { banking_tracer, } = config; - let prioritization_fee_cache = Some(prioritization_fee_cache); - let ReplaySenders { rpc_subscriptions, slot_status_notifier, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e14a66971201fb..df94b892f6a63f 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -151,7 +151,7 @@ impl Tvu { accounts_background_request_sender: AbsRequestSender, log_messages_bytes_limit: Option, connection_cache: Option<&Arc>, - prioritization_fee_cache: &Arc, + prioritization_fee_cache: Option>, banking_tracer: Arc, turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, @@ -323,7 +323,7 @@ impl Tvu { vote_tracker, cluster_slots, log_messages_bytes_limit, - prioritization_fee_cache: prioritization_fee_cache.clone(), + prioritization_fee_cache, banking_tracer, }; @@ -505,7 +505,6 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let outstanding_repair_requests = Arc::>::default(); let cluster_slots = Arc::new(ClusterSlots::default()); let wen_restart_repair_slots = if enable_wen_restart { @@ -574,7 +573,7 @@ pub mod tests { AbsRequestSender::default(), None, Some(&Arc::new(ConnectionCache::new("connection_cache_test"))), - &ignored_prioritization_fee_cache, + None, BankingTracer::new_disabled(), turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 73cdce933345cc..4e669a26bbf378 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1458,7 +1458,7 @@ impl Validator { accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service - &prioritization_fee_cache, + Some(prioritization_fee_cache.clone()), banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), turbine_quic_endpoint_receiver, From 2784b15eb03ef5a6092c1ab5c64197248d77fa25 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 16 Dec 2024 11:27:15 -0600 Subject: [PATCH 8/8] enable prioritization fee cache if rpc addrs is some --- core/src/validator.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 4e669a26bbf378..643c44d841c722 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -881,9 +881,12 @@ impl Validator { let (replay_vote_sender, replay_vote_receiver) = unbounded(); - // block min prioritization fee cache should be readable by RPC, and writable by validator - // (by both replay stage and banking stage) - let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + // priorization fee cache is `Some` if RPC is enabled, otherwise it's `None` + let prioritization_fee_cache = if config.rpc_addrs.is_some() { + Some(Arc::new(PrioritizationFeeCache::default())) + } else { + None + }; match &config.block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { @@ -901,7 +904,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), Some(replay_vote_sender.clone()), - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache.clone(), ); bank_forks .write() @@ -1107,7 +1110,7 @@ impl Validator { connection_cache.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, - prioritization_fee_cache.clone(), + prioritization_fee_cache.clone().unwrap(), ) .map_err(ValidatorError::Other)?; @@ -1171,7 +1174,7 @@ impl Validator { optimistically_confirmed_bank, rpc_subscriptions.clone(), confirmed_bank_subscribers, - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache.clone(), )); let bank_notification_sender_config = Some(BankNotificationSenderConfig { sender: bank_notification_sender, @@ -1458,7 +1461,7 @@ impl Validator { accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service - Some(prioritization_fee_cache.clone()), + prioritization_fee_cache.clone(), banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), turbine_quic_endpoint_receiver, @@ -1534,7 +1537,7 @@ impl Validator { tracer_thread, tpu_enable_udp, tpu_max_connections_per_ipaddr_per_minute, - Some(prioritization_fee_cache), + prioritization_fee_cache, config.block_production_method.clone(), config.enable_block_production_forwarding, config.generator_config.clone(),