Skip to content

Commit

Permalink
TransactionStateContainer: use Slab (#3750)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Nov 27, 2024
1 parent 53af223 commit ea02412
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 173 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ sha3 = "0.10.8"
shuttle = "0.7.1"
signal-hook = "0.3.17"
siphasher = "0.3.11"
slab = "0.4.9"
smallvec = "1.13.2"
smpl_jwt = "0.7.1"
socket2 = "0.5.7"
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ rustls = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_derive = { workspace = true }
slab = { workspace = true }
solana-accounts-db = { workspace = true }
solana-bloom = { workspace = true }
solana-builtins-default-costs = { workspace = true }
Expand Down
23 changes: 8 additions & 15 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ mod tests {
crate::banking_stage::{
committer::Committer,
qos_service::QosService,
scheduler_messages::{MaxAge, TransactionBatchId, TransactionId},
scheduler_messages::{MaxAge, TransactionBatchId},
tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh},
},
crossbeam_channel::unbounded,
Expand Down Expand Up @@ -902,7 +902,7 @@ mod tests {
genesis_config.hash(),
)]);
let bid = TransactionBatchId::new(0);
let id = TransactionId::new(0);
let id = 0;
let max_age = MaxAge {
sanitized_epoch: bank.epoch(),
alt_invalidation_slot: bank.slot(),
Expand Down Expand Up @@ -951,7 +951,7 @@ mod tests {
genesis_config.hash(),
)]);
let bid = TransactionBatchId::new(0);
let id = TransactionId::new(0);
let id = 0;
let max_age = MaxAge {
sanitized_epoch: bank.epoch(),
alt_invalidation_slot: bank.slot(),
Expand Down Expand Up @@ -1000,8 +1000,8 @@ mod tests {
]);

let bid = TransactionBatchId::new(0);
let id1 = TransactionId::new(1);
let id2 = TransactionId::new(0);
let id1 = 1;
let id2 = 0;
let max_age = MaxAge {
sanitized_epoch: bank.epoch(),
alt_invalidation_slot: bank.slot(),
Expand Down Expand Up @@ -1061,8 +1061,8 @@ mod tests {

let bid1 = TransactionBatchId::new(0);
let bid2 = TransactionBatchId::new(1);
let id1 = TransactionId::new(1);
let id2 = TransactionId::new(0);
let id1 = 1;
let id2 = 0;
let max_age = MaxAge {
sanitized_epoch: bank.epoch(),
alt_invalidation_slot: bank.slot(),
Expand Down Expand Up @@ -1192,14 +1192,7 @@ mod tests {
consume_sender
.send(ConsumeWork {
batch_id: TransactionBatchId::new(1),
ids: vec![
TransactionId::new(0),
TransactionId::new(1),
TransactionId::new(2),
TransactionId::new(3),
TransactionId::new(4),
TransactionId::new(5),
],
ids: vec![0, 1, 2, 3, 4, 5],
transactions: txs,
max_ages: vec![
MaxAge {
Expand Down
16 changes: 1 addition & 15 deletions core/src/banking_stage/scheduler_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,7 @@ impl Display for TransactionBatchId {
}
}

/// A unique identifier for a transaction.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TransactionId(u64);

impl TransactionId {
pub fn new(index: u64) -> Self {
Self(index)
}
}

impl Display for TransactionId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
pub type TransactionId = usize;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MaxAge {
Expand Down
1 change: 0 additions & 1 deletion core/src/banking_stage/transaction_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub(crate) mod scheduler_controller;
pub(crate) mod scheduler_error;
mod scheduler_metrics;
mod thread_aware_account_locks;
pub(crate) mod transaction_id_generator;
mod transaction_priority_id;
mod transaction_state;
pub(crate) mod transaction_state_container;
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
*window_budget = window_budget.saturating_sub(chunk_size);

ids.iter().for_each(|id| {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
let transaction = container.get_transaction_ttl(id.id).unwrap();
txs.push(&transaction.transaction);
});

Expand All @@ -149,14 +149,14 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {

for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) {
if *filter_result {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
let transaction = container.get_transaction_ttl(id.id).unwrap();
prio_graph.insert_transaction(
*id,
Self::get_transaction_account_access(transaction),
);
} else {
saturating_add_assign!(num_filtered_out, 1);
container.remove_by_id(&id.id);
container.remove_by_id(id.id);
}
}

Expand Down Expand Up @@ -187,7 +187,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {

// Should always be in the container, during initial testing phase panic.
// Later, we can replace with a continue in case this does happen.
let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else {
let Some(transaction_state) = container.get_mut_transaction_state(id.id) else {
panic!("transaction state must exist")
};

Expand All @@ -210,7 +210,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {

match maybe_schedule_info {
Err(TransactionSchedulingError::Filtered) => {
container.remove_by_id(&id.id);
container.remove_by_id(id.id);
}
Err(TransactionSchedulingError::UnschedulableConflicts) => {
unschedulable_ids.push(id);
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
continue;
}
}
container.remove_by_id(&id);
container.remove_by_id(id);
}

Ok((num_transactions, num_retryable))
Expand Down Expand Up @@ -626,18 +626,6 @@ mod tests {
std::{borrow::Borrow, sync::Arc},
};

macro_rules! txid {
($value:expr) => {
TransactionId::new($value)
};
}

macro_rules! txids {
([$($element:expr),*]) => {
vec![ $(txid!($element)),* ]
};
}

#[allow(clippy::type_complexity)]
fn create_test_frame(
num_threads: usize,
Expand Down Expand Up @@ -689,10 +677,7 @@ mod tests {
>,
) -> TransactionStateContainer<RuntimeTransaction<SanitizedTransaction>> {
let mut container = TransactionStateContainer::with_capacity(10 * 1024);
for (index, (from_keypair, to_pubkeys, lamports, compute_unit_price)) in
tx_infos.into_iter().enumerate()
{
let id = TransactionId::new(index as u64);
for (from_keypair, to_pubkeys, lamports, compute_unit_price) in tx_infos.into_iter() {
let transaction = prioritized_tranfers(
from_keypair.borrow(),
to_pubkeys,
Expand All @@ -711,7 +696,6 @@ mod tests {
};
const TEST_TRANSACTION_COST: u64 = 5000;
container.insert_new_transaction(
id,
transaction_ttl,
packet,
compute_unit_price,
Expand Down Expand Up @@ -773,7 +757,7 @@ mod tests {
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]);
assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![1, 0]]);
}

#[test]
Expand All @@ -790,10 +774,7 @@ mod tests {
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
collect_work(&work_receivers[0]).1,
vec![txids!([1]), txids!([0])]
);
assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![1], vec![0]]);
}

#[test]
Expand Down Expand Up @@ -832,8 +813,8 @@ mod tests {
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]);
assert_eq!(collect_work(&work_receivers[0]).1, [vec![3, 1]]);
assert_eq!(collect_work(&work_receivers[1]).1, [vec![2, 0]]);
}

#[test]
Expand Down Expand Up @@ -874,11 +855,8 @@ mod tests {
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 2);
let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]);
assert_eq!(thread_0_ids, [txids!([0]), txids!([2])]);
assert_eq!(
collect_work(&work_receivers[1]).1,
[txids!([1]), txids!([3])]
);
assert_eq!(thread_0_ids, [vec![0], vec![2]]);
assert_eq!(collect_work(&work_receivers[1]).1, [vec![1], vec![3]]);

// Cannot schedule even on next pass because of lock conflicts
let scheduling_summary = scheduler
Expand All @@ -901,10 +879,7 @@ mod tests {
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);

assert_eq!(
collect_work(&work_receivers[1]).1,
[txids!([4]), txids!([5])]
);
assert_eq!(collect_work(&work_receivers[1]).1, [vec![4], vec![5]]);
}

#[test]
Expand All @@ -927,9 +902,6 @@ mod tests {
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
collect_work(&work_receivers[0]).1,
vec![txids!([2]), txids!([0])]
);
assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![2], vec![0]]);
}
}
15 changes: 2 additions & 13 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use {
super::{
scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
transaction_id_generator::TransactionIdGenerator,
transaction_state_container::StateContainer,
},
crate::banking_stage::{
Expand Down Expand Up @@ -51,8 +50,6 @@ pub(crate) struct SanitizedTransactionReceiveAndBuffer {
/// Packet/Transaction ingress.
packet_receiver: PacketDeserializer,
bank_forks: Arc<RwLock<BankForks>>,
/// Generates unique IDs for incoming transactions.
transaction_id_generator: TransactionIdGenerator,

forwarding_enabled: bool,
}
Expand All @@ -69,7 +66,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool {
let remaining_queue_capacity = container.remaining_queue_capacity();
let remaining_queue_capacity = container.remaining_capacity();

const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10);
let (recv_timeout, should_buffer) = match decision {
Expand Down Expand Up @@ -142,7 +139,6 @@ impl SanitizedTransactionReceiveAndBuffer {
Self {
packet_receiver,
bank_forks,
transaction_id_generator: TransactionIdGenerator::default(),
forwarding_enabled,
}
}
Expand Down Expand Up @@ -237,7 +233,6 @@ impl SanitizedTransactionReceiveAndBuffer {
.filter(|(_, check_result)| check_result.is_ok())
{
saturating_add_assign!(post_transaction_check_count, 1);
let transaction_id = self.transaction_id_generator.next();

let (priority, cost) =
calculate_priority_and_cost(&transaction, &fee_budget_limits, &working_bank);
Expand All @@ -246,13 +241,7 @@ impl SanitizedTransactionReceiveAndBuffer {
max_age,
};

if container.insert_new_transaction(
transaction_id,
transaction_ttl,
packet,
priority,
cost,
) {
if container.insert_new_transaction(transaction_ttl, packet, priority, cost) {
saturating_add_assign!(num_dropped_on_capacity, 1);
}
saturating_add_assign!(num_buffered, 1);
Expand Down
Loading

0 comments on commit ea02412

Please sign in to comment.