From ea02412752c4f02864aa81f01e19ff7df8b57b4f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 07:40:17 -0600 Subject: [PATCH] TransactionStateContainer: use Slab (#3750) --- Cargo.lock | 1 + Cargo.toml | 1 + core/Cargo.toml | 1 + core/src/banking_stage/consume_worker.rs | 23 ++-- core/src/banking_stage/scheduler_messages.rs | 16 +-- .../transaction_scheduler/mod.rs | 1 - .../prio_graph_scheduler.rs | 58 +++------ .../receive_and_buffer.rs | 15 +-- .../scheduler_controller.rs | 18 +-- .../transaction_id_generator.rs | 21 ---- .../transaction_priority_id.rs | 12 +- .../transaction_state_container.rs | 113 ++++++++++-------- programs/sbf/Cargo.lock | 1 + svm/examples/Cargo.lock | 1 + 14 files changed, 109 insertions(+), 173 deletions(-) delete mode 100644 core/src/banking_stage/transaction_scheduler/transaction_id_generator.rs diff --git a/Cargo.lock b/Cargo.lock index 19ef30409ccdb6..3dee58192927d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6693,6 +6693,7 @@ dependencies = [ "serde_derive", "serde_json", "serial_test", + "slab", "solana-accounts-db", "solana-bloom", "solana-builtins-default-costs", diff --git a/Cargo.toml b/Cargo.toml index 7634a7baeb4abc..4730460f1594ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -407,6 +407,7 @@ sha3 = "0.10.8" shuttle = "0.7.1" signal-hook = "0.3.17" siphasher = "0.3.11" +slab = "0.4.9" smallvec = "1.13.2" smpl_jwt = "0.7.1" socket2 = "0.5.7" diff --git a/core/Cargo.toml b/core/Cargo.toml index 3b8f37a0db3298..422a193a5a1346 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,6 +44,7 @@ rustls = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } serde_derive = { workspace = true } +slab = { workspace = true } solana-accounts-db = { workspace = true } solana-bloom = { workspace = true } solana-builtins-default-costs = { workspace = true } diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 966564628b8747..815c51b9b6b253 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -752,7 +752,7 @@ mod tests { crate::banking_stage::{ committer::Committer, qos_service::QosService, - scheduler_messages::{MaxAge, TransactionBatchId, TransactionId}, + scheduler_messages::{MaxAge, TransactionBatchId}, tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh}, }, crossbeam_channel::unbounded, @@ -902,7 +902,7 @@ mod tests { genesis_config.hash(), )]); let bid = TransactionBatchId::new(0); - let id = TransactionId::new(0); + let id = 0; let max_age = MaxAge { sanitized_epoch: bank.epoch(), alt_invalidation_slot: bank.slot(), @@ -951,7 +951,7 @@ mod tests { genesis_config.hash(), )]); let bid = TransactionBatchId::new(0); - let id = TransactionId::new(0); + let id = 0; let max_age = MaxAge { sanitized_epoch: bank.epoch(), alt_invalidation_slot: bank.slot(), @@ -1000,8 +1000,8 @@ mod tests { ]); let bid = TransactionBatchId::new(0); - let id1 = TransactionId::new(1); - let id2 = TransactionId::new(0); + let id1 = 1; + let id2 = 0; let max_age = MaxAge { sanitized_epoch: bank.epoch(), alt_invalidation_slot: bank.slot(), @@ -1061,8 +1061,8 @@ mod tests { let bid1 = TransactionBatchId::new(0); let bid2 = TransactionBatchId::new(1); - let id1 = TransactionId::new(1); - let id2 = TransactionId::new(0); + let id1 = 1; + let id2 = 0; let max_age = MaxAge { sanitized_epoch: bank.epoch(), alt_invalidation_slot: bank.slot(), @@ -1192,14 +1192,7 @@ mod tests { consume_sender .send(ConsumeWork { batch_id: TransactionBatchId::new(1), - ids: vec![ - TransactionId::new(0), - TransactionId::new(1), - TransactionId::new(2), - TransactionId::new(3), - TransactionId::new(4), - TransactionId::new(5), - ], + ids: vec![0, 1, 2, 3, 4, 5], transactions: txs, max_ages: vec![ MaxAge { diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index a0de8f18d008a3..1c7cf31592b791 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -19,21 +19,7 @@ impl Display for TransactionBatchId { } } -/// A unique identifier for a transaction. -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TransactionId(u64); - -impl TransactionId { - pub fn new(index: u64) -> Self { - Self(index) - } -} - -impl Display for TransactionId { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} +pub type TransactionId = usize; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct MaxAge { diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index 5fb940eccbdb2d..caa24fb46f41b6 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -6,7 +6,6 @@ pub(crate) mod scheduler_controller; pub(crate) mod scheduler_error; mod scheduler_metrics; mod thread_aware_account_locks; -pub(crate) mod transaction_id_generator; mod transaction_priority_id; mod transaction_state; pub(crate) mod transaction_state_container; diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 402d8603ebcb0d..950f506fd51af4 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -139,7 +139,7 @@ impl PrioGraphScheduler { *window_budget = window_budget.saturating_sub(chunk_size); ids.iter().for_each(|id| { - let transaction = container.get_transaction_ttl(&id.id).unwrap(); + let transaction = container.get_transaction_ttl(id.id).unwrap(); txs.push(&transaction.transaction); }); @@ -149,14 +149,14 @@ impl PrioGraphScheduler { for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { if *filter_result { - let transaction = container.get_transaction_ttl(&id.id).unwrap(); + let transaction = container.get_transaction_ttl(id.id).unwrap(); prio_graph.insert_transaction( *id, Self::get_transaction_account_access(transaction), ); } else { saturating_add_assign!(num_filtered_out, 1); - container.remove_by_id(&id.id); + container.remove_by_id(id.id); } } @@ -187,7 +187,7 @@ impl PrioGraphScheduler { // Should always be in the container, during initial testing phase panic. // Later, we can replace with a continue in case this does happen. - let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else { + let Some(transaction_state) = container.get_mut_transaction_state(id.id) else { panic!("transaction state must exist") }; @@ -210,7 +210,7 @@ impl PrioGraphScheduler { match maybe_schedule_info { Err(TransactionSchedulingError::Filtered) => { - container.remove_by_id(&id.id); + container.remove_by_id(id.id); } Err(TransactionSchedulingError::UnschedulableConflicts) => { unschedulable_ids.push(id); @@ -358,7 +358,7 @@ impl PrioGraphScheduler { continue; } } - container.remove_by_id(&id); + container.remove_by_id(id); } Ok((num_transactions, num_retryable)) @@ -626,18 +626,6 @@ mod tests { std::{borrow::Borrow, sync::Arc}, }; - macro_rules! txid { - ($value:expr) => { - TransactionId::new($value) - }; - } - - macro_rules! txids { - ([$($element:expr),*]) => { - vec![ $(txid!($element)),* ] - }; - } - #[allow(clippy::type_complexity)] fn create_test_frame( num_threads: usize, @@ -689,10 +677,7 @@ mod tests { >, ) -> TransactionStateContainer> { let mut container = TransactionStateContainer::with_capacity(10 * 1024); - for (index, (from_keypair, to_pubkeys, lamports, compute_unit_price)) in - tx_infos.into_iter().enumerate() - { - let id = TransactionId::new(index as u64); + for (from_keypair, to_pubkeys, lamports, compute_unit_price) in tx_infos.into_iter() { let transaction = prioritized_tranfers( from_keypair.borrow(), to_pubkeys, @@ -711,7 +696,6 @@ mod tests { }; const TEST_TRANSACTION_COST: u64 = 5000; container.insert_new_transaction( - id, transaction_ttl, packet, compute_unit_price, @@ -773,7 +757,7 @@ mod tests { .unwrap(); assert_eq!(scheduling_summary.num_scheduled, 2); assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]); + assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![1, 0]]); } #[test] @@ -790,10 +774,7 @@ mod tests { .unwrap(); assert_eq!(scheduling_summary.num_scheduled, 2); assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!( - collect_work(&work_receivers[0]).1, - vec![txids!([1]), txids!([0])] - ); + assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![1], vec![0]]); } #[test] @@ -832,8 +813,8 @@ mod tests { .unwrap(); assert_eq!(scheduling_summary.num_scheduled, 4); assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]); - assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]); + assert_eq!(collect_work(&work_receivers[0]).1, [vec![3, 1]]); + assert_eq!(collect_work(&work_receivers[1]).1, [vec![2, 0]]); } #[test] @@ -874,11 +855,8 @@ mod tests { assert_eq!(scheduling_summary.num_scheduled, 4); assert_eq!(scheduling_summary.num_unschedulable, 2); let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); - assert_eq!(thread_0_ids, [txids!([0]), txids!([2])]); - assert_eq!( - collect_work(&work_receivers[1]).1, - [txids!([1]), txids!([3])] - ); + assert_eq!(thread_0_ids, [vec![0], vec![2]]); + assert_eq!(collect_work(&work_receivers[1]).1, [vec![1], vec![3]]); // Cannot schedule even on next pass because of lock conflicts let scheduling_summary = scheduler @@ -901,10 +879,7 @@ mod tests { assert_eq!(scheduling_summary.num_scheduled, 2); assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!( - collect_work(&work_receivers[1]).1, - [txids!([4]), txids!([5])] - ); + assert_eq!(collect_work(&work_receivers[1]).1, [vec![4], vec![5]]); } #[test] @@ -927,9 +902,6 @@ mod tests { .unwrap(); assert_eq!(scheduling_summary.num_scheduled, 2); assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!( - collect_work(&work_receivers[0]).1, - vec![txids!([2]), txids!([0])] - ); + assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![2], vec![0]]); } } diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 430cea3604b8ee..f193208cad8857 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -1,7 +1,6 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, - transaction_id_generator::TransactionIdGenerator, transaction_state_container::StateContainer, }, crate::banking_stage::{ @@ -51,8 +50,6 @@ pub(crate) struct SanitizedTransactionReceiveAndBuffer { /// Packet/Transaction ingress. packet_receiver: PacketDeserializer, bank_forks: Arc>, - /// Generates unique IDs for incoming transactions. - transaction_id_generator: TransactionIdGenerator, forwarding_enabled: bool, } @@ -69,7 +66,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, ) -> bool { - let remaining_queue_capacity = container.remaining_queue_capacity(); + let remaining_queue_capacity = container.remaining_capacity(); const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); let (recv_timeout, should_buffer) = match decision { @@ -142,7 +139,6 @@ impl SanitizedTransactionReceiveAndBuffer { Self { packet_receiver, bank_forks, - transaction_id_generator: TransactionIdGenerator::default(), forwarding_enabled, } } @@ -237,7 +233,6 @@ impl SanitizedTransactionReceiveAndBuffer { .filter(|(_, check_result)| check_result.is_ok()) { saturating_add_assign!(post_transaction_check_count, 1); - let transaction_id = self.transaction_id_generator.next(); let (priority, cost) = calculate_priority_and_cost(&transaction, &fee_budget_limits, &working_bank); @@ -246,13 +241,7 @@ impl SanitizedTransactionReceiveAndBuffer { max_age, }; - if container.insert_new_transaction( - transaction_id, - transaction_ttl, - packet, - priority, - cost, - ) { + if container.insert_new_transaction(transaction_ttl, packet, priority, cost) { saturating_add_assign!(num_dropped_on_capacity, 1); } saturating_add_assign!(num_buffered, 1); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 8382a1d48c2c2e..14a175b2018260 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -9,13 +9,13 @@ use { scheduler_metrics::{ SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics, }, - transaction_state_container::StateContainer, }, crate::banking_stage::{ consume_worker::ConsumeWorkerMetrics, consumer::Consumer, decision_maker::{BufferedPacketsDecision, DecisionMaker}, forwarder::Forwarder, + transaction_scheduler::transaction_state_container::StateContainer, ForwardOption, LikeClusterInfo, TOTAL_BUFFERED_PACKETS, }, solana_measure::measure_us, @@ -259,7 +259,7 @@ impl SchedulerController { } let chunk_size = ids.len(); ids.iter().for_each(|id| { - let transaction = self.container.get_transaction_ttl(&id.id).unwrap(); + let transaction = self.container.get_transaction_ttl(id.id).unwrap(); txs.push(&transaction.transaction); }); @@ -275,12 +275,12 @@ impl SchedulerController { for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { if !*filter_result { - self.container.remove_by_id(&id.id); + self.container.remove_by_id(id.id); continue; } ids_to_add_back.push(*id); // add back to the queue at end - let state = self.container.get_mut_transaction_state(&id.id).unwrap(); + let state = self.container.get_mut_transaction_state(id.id).unwrap(); let sanitized_transaction = &state.transaction_ttl().transaction; let immutable_packet = state.packet().clone(); @@ -312,7 +312,7 @@ impl SchedulerController { // leader slot. if max_time_reached { while let Some(id) = self.container.pop() { - self.container.remove_by_id(&id.id); + self.container.remove_by_id(id.id); } } @@ -322,7 +322,7 @@ impl SchedulerController { } } else { for priority_id in ids_to_add_back { - self.container.remove_by_id(&priority_id.id); + self.container.remove_by_id(priority_id.id); } } @@ -336,7 +336,7 @@ impl SchedulerController { fn clear_container(&mut self) { let mut num_dropped_on_clear: usize = 0; while let Some(id) = self.container.pop() { - self.container.remove_by_id(&id.id); + self.container.remove_by_id(id.id); saturating_add_assign!(num_dropped_on_clear, 1); } @@ -370,7 +370,7 @@ impl SchedulerController { .map(|id| { &self .container - .get_transaction_ttl(&id.id) + .get_transaction_ttl(id.id) .expect("transaction must exist") .transaction }) @@ -386,7 +386,7 @@ impl SchedulerController { for (result, id) in check_results.into_iter().zip(chunk.iter()) { if result.is_err() { saturating_add_assign!(num_dropped_on_age_and_status, 1); - self.container.remove_by_id(&id.id); + self.container.remove_by_id(id.id); } else { self.container.push_id_into_queue(*id); } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_id_generator.rs b/core/src/banking_stage/transaction_scheduler/transaction_id_generator.rs deleted file mode 100644 index f54523890f9caf..00000000000000 --- a/core/src/banking_stage/transaction_scheduler/transaction_id_generator.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::banking_stage::scheduler_messages::TransactionId; - -/// Simple reverse-sequential ID generator for `TransactionId`s. -/// These IDs uniquely identify transactions during the scheduling process. -pub struct TransactionIdGenerator { - next_id: u64, -} - -impl Default for TransactionIdGenerator { - fn default() -> Self { - Self { next_id: u64::MAX } - } -} - -impl TransactionIdGenerator { - pub fn next(&mut self) -> TransactionId { - let id = self.next_id; - self.next_id = self.next_id.wrapping_sub(1); - TransactionId::new(id) - } -} diff --git a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs index 9857a689519502..7142f3ac09736d 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs @@ -37,8 +37,8 @@ mod tests { fn test_transaction_priority_id_ordering() { // Higher priority first { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(2, TransactionId::new(1)); + let id1 = TransactionPriorityId::new(1, 1); + let id2 = TransactionPriorityId::new(2, 1); assert!(id1 < id2); assert!(id1 <= id2); assert!(id2 > id1); @@ -47,8 +47,8 @@ mod tests { // Equal priority then compare by id { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(1, TransactionId::new(2)); + let id1 = TransactionPriorityId::new(1, 1); + let id2 = TransactionPriorityId::new(1, 2); assert!(id1 < id2); assert!(id1 <= id2); assert!(id2 > id1); @@ -57,8 +57,8 @@ mod tests { // Equal priority and id { - let id1 = TransactionPriorityId::new(1, TransactionId::new(1)); - let id2 = TransactionPriorityId::new(1, TransactionId::new(1)); + let id1 = TransactionPriorityId::new(1, 1); + let id2 = TransactionPriorityId::new(1, 1); assert_eq!(id1, id2); assert!(id1 >= id2); assert!(id1 <= id2); diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index b3f42983365b2e..cb57ee13c200ba 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -9,8 +9,9 @@ use { }, itertools::MinMaxResult, min_max_heap::MinMaxHeap, + slab::Slab, solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, - std::{collections::HashMap, sync::Arc}, + std::sync::Arc, }; /// This structure will hold `TransactionState` for the entirety of a @@ -40,7 +41,7 @@ use { /// a new transaction, the lowest priority transaction will be dropped. pub(crate) struct TransactionStateContainer { priority_queue: MinMaxHeap, - id_to_transaction_state: HashMap>, + id_to_transaction_state: Slab>, } pub(crate) trait StateContainer { @@ -50,27 +51,24 @@ pub(crate) trait StateContainer { /// Returns true if the queue is empty. fn is_empty(&self) -> bool; - /// Returns the remaining capacity of the queue - fn remaining_queue_capacity(&self) -> usize; + /// Returns the remaining capacity of the container + fn remaining_capacity(&self) -> usize; /// Get the top transaction id in the priority queue. fn pop(&mut self) -> Option; /// Get mutable transaction state by id. - fn get_mut_transaction_state( - &mut self, - id: &TransactionId, - ) -> Option<&mut TransactionState>; + fn get_mut_transaction_state(&mut self, id: TransactionId) + -> Option<&mut TransactionState>; /// Get reference to `SanitizedTransactionTTL` by id. /// Panics if the transaction does not exist. - fn get_transaction_ttl(&self, id: &TransactionId) -> Option<&SanitizedTransactionTTL>; + fn get_transaction_ttl(&self, id: TransactionId) -> Option<&SanitizedTransactionTTL>; /// Insert a new transaction into the container's queues and maps. /// Returns `true` if a packet was dropped due to capacity limits. fn insert_new_transaction( &mut self, - transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, packet: Arc, priority: u64, @@ -91,16 +89,19 @@ pub(crate) trait StateContainer { fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool; /// Remove transaction by id. - fn remove_by_id(&mut self, id: &TransactionId); + fn remove_by_id(&mut self, id: TransactionId); fn get_min_max_priority(&self) -> MinMaxResult; } impl StateContainer for TransactionStateContainer { fn with_capacity(capacity: usize) -> Self { + // Extra capacity is added because some additional space is needed when + // pushing a new transaction into the container to avoid reallocation. + const EXTRA_CAPACITY: usize = 64; Self { priority_queue: MinMaxHeap::with_capacity(capacity), - id_to_transaction_state: HashMap::with_capacity(capacity), + id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY), } } @@ -108,8 +109,10 @@ impl StateContainer for TransactionStateContainer usize { - self.priority_queue.capacity() - self.priority_queue.len() + fn remaining_capacity(&self) -> usize { + self.priority_queue + .capacity() + .saturating_sub(self.id_to_transaction_state.len()) } fn pop(&mut self) -> Option { @@ -118,12 +121,12 @@ impl StateContainer for TransactionStateContainer Option<&mut TransactionState> { self.id_to_transaction_state.get_mut(id) } - fn get_transaction_ttl(&self, id: &TransactionId) -> Option<&SanitizedTransactionTTL> { + fn get_transaction_ttl(&self, id: TransactionId) -> Option<&SanitizedTransactionTTL> { self.id_to_transaction_state .get(id) .map(|state| state.transaction_ttl()) @@ -131,18 +134,27 @@ impl StateContainer for TransactionStateContainer, packet: Arc, priority: u64, cost: u64, ) -> bool { - let priority_id = TransactionPriorityId::new(priority, transaction_id); - self.id_to_transaction_state.insert( - transaction_id, - TransactionState::new(transaction_ttl, packet, priority, cost), - ); - self.push_id_into_queue(priority_id) + // cache the remaining capacity **before** we take ownership of + // the next vacant entry. i.e. get the size before we insert. + let remaining_capacity = self.remaining_capacity(); + let priority_id = { + let entry = self.id_to_transaction_state.vacant_entry(); + let transaction_id = entry.key(); + entry.insert(TransactionState::new( + transaction_ttl, + packet, + priority, + cost, + )); + TransactionPriorityId::new(priority, transaction_id) + }; + + self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) } fn retry_transaction( @@ -151,7 +163,7 @@ impl StateContainer for TransactionStateContainer, ) { let transaction_state = self - .get_mut_transaction_state(&transaction_id) + .get_mut_transaction_state(transaction_id) .expect("transaction must exist"); let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); transaction_state.transition_to_unprocessed(transaction_ttl); @@ -159,20 +171,11 @@ impl StateContainer for TransactionStateContainer bool { - if self.remaining_queue_capacity() == 0 { - let popped_id = self.priority_queue.push_pop_min(priority_id); - self.remove_by_id(&popped_id.id); - true - } else { - self.priority_queue.push(priority_id); - false - } + self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) } - fn remove_by_id(&mut self, id: &TransactionId) { - self.id_to_transaction_state - .remove(id) - .expect("transaction must exist"); + fn remove_by_id(&mut self, id: TransactionId) { + self.id_to_transaction_state.remove(id); } fn get_min_max_priority(&self) -> MinMaxResult { @@ -186,6 +189,23 @@ impl StateContainer for TransactionStateContainer TransactionStateContainer { + fn push_id_into_queue_with_remaining_capacity( + &mut self, + priority_id: TransactionPriorityId, + remaining_capacity: usize, + ) -> bool { + if remaining_capacity == 0 { + let popped_id = self.priority_queue.push_pop_min(priority_id); + self.remove_by_id(popped_id.id); + true + } else { + self.priority_queue.push(priority_id); + false + } + } +} + #[cfg(test)] mod tests { use { @@ -246,16 +266,9 @@ mod tests { container: &mut TransactionStateContainer>, num: usize, ) { - for id in 0..num as u64 { - let priority = id; + for priority in 0..num as u64 { let (transaction_ttl, packet, priority, cost) = test_transaction(priority); - container.insert_new_transaction( - TransactionId::new(id), - transaction_ttl, - packet, - priority, - cost, - ); + container.insert_new_transaction(transaction_ttl, packet, priority, cost); } } @@ -291,12 +304,12 @@ mod tests { let mut container = TransactionStateContainer::with_capacity(5); push_to_container(&mut container, 5); - let existing_id = TransactionId::new(3); - let non_existing_id = TransactionId::new(7); - assert!(container.get_mut_transaction_state(&existing_id).is_some()); - assert!(container.get_mut_transaction_state(&existing_id).is_some()); + let existing_id = 3; + let non_existing_id = 7; + assert!(container.get_mut_transaction_state(existing_id).is_some()); + assert!(container.get_mut_transaction_state(existing_id).is_some()); assert!(container - .get_mut_transaction_state(&non_existing_id) + .get_mut_transaction_state(non_existing_id) .is_none()); } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 75b914a69db651..4f5ed7ea67162e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5393,6 +5393,7 @@ dependencies = [ "serde", "serde_bytes", "serde_derive", + "slab", "solana-accounts-db", "solana-bloom", "solana-builtins-default-costs", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index d9ddd42f43633d..9d2d64f0ea15cc 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5244,6 +5244,7 @@ dependencies = [ "serde", "serde_bytes", "serde_derive", + "slab", "solana-accounts-db", "solana-bloom", "solana-builtins-default-costs",