Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optional prioritization_fee_cache #4144

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
6 changes: 2 additions & 4 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use {
solana_measure::measure::Measure,
solana_perf::packet::{to_packet_batches, PacketBatch},
solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
hash::Hash,
Expand Down Expand Up @@ -481,7 +479,7 @@ fn main() {
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);

Expand Down
8 changes: 3 additions & 5 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use {
test_tx::test_tx,
},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
genesis_config::GenesisConfig,
hash::Hash,
Expand Down Expand Up @@ -109,7 +107,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
ThreadType::Transactions,
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let committer = Committer::new(None, s, None);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
Expand Down Expand Up @@ -307,7 +305,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);

Expand Down
2 changes: 1 addition & 1 deletion core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn create_transactions(bank: &Bank, num: usize) -> Vec<RuntimeTransaction<Saniti

fn create_consumer(poh_recorder: &RwLock<PohRecorder>) -> Consumer {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender, Arc::default());
let committer = Committer::new(None, replay_vote_sender, None);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
Consumer::new(committer, transaction_recorder, QosService::new(0), None)
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use {
bank::{Bank, HashOverrides},
bank_forks::BankForks,
installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET},
Expand Down Expand Up @@ -800,7 +799,6 @@ impl BankingSimulator {
let cluster_info = Arc::new(DummyClusterInfo {
id: simulated_leader.into(),
});
let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64));
let banking_stage = BankingStage::new_num_threads(
block_production_method.clone(),
&cluster_info,
Expand All @@ -814,7 +812,7 @@ impl BankingSimulator {
None,
connection_cache,
bank_forks.clone(),
prioritization_fee_cache,
None,
false,
);

Expand Down
22 changes: 11 additions & 11 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
enable_forwarding: bool,
) -> Self {
Self::new_num_threads(
Expand Down Expand Up @@ -395,7 +395,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
enable_forwarding: bool,
) -> Self {
match block_production_method {
Expand Down Expand Up @@ -430,7 +430,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand All @@ -449,7 +449,7 @@ impl BankingStage {
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
prioritization_fee_cache,
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

Expand Down Expand Up @@ -516,7 +516,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
enable_forwarding: bool,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
Expand All @@ -534,7 +534,7 @@ impl BankingStage {
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
prioritization_fee_cache,
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

Expand Down Expand Up @@ -900,7 +900,7 @@ mod tests {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);
drop(non_vote_sender);
Expand Down Expand Up @@ -960,7 +960,7 @@ mod tests {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);
trace!("sending bank");
Expand Down Expand Up @@ -1044,7 +1044,7 @@ mod tests {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks.clone(), // keep a local-copy of bank-forks so worker threads do not lose weak access to bank-forks
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);

Expand Down Expand Up @@ -1214,7 +1214,7 @@ mod tests {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -1409,7 +1409,7 @@ mod tests {
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
None,
false,
);

Expand Down
10 changes: 6 additions & 4 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ pub(super) struct PreBalanceInfo {
pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
}

impl Committer {
pub fn new(
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
prioritization_fee_cache: Option<Arc<PrioritizationFeeCache>>,
) -> Self {
Self {
transaction_status_sender,
Expand Down Expand Up @@ -119,8 +119,10 @@ impl Committer {
pre_balance_info,
starting_transaction_index,
);
self.prioritization_fee_cache
.update(bank, processed_transactions.into_iter());

if let Some(prioritization_fee_cache) = self.prioritization_fee_cache.as_ref() {
prioritization_fee_cache.update(bank, processed_transactions.into_iter());
}
});
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us;
(commit_time_us, commit_transaction_statuses)
Expand Down
11 changes: 2 additions & 9 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,7 @@ mod tests {
get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteReceiver,
},
solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteReceiver},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
address_lookup_table::AddressLookupTableAccount,
Expand Down Expand Up @@ -848,11 +845,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

let (replay_vote_sender, replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let committer = Committer::new(None, replay_vote_sender, None);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

let (consume_sender, consume_receiver) = unbounded();
Expand Down
Loading
Loading