From 636261b73a9cd591297de835399077aefdfc7bfd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 26 Nov 2024 15:13:13 -0600 Subject: [PATCH 01/26] impl TransactionData for Bytes --- Cargo.lock | 1 + programs/sbf/Cargo.lock | 1 + svm/examples/Cargo.lock | 1 + transaction-view/Cargo.toml | 1 + transaction-view/src/transaction_data.rs | 7 +++++++ 5 files changed, 11 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a717c722109dba..26a09d7cf8966e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,7 @@ version = "2.2.0" dependencies = [ "agave-transaction-view", "bincode", + "bytes", "criterion", "solana-sdk", "solana-sdk-ids", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4e80af8676fad5..721737913ee6a4 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -77,6 +77,7 @@ dependencies = [ name = "agave-transaction-view" version = "2.2.0" dependencies = [ + "bytes", "solana-sdk", "solana-sdk-ids", "solana-svm-transaction", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 170f59f40311d4..48faf2e878c70e 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -77,6 +77,7 @@ dependencies = [ name = "agave-transaction-view" version = "2.2.0" dependencies = [ + "bytes", "solana-sdk", "solana-sdk-ids", "solana-svm-transaction", diff --git a/transaction-view/Cargo.toml b/transaction-view/Cargo.toml index cb4a43aa7753bd..49885c0fd7c7bd 100644 --- a/transaction-view/Cargo.toml +++ b/transaction-view/Cargo.toml @@ -10,6 +10,7 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +bytes = { workspace = true } solana-sdk = { workspace = true } solana-sdk-ids = { workspace = true } solana-svm-transaction = { workspace = true } diff --git a/transaction-view/src/transaction_data.rs b/transaction-view/src/transaction_data.rs index 2bfe0c85ce0e55..d6152c8069d3e8 100644 --- a/transaction-view/src/transaction_data.rs +++ b/transaction-view/src/transaction_data.rs @@ -10,3 +10,10 @@ impl TransactionData for &[u8] { self } } + +impl TransactionData for bytes::Bytes { + #[inline] + fn data(&self) -> &[u8] { + self.as_ref() + } +} From 13d46bbbc06d0c407ae61b39018ab838614e5b4e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 26 Nov 2024 15:36:21 -0600 Subject: [PATCH 02/26] Optional packet --- .../prio_graph_scheduler.rs | 2 +- .../receive_and_buffer.rs | 2 +- .../scheduler_controller.rs | 4 ++-- .../transaction_state.rs | 23 +++++++++++-------- .../transaction_state_container.rs | 6 ++--- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 8edebc1f80c200..5e401f15942fb6 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -721,7 +721,7 @@ mod tests { const TEST_TRANSACTION_COST: u64 = 5000; container.insert_new_transaction( transaction_ttl, - packet, + Some(packet), compute_unit_price, TEST_TRANSACTION_COST, ); diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 21afe448d151d3..8925611dd2961f 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -242,7 +242,7 @@ impl SanitizedTransactionReceiveAndBuffer { max_age, }; - if 
container.insert_new_transaction(transaction_ttl, packet, priority, cost) { + if container.insert_new_transaction(transaction_ttl, Some(packet), priority, cost) { saturating_add_assign!(num_dropped_on_capacity, 1); } saturating_add_assign!(num_buffered, 1); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 0a7bcf34fc0a01..a25c7a73d4b6fd 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -282,13 +282,13 @@ impl SchedulerController { ids_to_add_back.push(*id); // add back to the queue at end let state = self.container.get_mut_transaction_state(id.id).unwrap(); let sanitized_transaction = &state.transaction_ttl().transaction; - let immutable_packet = state.packet().clone(); + let immutable_packet = state.packet().expect("forwarding requires packet"); // If not already forwarded and can be forwarded, add to forwardable packets. if state.should_forward() && forwarder.try_add_packet( sanitized_transaction, - immutable_packet, + immutable_packet.clone(), feature_set, ) { diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index 12c2b5de5f01b0..c4b0bfc5c0b564 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -35,14 +35,14 @@ pub(crate) enum TransactionState { /// The transaction is available for scheduling. Unprocessed { transaction_ttl: SanitizedTransactionTTL, - packet: Arc, + packet: Option>, priority: u64, cost: u64, should_forward: bool, }, /// The transaction is currently scheduled or being processed. Pending { - packet: Arc, + packet: Option>, priority: u64, cost: u64, should_forward: bool, @@ -55,12 +55,17 @@ impl TransactionState { /// Creates a new `TransactionState` in the `Unprocessed` state. pub(crate) fn new( transaction_ttl: SanitizedTransactionTTL, - packet: Arc, + packet: Option>, priority: u64, cost: u64, ) -> Self { - let should_forward = !packet.original_packet().meta().forwarded() - && packet.original_packet().meta().is_from_staked_node(); + let should_forward = !packet + .as_ref() + .map(|packet| { + packet.original_packet().meta().forwarded() + && packet.original_packet().meta().is_from_staked_node() + }) + .unwrap_or_default(); Self::Unprocessed { transaction_ttl, packet, @@ -116,10 +121,10 @@ impl TransactionState { } /// Return the packet of the transaction. - pub(crate) fn packet(&self) -> &Arc { + pub(crate) fn packet(&self) -> Option<&Arc> { match self { - Self::Unprocessed { packet, .. } => packet, - Self::Pending { packet, .. } => packet, + Self::Unprocessed { packet, .. } => packet.as_ref(), + Self::Pending { packet, .. 
} => packet.as_ref(), Self::Transitioning => unreachable!(), } } @@ -248,7 +253,7 @@ mod tests { const TEST_TRANSACTION_COST: u64 = 5000; TransactionState::new( transaction_ttl, - packet, + Some(packet), compute_unit_price, TEST_TRANSACTION_COST, ) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index cb57ee13c200ba..f606d5e4e21fd1 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -70,7 +70,7 @@ pub(crate) trait StateContainer { fn insert_new_transaction( &mut self, transaction_ttl: SanitizedTransactionTTL, - packet: Arc, + packet: Option>, priority: u64, cost: u64, ) -> bool; @@ -135,7 +135,7 @@ impl StateContainer for TransactionStateContainer, - packet: Arc, + packet: Option>, priority: u64, cost: u64, ) -> bool { @@ -268,7 +268,7 @@ mod tests { ) { for priority in 0..num as u64 { let (transaction_ttl, packet, priority, cost) = test_transaction(priority); - container.insert_new_transaction(transaction_ttl, packet, priority, cost); + container.insert_new_transaction(transaction_ttl, Some(packet), priority, cost); } } From ba825d684d63596b60ff8043079f5ab650938d0e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 08:28:16 -0600 Subject: [PATCH 03/26] TransactionViewStateContainer --- Cargo.lock | 1 + core/Cargo.toml | 1 + .../transaction_state_container.rs | 232 +++++++++++++++--- programs/sbf/Cargo.lock | 1 + svm/examples/Cargo.lock | 1 + svm/examples/Cargo.toml | 6 +- 6 files changed, 199 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26a09d7cf8966e..97b21f370420d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6745,6 +6745,7 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/core/Cargo.toml b/core/Cargo.toml index 0143180ced5b4d..4683d3d49bf536 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,6 +14,7 @@ edition = { workspace = true } codecov = { repository = "solana-labs/solana", branch = "master", service = "github" } [dependencies] +agave-transaction-view = { workspace = true } ahash = { workspace = true } anyhow = { workspace = true } arrayvec = { workspace = true } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index f606d5e4e21fd1..6654910e1c0b24 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -7,10 +7,15 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::TransactionId, }, + agave_transaction_view::resolved_transaction_view::ResolvedTransactionView, + bytes::{Bytes, BytesMut}, itertools::MinMaxResult, min_max_heap::MinMaxHeap, slab::Slab, - solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, + solana_runtime_transaction::{ + runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, + }, + solana_sdk::packet::PACKET_DATA_SIZE, std::sync::Arc, }; @@ -65,23 +70,20 @@ pub(crate) trait StateContainer { /// Panics if the transaction does not exist. 
fn get_transaction_ttl(&self, id: TransactionId) -> Option<&SanitizedTransactionTTL>; - /// Insert a new transaction into the container's queues and maps. - /// Returns `true` if a packet was dropped due to capacity limits. - fn insert_new_transaction( - &mut self, - transaction_ttl: SanitizedTransactionTTL, - packet: Option>, - priority: u64, - cost: u64, - ) -> bool; - /// Retries a transaction - inserts transaction back into map (but not packet). /// This transitions the transaction to `Unprocessed` state. fn retry_transaction( &mut self, transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, - ); + ) { + let transaction_state = self + .get_mut_transaction_state(transaction_id) + .expect("transaction must exist"); + let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); + transaction_state.transition_to_unprocessed(transaction_ttl); + self.push_id_into_queue(priority_id); + } /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority /// transaction will be dropped (removed from the queue and map). @@ -132,7 +134,29 @@ impl StateContainer for TransactionStateContainer bool { + self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + } + + fn remove_by_id(&mut self, id: TransactionId) { + self.id_to_transaction_state.remove(id); + } + + fn get_min_max_priority(&self) -> MinMaxResult { + match self.priority_queue.peek_min() { + Some(min) => match self.priority_queue.peek_max() { + Some(max) => MinMaxResult::MinMax(min.priority, max.priority), + None => MinMaxResult::OneElement(min.priority), + }, + None => MinMaxResult::NoElements, + } + } +} + +impl TransactionStateContainer { + /// Insert a new transaction into the container's queues and maps. + /// Returns `true` if a packet was dropped due to capacity limits. + pub(crate) fn insert_new_transaction( &mut self, transaction_ttl: SanitizedTransactionTTL, packet: Option>, @@ -157,55 +181,187 @@ impl StateContainer for TransactionStateContainer, - ) { - let transaction_state = self - .get_mut_transaction_state(transaction_id) - .expect("transaction must exist"); - let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); - transaction_state.transition_to_unprocessed(transaction_ttl); - self.push_id_into_queue(priority_id); + priority_id: TransactionPriorityId, + remaining_capacity: usize, + ) -> bool { + if remaining_capacity == 0 { + let popped_id = self.priority_queue.push_pop_min(priority_id); + self.remove_by_id(popped_id.id); + true + } else { + self.priority_queue.push(priority_id); + false + } + } +} + +/// A wrapper around `TransactionStateContainer` that allows re-uses +/// pre-allocated `Bytes` to copy packet data into and use for serialization. +/// This is used to avoid allocations in parsing transactions. 
+pub struct TransactionViewStateContainer { + inner: TransactionStateContainer>>, + bytes_buffer: Box<[MaybeBytes]>, +} + +enum MaybeBytes { + None, + Bytes(Bytes), + BytesMut(BytesMut), +} + +impl MaybeBytes { + fn reserve_space(&mut self) -> BytesMut { + match core::mem::replace(self, MaybeBytes::None) { + MaybeBytes::BytesMut(bytes) => bytes, + _ => unreachable!("invalid state"), + } } - fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { - self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + fn freeze(&mut self, bytes: Bytes) { + debug_assert!(matches!(self, MaybeBytes::None)); + *self = MaybeBytes::Bytes(bytes); } - fn remove_by_id(&mut self, id: TransactionId) { - self.id_to_transaction_state.remove(id); + fn free_space(&mut self, mut bytes: BytesMut) { + debug_assert!(matches!(self, MaybeBytes::None)); + bytes.clear(); + *self = MaybeBytes::BytesMut(bytes); } +} - fn get_min_max_priority(&self) -> MinMaxResult { - match self.priority_queue.peek_min() { - Some(min) => match self.priority_queue.peek_max() { - Some(max) => MinMaxResult::MinMax(min.priority, max.priority), - None => MinMaxResult::OneElement(min.priority), - }, - None => MinMaxResult::NoElements, +struct SuccessfulInsert { + state: TransactionState>>, + bytes: Bytes, +} + +impl TransactionViewStateContainer { + fn try_insert_with(&mut self, f: impl FnOnce(BytesMut) -> Result) { + if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() + { + return; + } + + // Get a vacant entry in the slab. + let vacant_entry = self.inner.id_to_transaction_state.vacant_entry(); + let transaction_id = vacant_entry.key(); + + // Get the vacant space in the bytes buffer. + let bytes_entry = &mut self.bytes_buffer[transaction_id]; + let bytes = bytes_entry.reserve_space(); + + // Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. + match f(bytes) { + Ok(SuccessfulInsert { state, bytes }) => { + vacant_entry.insert(state); + bytes_entry.freeze(bytes); + } + Err(bytes) => bytes_entry.free_space(bytes), } } -} -impl TransactionStateContainer { + // This is re-implemented since we need it to call `remove_by_id` on this + // struct rather than `inner`. This is important because we need to return + // the `Bytes` to the pool. 
fn push_id_into_queue_with_remaining_capacity( &mut self, priority_id: TransactionPriorityId, remaining_capacity: usize, ) -> bool { if remaining_capacity == 0 { - let popped_id = self.priority_queue.push_pop_min(priority_id); + let popped_id = self.inner.priority_queue.push_pop_min(priority_id); self.remove_by_id(popped_id.id); true } else { - self.priority_queue.push(priority_id); + self.inner.priority_queue.push(priority_id); false } } } +impl StateContainer>> + for TransactionViewStateContainer +{ + fn with_capacity(capacity: usize) -> Self { + let inner = TransactionStateContainer::with_capacity(capacity); + let bytes_buffer = (0..inner.id_to_transaction_state.capacity()) + .map(|_| { + MaybeBytes::BytesMut({ + let mut bytes = BytesMut::zeroed(PACKET_DATA_SIZE); + bytes.clear(); + bytes + }) + }) + .collect::>() + .into_boxed_slice(); + Self { + inner, + bytes_buffer, + } + } + + #[inline] + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.inner.remaining_capacity() + } + + #[inline] + fn pop(&mut self) -> Option { + self.inner.pop() + } + + #[inline] + fn get_mut_transaction_state( + &mut self, + id: TransactionId, + ) -> Option<&mut TransactionState>>> { + self.inner.get_mut_transaction_state(id) + } + + #[inline] + fn get_transaction_ttl( + &self, + id: TransactionId, + ) -> Option<&SanitizedTransactionTTL>>> { + self.inner.get_transaction_ttl(id) + } + + #[inline] + fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { + self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + } + + fn remove_by_id(&mut self, id: TransactionId) { + // Remove the entry from the map: + // 1. If it was unprocessed, this will drop the `Bytes` held. + // 2. If it was scheduled, this just marks the entry as removed. + let _ = self.inner.id_to_transaction_state.remove(id); + + // Clear the bytes buffer. + let bytes_entry = &mut self.bytes_buffer[id]; + let MaybeBytes::Bytes(bytes) = core::mem::replace(bytes_entry, MaybeBytes::None) else { + unreachable!("invalid state"); + }; + + // Return the `Bytes` to the pool. 
+ let bytes = bytes + .try_into_mut() + .expect("all `Bytes` instances must be dropped"); + bytes_entry.free_space(bytes); + } + + #[inline] + fn get_min_max_priority(&self) -> MinMaxResult { + self.inner.get_min_max_priority() + } +} + #[cfg(test)] mod tests { use { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 721737913ee6a4..82c40c0af73754 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5434,6 +5434,7 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 48faf2e878c70e..3e4748d18d7a81 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5285,6 +5285,7 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/svm/examples/Cargo.toml b/svm/examples/Cargo.toml index a5df7288e95576..5015d183ba2d13 100644 --- a/svm/examples/Cargo.toml +++ b/svm/examples/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - "json-rpc/client", - "json-rpc/server", - "paytube", -] +members = ["json-rpc/client", "json-rpc/server", "paytube"] resolver = "2" From c23a70b7a73e31c206b0b4dd260ad4a23adffe58 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 09:20:13 -0600 Subject: [PATCH 04/26] TransactionViewReceiveAndBuffer --- core/src/banking_stage/packet_filter.rs | 3 +- .../receive_and_buffer.rs | 263 +++++++++++++++++- .../transaction_state_container.rs | 26 +- runtime/src/bank/fee_distribution.rs | 4 +- 4 files changed, 276 insertions(+), 20 deletions(-) diff --git a/core/src/banking_stage/packet_filter.rs b/core/src/banking_stage/packet_filter.rs index b9176c9b8ac91d..d4e68f590d8045 100644 --- a/core/src/banking_stage/packet_filter.rs +++ b/core/src/banking_stage/packet_filter.rs @@ -9,6 +9,8 @@ use { thiserror::Error, }; +pub const MAX_ALLOWED_PRECOMPILE_SIGNATURES: u64 = 8; + lazy_static! { // To calculate the static_builtin_cost_sum conservatively, an all-enabled dummy feature_set // is used. It lowers required minimal compute_unit_limit, aligns with future versions. 
@@ -58,7 +60,6 @@ impl ImmutableDeserializedPacket { } } - const MAX_ALLOWED_PRECOMPILE_SIGNATURES: u64 = 8; if num_precompile_signatures <= MAX_ALLOWED_PRECOMPILE_SIGNATURES { Ok(()) } else { diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 8925611dd2961f..1b6c672de705d8 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -1,18 +1,30 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, - transaction_state_container::StateContainer, + transaction_state::TransactionState, + transaction_state_container::{ + StateContainer, SuccessfulInsert, TransactionViewStateContainer, + }, }, - crate::banking_stage::{ - decision_maker::BufferedPacketsDecision, - immutable_deserialized_packet::ImmutableDeserializedPacket, - packet_deserializer::PacketDeserializer, scheduler_messages::MaxAge, - transaction_scheduler::transaction_state::SanitizedTransactionTTL, - TransactionStateContainer, + crate::{ + banking_stage::{ + decision_maker::BufferedPacketsDecision, + immutable_deserialized_packet::ImmutableDeserializedPacket, + packet_deserializer::PacketDeserializer, + packet_filter::MAX_ALLOWED_PRECOMPILE_SIGNATURES, scheduler_messages::MaxAge, + transaction_scheduler::transaction_state::SanitizedTransactionTTL, + TransactionStateContainer, + }, + banking_trace::{BankingPacketBatch, BankingPacketReceiver}, + }, + agave_transaction_view::{ + resolved_transaction_view::ResolvedTransactionView, + transaction_version::TransactionVersion, transaction_view::SanitizedTransactionView, }, arrayvec::ArrayVec, + bytes::Bytes, core::time::Duration, - crossbeam_channel::RecvTimeoutError, + crossbeam_channel::{RecvTimeoutError, TryRecvError}, solana_accounts_db::account_locks::validate_account_locks, solana_cost_model::cost_model::CostModel, solana_measure::measure_us, @@ -26,10 +38,14 @@ use { clock::{Epoch, Slot, MAX_PROCESSING_AGE}, fee::FeeBudgetLimits, saturating_add_assign, - transaction::SanitizedTransaction, + transaction::{MessageHash, SanitizedTransaction}, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, - std::sync::{Arc, RwLock}, + solana_svm_transaction::svm_message::SVMMessage, + std::{ + sync::{Arc, RwLock}, + time::Instant, + }, }; pub(crate) trait ReceiveAndBuffer { @@ -278,6 +294,231 @@ impl SanitizedTransactionReceiveAndBuffer { } } +pub(crate) struct TransactionViewRecieveAndBuffer { + pub receiver: BankingPacketReceiver, + pub bank_forks: Arc>, +} + +impl ReceiveAndBuffer for TransactionViewRecieveAndBuffer { + type Transaction = RuntimeTransaction>; + type Container = TransactionViewStateContainer; + + fn receive_and_buffer_packets( + &mut self, + container: &mut Self::Container, + timing_metrics: &mut SchedulerTimingMetrics, + count_metrics: &mut SchedulerCountMetrics, + decision: &BufferedPacketsDecision, + ) -> bool { + // Receive packet batches. + const TIMEOUT: Duration = Duration::from_millis(10); + let start = Instant::now(); + + // If not leader, do a blocking-receive initially. This lets the thread + // sleep when there is not work to do. + // TODO: Is it better to manually sleep instead, avoiding the locking + // overhead for wakers? But then risk not waking up when message + // received - as long as sleep is somewhat short, this should be + // fine. 
+ if matches!( + decision, + BufferedPacketsDecision::Forward + | BufferedPacketsDecision::ForwardAndHold + | BufferedPacketsDecision::Hold + ) { + match self.receiver.recv_timeout(TIMEOUT) { + Ok(packet_batch_message) => { + self.handle_packet_batch_message( + container, + timing_metrics, + count_metrics, + decision, + packet_batch_message, + ); + } + Err(RecvTimeoutError::Timeout) => return true, + Err(RecvTimeoutError::Disconnected) => return false, + } + } + + while start.elapsed() < TIMEOUT { + match self.receiver.try_recv() { + Ok(packet_batch_message) => { + self.handle_packet_batch_message( + container, + timing_metrics, + count_metrics, + decision, + packet_batch_message, + ); + } + Err(TryRecvError::Empty) => return true, + Err(TryRecvError::Disconnected) => return false, + } + } + + true + } +} + +impl TransactionViewRecieveAndBuffer { + fn handle_packet_batch_message( + &mut self, + container: &mut TransactionViewStateContainer, + timing_metrics: &mut SchedulerTimingMetrics, + count_metrics: &mut SchedulerCountMetrics, + decision: &BufferedPacketsDecision, + packet_batch_message: BankingPacketBatch, + ) { + // Do not support forwarding - only add support for this if we really need it. + if matches!(decision, BufferedPacketsDecision::Forward) { + return; + } + + let start = Instant::now(); + // Sanitize packets, generate IDs, and insert into the container. + let (root_bank, working_bank) = { + let bank_forks = self.bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + (root_bank, working_bank) + }; + let alt_resolved_slot = root_bank.slot(); + let sanitized_epoch = root_bank.epoch(); + let transaction_account_lock_limit = working_bank.get_transaction_account_lock_limit(); + + let mut num_received = 0usize; + let mut num_buffered = 0usize; + let mut num_dropped_on_capacity = 0usize; + let mut num_dropped_on_receive = 0usize; + for packet_batch in packet_batch_message.0.iter() { + for packet in packet_batch.iter() { + let Some(packet_data) = packet.data(..) else { + continue; + }; + + num_received += 1; + + // Reserve free-space to copy packet into, run sanitization checks, and insert. + if container.try_insert_with(|mut bytes| { + // Copy packet data into the buffer, and freeze. 
+ bytes.extend_from_slice(packet_data); + let bytes = bytes.freeze(); + + match Self::try_handle_packet( + bytes.clone(), + &root_bank, + &working_bank, + alt_resolved_slot, + sanitized_epoch, + transaction_account_lock_limit, + ) { + Ok(state) => { + num_buffered += 1; + Ok(SuccessfulInsert { state, bytes }) + } + Err(()) => { + num_dropped_on_receive += 1; + Err(bytes.try_into_mut().expect("no leaks")) + } + } + }) { + num_dropped_on_capacity += 1; + }; + } + } + + let buffer_time_us = start.elapsed().as_micros() as u64; + timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); + }); + count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_received, num_received); + saturating_add_assign!(count_metrics.num_buffered, num_buffered); + saturating_add_assign!( + count_metrics.num_dropped_on_capacity, + num_dropped_on_capacity + ); + saturating_add_assign!(count_metrics.num_dropped_on_receive, num_dropped_on_receive); + }); + } + + fn try_handle_packet( + bytes: Bytes, + root_bank: &Bank, + working_bank: &Bank, + alt_resolved_slot: Slot, + sanitized_epoch: Epoch, + transaction_account_lock_limit: usize, + ) -> Result>>, ()> { + // Parsing and basic sanitization checks + let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes.clone()) else { + return Err(()); + }; + + let Ok(view) = RuntimeTransaction::>::try_from( + view, + MessageHash::Compute, + None, + ) else { + return Err(()); + }; + + // Check excessive pre-compiles. + let signature_detials = view.signature_details(); + let num_precompiles = signature_detials.num_ed25519_instruction_signatures() + + signature_detials.num_secp256k1_instruction_signatures(); + if num_precompiles > MAX_ALLOWED_PRECOMPILE_SIGNATURES { + return Err(()); + } + + // Load addresses for transaction. + let load_addresses_result = match view.version() { + TransactionVersion::Legacy => Ok((None, u64::MAX)), + TransactionVersion::V0 => root_bank + .load_addresses_from_ref(view.address_table_lookup_iter()) + .map(|(loaded_addresses, deactivation_slot)| { + (Some(loaded_addresses), deactivation_slot) + }), + }; + let Ok((loaded_addresses, deactivation_slot)) = load_addresses_result else { + return Err(()); + }; + + let Ok(view) = RuntimeTransaction::>::try_from( + view, + loaded_addresses, + root_bank.get_reserved_account_keys(), + ) else { + return Err(()); + }; + + if validate_account_locks(view.account_keys(), transaction_account_lock_limit).is_err() { + return Err(()); + } + + let Ok(compute_budget_limits) = view.compute_budget_limits(&working_bank.feature_set) + else { + return Err(()); + }; + + let max_age = calculate_max_age(sanitized_epoch, deactivation_slot, alt_resolved_slot); + let fee_budget_limits = FeeBudgetLimits::from(compute_budget_limits); + let (priority, cost) = + calculate_priority_and_cost(&view, &fee_budget_limits, &working_bank); + + Ok(TransactionState::new( + SanitizedTransactionTTL { + transaction: view, + max_age, + }, + None, + priority, + cost, + )) + } +} + /// Calculate priority and cost for a transaction: /// /// Cost is calculated through the `CostModel`, @@ -297,7 +538,7 @@ impl SanitizedTransactionReceiveAndBuffer { /// Any difference in the prioritization is negligible for /// the current transaction costs. 
fn calculate_priority_and_cost( - transaction: &RuntimeTransaction, + transaction: &impl TransactionWithMeta, fee_budget_limits: &FeeBudgetLimits, bank: &Bank, ) -> (u64, u64) { diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 6654910e1c0b24..68a46b9d94492e 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -231,17 +231,23 @@ impl MaybeBytes { } } -struct SuccessfulInsert { - state: TransactionState>>, - bytes: Bytes, +pub(crate) struct SuccessfulInsert { + pub state: TransactionState>>, + pub bytes: Bytes, } impl TransactionViewStateContainer { - fn try_insert_with(&mut self, f: impl FnOnce(BytesMut) -> Result) { + /// Returns true if packet was dropped due to capacity limits. + pub(crate) fn try_insert_with( + &mut self, + f: impl FnOnce(BytesMut) -> Result, + ) -> bool { if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() { - return; + return true; } + // Get remaining capacity before inserting. + let remaining_capacity = self.remaining_capacity(); // Get a vacant entry in the slab. let vacant_entry = self.inner.id_to_transaction_state.vacant_entry(); @@ -254,16 +260,24 @@ impl TransactionViewStateContainer { // Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. match f(bytes) { Ok(SuccessfulInsert { state, bytes }) => { + let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); vacant_entry.insert(state); bytes_entry.freeze(bytes); + + // Push the transaction into the queue. + self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) + } + Err(bytes) => { + bytes_entry.free_space(bytes); + false } - Err(bytes) => bytes_entry.free_space(bytes), } } // This is re-implemented since we need it to call `remove_by_id` on this // struct rather than `inner`. This is important because we need to return // the `Bytes` to the pool. + /// Returns true if packet was dropped due to capacity limits. 
fn push_id_into_queue_with_remaining_capacity( &mut self, priority_id: TransactionPriorityId, diff --git a/runtime/src/bank/fee_distribution.rs b/runtime/src/bank/fee_distribution.rs index e0be18d5e609fc..e90444053df51e 100644 --- a/runtime/src/bank/fee_distribution.rs +++ b/runtime/src/bank/fee_distribution.rs @@ -3,6 +3,7 @@ use { crate::bank::CollectorFeeDetails, log::{debug, warn}, solana_feature_set::{remove_rounding_in_fee_calculation, reward_full_priority_fee}, + solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, solana_sdk::{ account::{ReadableAccount, WritableAccount}, fee::FeeBudgetLimits, @@ -10,7 +11,6 @@ use { reward_info::RewardInfo, reward_type::RewardType, system_program, - transaction::SanitizedTransaction, }, solana_svm_rent_collector::svm_rent_collector::SVMRentCollector, solana_vote::vote_account::VoteAccountsHashMap, @@ -73,7 +73,7 @@ impl Bank { pub fn calculate_reward_for_transaction( &self, - transaction: &SanitizedTransaction, + transaction: &impl TransactionWithMeta, fee_budget_limits: &FeeBudgetLimits, ) -> u64 { let fee_details = solana_fee::calculate_fee_details( From 0b856326407e962eae0f784e9f6b81bc0d3bca1e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 09:31:37 -0600 Subject: [PATCH 05/26] spawn_scheduler_and_workers --- core/src/banking_stage.rs | 134 +++++++++++++----- .../receive_and_buffer.rs | 19 +-- core/src/validator.rs | 25 ++++ 3 files changed, 137 insertions(+), 41 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4dfc6bc8e5c733..882822c695d567 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -25,7 +25,7 @@ use { }, }, banking_trace::BankingPacketReceiver, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, histogram::Histogram, @@ -52,7 +52,9 @@ use { }, transaction_scheduler::{ prio_graph_scheduler::PrioGraphSchedulerConfig, - receive_and_buffer::SanitizedTransactionReceiveAndBuffer, + receive_and_buffer::{ + ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer, TransactionViewReceiveAndBuffer, + }, transaction_state_container::TransactionStateContainer, }, }; @@ -400,6 +402,7 @@ impl BankingStage { ) -> Self { match block_production_method { BlockProductionMethod::CentralScheduler => Self::new_central_scheduler( + TransactionStructure::Sdk, cluster_info, poh_recorder, non_vote_receiver, @@ -504,7 +507,8 @@ impl BankingStage { } #[allow(clippy::too_many_arguments)] - pub fn new_central_scheduler( + fn new_central_scheduler( + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -567,6 +571,77 @@ impl BankingStage { )); } + let transaction_struct = if enable_forwarding { + warn!( + "Forwarding only supported for `Sdk` transaction struct. Overriding to use `Sdk`." 
+ ); + TransactionStructure::Sdk + } else { + transaction_struct + }; + + match transaction_struct { + TransactionStructure::Sdk => { + let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( + PacketDeserializer::new(non_vote_receiver), + bank_forks.clone(), + enable_forwarding, + ); + Self::spawn_scheduler_and_workers( + &mut bank_thread_hdls, + receive_and_buffer, + decision_maker, + committer, + cluster_info, + poh_recorder, + num_threads, + log_messages_bytes_limit, + connection_cache, + bank_forks, + enable_forwarding, + data_budget, + ); + } + TransactionStructure::View => { + let receive_and_buffer = TransactionViewReceiveAndBuffer { + receiver: non_vote_receiver, + bank_forks: bank_forks.clone(), + }; + Self::spawn_scheduler_and_workers( + &mut bank_thread_hdls, + receive_and_buffer, + decision_maker, + committer, + cluster_info, + poh_recorder, + num_threads, + log_messages_bytes_limit, + connection_cache, + bank_forks, + enable_forwarding, + data_budget, + ); + } + } + + Self { bank_thread_hdls } + } + + #[allow(clippy::too_many_arguments)] + fn spawn_scheduler_and_workers( + bank_thread_hdls: &mut Vec>, + receive_and_buffer: R, + decision_maker: DecisionMaker, + committer: Committer, + cluster_info: &impl LikeClusterInfo, + poh_recorder: &Arc>, + num_threads: u32, + log_messages_bytes_limit: Option, + connection_cache: Arc, + bank_forks: Arc>, + enable_forwarding: bool, + data_budget: Arc, + ) { // Create channels for communication between scheduler and workers let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS); let (work_senders, work_receivers): (Vec>, Vec>) = @@ -612,39 +687,34 @@ impl BankingStage { }); // Spawn the central scheduler thread - bank_thread_hdls.push({ - let packet_deserializer = PacketDeserializer::new(non_vote_receiver); - let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( - packet_deserializer, - bank_forks.clone(), - forwarder.is_some(), - ); - let scheduler = PrioGraphScheduler::new( - work_senders, - finished_work_receiver, - PrioGraphSchedulerConfig::default(), - ); - let scheduler_controller = SchedulerController::new( - decision_maker.clone(), - receive_and_buffer, - bank_forks, - scheduler, - worker_metrics, - forwarder, - ); + bank_thread_hdls.push( Builder::new() .name("solBnkTxSched".to_string()) - .spawn(move || match scheduler_controller.run() { - Ok(_) => {} - Err(SchedulerError::DisconnectedRecvChannel(_)) => {} - Err(SchedulerError::DisconnectedSendChannel(_)) => { - warn!("Unexpected worker disconnect from scheduler") + .spawn(move || { + let scheduler = PrioGraphScheduler::new( + work_senders, + finished_work_receiver, + PrioGraphSchedulerConfig::default(), + ); + let scheduler_controller = SchedulerController::new( + decision_maker.clone(), + receive_and_buffer, + bank_forks, + scheduler, + worker_metrics, + forwarder, + ); + + match scheduler_controller.run() { + Ok(_) => {} + Err(SchedulerError::DisconnectedRecvChannel(_)) => {} + Err(SchedulerError::DisconnectedSendChannel(_)) => { + warn!("Unexpected worker disconnect from scheduler") + } } }) - .unwrap() - }); - - Self { bank_thread_hdls } + .unwrap(), + ); } fn spawn_thread_local_multi_iterator_thread( diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 1b6c672de705d8..259d0e8580a92e 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ 
b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -49,8 +49,8 @@ use { }; pub(crate) trait ReceiveAndBuffer { - type Transaction: TransactionWithMeta; - type Container: StateContainer; + type Transaction: TransactionWithMeta + Send + Sync; + type Container: StateContainer + Send + Sync; /// Returns whether the packet receiver is still connected. fn receive_and_buffer_packets( @@ -294,12 +294,12 @@ impl SanitizedTransactionReceiveAndBuffer { } } -pub(crate) struct TransactionViewRecieveAndBuffer { +pub(crate) struct TransactionViewReceiveAndBuffer { pub receiver: BankingPacketReceiver, pub bank_forks: Arc>, } -impl ReceiveAndBuffer for TransactionViewRecieveAndBuffer { +impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { type Transaction = RuntimeTransaction>; type Container = TransactionViewStateContainer; @@ -361,7 +361,7 @@ impl ReceiveAndBuffer for TransactionViewRecieveAndBuffer { } } -impl TransactionViewRecieveAndBuffer { +impl TransactionViewReceiveAndBuffer { fn handle_packet_batch_message( &mut self, container: &mut TransactionViewStateContainer, @@ -391,7 +391,7 @@ impl TransactionViewRecieveAndBuffer { let mut num_buffered = 0usize; let mut num_dropped_on_capacity = 0usize; let mut num_dropped_on_receive = 0usize; - for packet_batch in packet_batch_message.0.iter() { + for packet_batch in packet_batch_message.iter() { for packet in packet_batch.iter() { let Some(packet_data) = packet.data(..) else { continue; @@ -497,15 +497,16 @@ impl TransactionViewRecieveAndBuffer { return Err(()); } - let Ok(compute_budget_limits) = view.compute_budget_limits(&working_bank.feature_set) + let Ok(compute_budget_limits) = view + .compute_budget_instruction_details() + .sanitize_and_convert_to_compute_budget_limits(&working_bank.feature_set) else { return Err(()); }; let max_age = calculate_max_age(sanitized_epoch, deactivation_slot, alt_resolved_slot); let fee_budget_limits = FeeBudgetLimits::from(compute_budget_limits); - let (priority, cost) = - calculate_priority_and_cost(&view, &fee_budget_limits, &working_bank); + let (priority, cost) = calculate_priority_and_cost(&view, &fee_budget_limits, working_bank); Ok(TransactionState::new( SanitizedTransactionTTL { diff --git a/core/src/validator.rs b/core/src/validator.rs index c3318ee070f2bc..90b08f40158ad7 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -208,6 +208,31 @@ impl BlockProductionMethod { } } +#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)] +#[strum(serialize_all = "kebab-case")] +pub enum TransactionStructure { + #[default] + Sdk, + View, +} + +impl TransactionStructure { + pub const fn cli_names() -> &'static [&'static str] { + Self::VARIANTS + } + + pub fn cli_message() -> &'static str { + lazy_static! { + static ref MESSAGE: String = format!( + "Switch internal transaction structure/representation [default: {}]", + TransactionStructure::default() + ); + }; + + &MESSAGE + } +} + /// Configuration for the block generator invalidator for replay. 
#[derive(Clone, Debug)] pub struct GeneratorConfig { From 46dfb546cc9e0bd2d4f0c6ddb9ebad96c3a297e1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 09:43:41 -0600 Subject: [PATCH 06/26] convert benches/tests to be switchable --- banking-bench/src/main.rs | 14 +++++- core/benches/banking_stage.rs | 83 +++++++++++++++++++++++++++++++--- core/src/banking_simulation.rs | 6 ++- core/src/banking_stage.rs | 40 +++++++++++----- core/src/tpu.rs | 3 +- ledger-tool/src/main.rs | 8 +++- 6 files changed, 131 insertions(+), 23 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 99722c7cf6082e..377f93bcf68ef2 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -11,7 +11,7 @@ use { banking_trace::{ BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, }, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -290,6 +290,14 @@ fn main() { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()), ) + .arg( + Arg::with_name("transaction_struct") + .long("transaction-structure") + .value_name("STRUCT") + .takes_value(true) + .possible_values(TransactionStructure::cli_names()) + .help(TransactionStructure::cli_message()), + ) .arg( Arg::new("num_banking_threads") .long("num-banking-threads") @@ -320,6 +328,9 @@ fn main() { let block_production_method = matches .value_of_t::("block_production_method") .unwrap_or_default(); + let transaction_struct = matches + .value_of_t::("transaction_struct") + .unwrap_or_default(); let num_banking_threads = matches .value_of_t::("num_banking_threads") .unwrap_or_else(|_| BankingStage::num_threads()); @@ -470,6 +481,7 @@ fn main() { }; let banking_stage = BankingStage::new_num_threads( block_production_method, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 5d764f4b7c1d44..1f6bb0545dd6b8 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -2,7 +2,10 @@ #![feature(test)] use { - solana_core::{banking_trace::Channels, validator::BlockProductionMethod}, + solana_core::{ + banking_trace::Channels, + validator::{BlockProductionMethod, TransactionStructure}, + }, solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction}, }; @@ -192,7 +195,12 @@ enum TransactionType { ProgramsAndVotes, } -fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { +fn bench_banking( + bencher: &mut Bencher, + tx_type: TransactionType, + block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, +) { solana_logger::setup(); let num_threads = BankingStage::num_threads() as usize; // a multiple of packet chunk duplicates to avoid races @@ -296,7 +304,8 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); let _banking_stage = BankingStage::new( - BlockProductionMethod::CentralScheduler, + block_production_method, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -371,22 +380,82 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { #[bench] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::Accounts); + bench_banking( + bencher, + TransactionType::Accounts, + 
BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::Programs); + bench_banking( + bencher, + TransactionType::Programs, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_accounts_with_voting(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::AccountsAndVotes); + bench_banking( + bencher, + TransactionType::AccountsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_programs_with_voting(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::ProgramsAndVotes); + bench_banking( + bencher, + TransactionType::ProgramsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); +} + +#[bench] +fn bench_banking_stage_multi_accounts_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::Accounts, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_programs_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::Programs, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_accounts_with_voting_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::AccountsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_programs_with_voting_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::ProgramsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); } fn simulate_process_entries( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 82a4e5e94a3e76..71e87589e2a3e4 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -7,7 +7,7 @@ use { TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME, }, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, bincode::deserialize_from, crossbeam_channel::{unbounded, Sender}, @@ -672,6 +672,7 @@ impl BankingSimulator { bank_forks: Arc>, blockstore: Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, ) -> (SenderLoop, SimulatorLoop, SimulatorThreads) { let parent_slot = self.parent_slot().unwrap(); let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time; @@ -803,6 +804,7 @@ impl BankingSimulator { let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64)); let banking_stage = BankingStage::new_num_threads( block_production_method.clone(), + transaction_struct.clone(), &cluster_info, &poh_recorder, non_vote_receiver, @@ -889,12 +891,14 @@ impl BankingSimulator { bank_forks: Arc>, blockstore: Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, ) -> Result<(), SimulateError> { let (sender_loop, simulator_loop, simulator_threads) = self.prepare_simulation( genesis_config, bank_forks, blockstore, block_production_method, + transaction_struct, ); sender_loop.log_starting(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 882822c695d567..0f400bfa6f0f43 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs 
@@ -352,6 +352,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new( block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -367,6 +368,7 @@ impl BankingStage { ) -> Self { Self::new_num_threads( block_production_method, + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, @@ -386,6 +388,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new_num_threads( block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -402,7 +405,7 @@ impl BankingStage { ) -> Self { match block_production_method { BlockProductionMethod::CentralScheduler => Self::new_central_scheduler( - TransactionStructure::Sdk, + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, @@ -915,6 +918,7 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, + test_case::test_case, }; pub(crate) fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { @@ -933,8 +937,9 @@ mod tests { .collect() } - #[test] - fn test_banking_stage_shutdown1() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_shutdown1(transaction_struct: TransactionStructure) { let genesis_config = create_genesis_config(2).genesis_config; let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let banking_tracer = BankingTracer::new_disabled(); @@ -960,6 +965,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -983,8 +989,9 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_banking_stage_tick() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_tick(transaction_struct: TransactionStructure) { solana_logger::setup(); let GenesisConfigInfo { mut genesis_config, .. 
@@ -1020,6 +1027,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -1066,7 +1074,10 @@ mod tests { with_vers.into_iter().map(|(b, _)| b).collect() } - fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) { + fn test_banking_stage_entries_only( + block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, + ) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -1104,6 +1115,7 @@ mod tests { let banking_stage = BankingStage::new( block_production_method, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -1197,9 +1209,13 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_banking_stage_entries_only_central_scheduler() { - test_banking_stage_entries_only(BlockProductionMethod::CentralScheduler); + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_entries_only_central_scheduler(transaction_struct: TransactionStructure) { + test_banking_stage_entries_only( + BlockProductionMethod::CentralScheduler, + transaction_struct, + ); } #[test] @@ -1430,8 +1446,9 @@ mod tests { tick_producer.unwrap() } - #[test] - fn test_unprocessed_transaction_storage_full_send() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_unprocessed_transaction_storage_full_send(transaction_struct: TransactionStructure) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -1469,6 +1486,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d715bb5c7b0534..9f2a483a62b2a8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -15,7 +15,7 @@ use { sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, - validator::{BlockProductionMethod, GeneratorConfig}, + validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure}, }, bytes::Bytes, crossbeam_channel::{unbounded, Receiver}, @@ -269,6 +269,7 @@ impl Tpu { let banking_stage = BankingStage::new( block_production_method, + TransactionStructure::Sdk, // TODO: add cli cluster_info, poh_recorder, non_vote_receiver, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 4580c0b7731abe..3c4befa212d0ea 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -33,7 +33,7 @@ use { solana_core::{ banking_simulation::{BankingSimulator, BankingTraceEvents}, system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig}, - validator::{BlockProductionMethod, BlockVerificationMethod}, + validator::{BlockProductionMethod, BlockVerificationMethod, TransactionStructure}, }, solana_cost_model::{cost_model::CostModel, cost_tracker::CostTracker}, solana_feature_set::{self as feature_set, FeatureSet}, @@ -2515,14 +2515,18 @@ fn main() { BlockProductionMethod ) .unwrap_or_default(); + let transaction_struct = + value_t!(arg_matches, "transaction_struct", TransactionStructure) + .unwrap_or_default(); - info!("Using: block-production-method: {block_production_method}"); + info!("Using: block-production-method: {block_production_method} transaction-structure: {transaction_struct}"); match simulator.start( genesis_config, bank_forks, blockstore, 
block_production_method, + transaction_struct, ) { Ok(()) => println!("Ok"), Err(error) => { From 923e4105df09c354835ba30a3fa5cc0c2109b713 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 09:46:01 -0600 Subject: [PATCH 07/26] add cli hookup for validator --- core/src/tpu.rs | 3 ++- core/src/validator.rs | 7 +++++-- local-cluster/src/validator_configs.rs | 1 + multinode-demo/bootstrap-validator.sh | 3 +++ multinode-demo/validator.sh | 3 +++ validator/src/cli.rs | 10 +++++++++- validator/src/main.rs | 11 +++++++++-- 7 files changed, 32 insertions(+), 6 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 9f2a483a62b2a8..b5a73415160260 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -118,6 +118,7 @@ impl Tpu { tpu_max_connections_per_ipaddr_per_minute: u64, prioritization_fee_cache: &Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, enable_block_production_forwarding: bool, _generator_config: Option, /* vestigial code for replay invalidator */ ) -> (Self, Vec>) { @@ -269,7 +270,7 @@ impl Tpu { let banking_stage = BankingStage::new( block_production_method, - TransactionStructure::Sdk, // TODO: add cli + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 90b08f40158ad7..62eb4fa23346e8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -299,6 +299,7 @@ pub struct ValidatorConfig { pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, + pub transaction_struct: TransactionStructure, pub enable_block_production_forwarding: bool, pub generator_config: Option, pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, @@ -371,6 +372,7 @@ impl Default for ValidatorConfig { banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), + transaction_struct: TransactionStructure::default(), enable_block_production_forwarding: false, generator_config: None, use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), @@ -900,8 +902,8 @@ impl Validator { config.accounts_db_test_hash_calculation, ); info!( - "Using: block-verification-method: {}, block-production-method: {}", - config.block_verification_method, config.block_production_method + "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}", + config.block_verification_method, config.block_production_method, config.transaction_struct ); let (replay_vote_sender, replay_vote_receiver) = unbounded(); @@ -1561,6 +1563,7 @@ impl Validator { tpu_max_connections_per_ipaddr_per_minute, &prioritization_fee_cache, config.block_production_method.clone(), + config.transaction_struct.clone(), config.enable_block_production_forwarding, config.generator_config.clone(), ); diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index e90475aad2a06f..50199be9e7172c 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -61,6 +61,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), + transaction_struct: 
config.transaction_struct.clone(), enable_block_production_forwarding: config.enable_block_production_forwarding, generator_config: config.generator_config.clone(), use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index d21ee1aaa8b73f..4285be6cac5dc1 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -112,6 +112,9 @@ while [[ -n $1 ]]; do elif [[ $1 == --block-production-method ]]; then args+=("$1" "$2") shift 2 + elif [[ $1 == --transaction-structure ]]; then + args+=("$1" "$2") + shift 2 elif [[ $1 == --wen-restart ]]; then args+=("$1" "$2") shift 2 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index c97812c6cbb910..800b4ce9d136d2 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -182,6 +182,9 @@ while [[ -n $1 ]]; do elif [[ $1 == --block-production-method ]]; then args+=("$1" "$2") shift 2 + elif [[ $1 == --transaction-structure ]]; then + args+=("$1" "$2") + shift 2 elif [[ $1 == --wen-restart ]]; then args+=("$1" "$2") shift 2 diff --git a/validator/src/cli.rs b/validator/src/cli.rs index aed7a3bffc1d9f..1f9758770a792f 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -21,7 +21,7 @@ use { }, solana_core::{ banking_trace::{DirByteLimit, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, - validator::{BlockProductionMethod, BlockVerificationMethod}, + validator::{BlockProductionMethod, BlockVerificationMethod, TransactionStructure}, }, solana_faucet::faucet::{self, FAUCET_PORT}, solana_ledger::use_snapshot_archives_at_startup, @@ -1595,6 +1595,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()), ) + .arg( + Arg::with_name("transaction_struct") + .long("transaction-structure") + .value_name("STRUCT") + .takes_value(true) + .possible_values(TransactionStructure::cli_names()) + .help(TransactionStructure::cli_message()), + ) .arg( Arg::with_name("unified_scheduler_handler_threads") .long("unified-scheduler-handler-threads") diff --git a/validator/src/main.rs b/validator/src/main.rs index a7de615b3be9ac..a067fc6ff89be5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -36,8 +36,9 @@ use { system_monitor_service::SystemMonitorService, tpu::DEFAULT_TPU_COALESCE, validator::{ - is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator, - ValidatorConfig, ValidatorError, ValidatorStartProgress, ValidatorTpuConfig, + is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, + TransactionStructure, Validator, ValidatorConfig, ValidatorError, + ValidatorStartProgress, ValidatorTpuConfig, }, }, solana_gossip::{ @@ -1852,6 +1853,12 @@ pub fn main() { BlockProductionMethod ) .unwrap_or_default(); + validator_config.transaction_struct = value_t!( + matches, // comment to align formatting... 
+ "transaction_struct", + TransactionStructure + ) + .unwrap_or_default(); validator_config.enable_block_production_forwarding = staked_nodes_overrides_path.is_some(); validator_config.unified_scheduler_handler_threads = value_t!(matches, "unified_scheduler_handler_threads", usize).ok(); From 703825ac056cc168a4e68975bfffd579c07c8110 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 10:18:30 -0600 Subject: [PATCH 08/26] test both ReceiveAndBuffer in scheduler_controller --- .../scheduler_controller.rs | 131 +++++++++++------- 1 file changed, 81 insertions(+), 50 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index a25c7a73d4b6fd..56e95be61ac1c2 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -443,10 +443,12 @@ mod tests { tests::create_slow_genesis_config, transaction_scheduler::{ prio_graph_scheduler::PrioGraphSchedulerConfig, - receive_and_buffer::SanitizedTransactionReceiveAndBuffer, + receive_and_buffer::{ + SanitizedTransactionReceiveAndBuffer, TransactionViewReceiveAndBuffer, + }, }, }, - banking_trace::BankingPacketBatch, + banking_trace::{BankingPacketBatch, BankingPacketReceiver}, }, crossbeam_channel::{unbounded, Receiver, Sender}, itertools::Itertools, @@ -458,21 +460,15 @@ mod tests { solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS}, solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, solana_runtime::bank::Bank, - solana_runtime_transaction::runtime_transaction::RuntimeTransaction, + solana_runtime_transaction::transaction_meta::StaticMeta, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, - fee_calculator::FeeRateGovernor, - hash::Hash, - message::Message, - poh_config::PohConfig, - pubkey::Pubkey, - signature::Keypair, - signer::Signer, - system_instruction, system_transaction, - transaction::{SanitizedTransaction, Transaction}, + compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash, + message::Message, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, + signer::Signer, system_instruction, system_transaction, transaction::Transaction, }, std::sync::{atomic::AtomicBool, Arc, RwLock}, tempfile::TempDir, + test_case::test_case, }; fn create_channels(num: usize) -> (Vec>, Vec>) { @@ -481,7 +477,7 @@ mod tests { // Helper struct to create tests that hold channels, files, etc. // such that our tests can be more easily set up and run. 
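// Illustrative sketch of the clap wiring used for the new --transaction-structure
// flag above: an enum with Default + FromStr, exposed through possible_values and
// read back with value_t!(...).unwrap_or_default(). Assumes clap 2.x (which
// provides value_t!); TxStructArg and the "sdk"/"view" strings are stand-ins for
// illustration, not the real TransactionStructure definition in solana_core.
use {
    clap::{value_t, App, Arg},
    std::str::FromStr,
};

#[derive(Debug, PartialEq)]
enum TxStructArg {
    Sdk,
    View,
}

impl Default for TxStructArg {
    fn default() -> Self {
        // Mirrors falling back to the SDK transaction struct when the flag is absent.
        Self::Sdk
    }
}

impl FromStr for TxStructArg {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "sdk" => Ok(Self::Sdk),
            "view" => Ok(Self::View),
            other => Err(format!("unknown transaction structure: {other}")),
        }
    }
}

fn main() {
    let matches = App::new("demo")
        .arg(
            Arg::with_name("transaction_struct")
                .long("transaction-structure")
                .value_name("STRUCT")
                .takes_value(true)
                .possible_values(&["sdk", "view"]),
        )
        .get_matches_from(vec!["demo", "--transaction-structure", "view"]);

    // A missing flag would fall back to the default, matching the
    // unwrap_or_default() call in validator/src/main.rs.
    let structure = value_t!(matches, "transaction_struct", TxStructArg).unwrap_or_default();
    assert_eq!(structure, TxStructArg::View);
}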
- struct TestFrame { + struct TestFrame { bank: Arc, mint_keypair: Keypair, _ledger_path: TempDir, @@ -490,18 +486,38 @@ mod tests { poh_recorder: Arc>, banking_packet_sender: Sender>>, - consume_work_receivers: - Vec>>>, - finished_consume_work_sender: - Sender>>, + consume_work_receivers: Vec>>, + finished_consume_work_sender: Sender>, + } + + fn test_create_sanitized_transaction_receive_and_buffer( + receiver: BankingPacketReceiver, + bank_forks: Arc>, + ) -> SanitizedTransactionReceiveAndBuffer { + SanitizedTransactionReceiveAndBuffer::new( + PacketDeserializer::new(receiver), + bank_forks, + false, + ) + } + + fn test_create_transaction_view_receive_and_buffer( + receiver: BankingPacketReceiver, + bank_forks: Arc>, + ) -> TransactionViewReceiveAndBuffer { + TransactionViewReceiveAndBuffer { + receiver, + bank_forks, + } } #[allow(clippy::type_complexity)] - fn create_test_frame( + fn create_test_frame( num_threads: usize, + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, ) -> ( - TestFrame, - SchedulerController, SanitizedTransactionReceiveAndBuffer>, + TestFrame, + SchedulerController, R>, ) { let GenesisConfigInfo { mut genesis_config, @@ -529,7 +545,8 @@ mod tests { let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone()); let (banking_packet_sender, banking_packet_receiver) = unbounded(); - let packet_deserializer = PacketDeserializer::new(banking_packet_receiver); + let receive_and_buffer = + create_receive_and_buffer(banking_packet_receiver, bank_forks.clone()); let (consume_work_senders, consume_work_receivers) = create_channels(num_threads); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); @@ -546,12 +563,6 @@ mod tests { finished_consume_work_sender, }; - let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( - packet_deserializer, - bank_forks.clone(), - false, - ); - let scheduler = PrioGraphScheduler::new( consume_work_senders, finished_consume_work_receiver, @@ -607,10 +618,7 @@ mod tests { // In the tests, the decision will not become stale, so it is more convenient // to receive first and then schedule. fn test_receive_then_schedule( - scheduler_controller: &mut SchedulerController< - Arc, - SanitizedTransactionReceiveAndBuffer, - >, + scheduler_controller: &mut SchedulerController, impl ReceiveAndBuffer>, ) { let decision = scheduler_controller .decision_maker @@ -621,10 +629,13 @@ mod tests { assert!(scheduler_controller.process_transactions(&decision).is_ok()); } - #[test] + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] #[should_panic(expected = "batch id 0 is not being tracked")] - fn test_unexpected_batch_id() { - let (test_frame, scheduler_controller) = create_test_frame(1); + fn test_unexpected_batch_id( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, scheduler_controller) = create_test_frame(1, create_receive_and_buffer); let TestFrame { finished_consume_work_sender, .. 
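// Minimal stand-alone illustration of the test_case pattern adopted for these
// tests: a single generic test body, instantiated once per implementation by
// passing a factory function. The Source trait and factories below are invented
// for the sketch; the real factories take a BankingPacketReceiver and BankForks
// and return the two ReceiveAndBuffer implementations.
#[cfg(test)]
mod receive_and_buffer_pattern_sketch {
    use test_case::test_case;

    trait Source {
        fn next_item(&mut self) -> Option<u64>;
    }

    struct VecSource(Vec<u64>);
    impl Source for VecSource {
        fn next_item(&mut self) -> Option<u64> {
            self.0.pop()
        }
    }

    struct RangeSource(std::ops::Range<u64>);
    impl Source for RangeSource {
        fn next_item(&mut self) -> Option<u64> {
            self.0.next()
        }
    }

    fn create_vec_source() -> VecSource {
        VecSource(vec![1, 2, 3])
    }

    fn create_range_source() -> RangeSource {
        RangeSource(0..3)
    }

    // Each test_case line expands to its own named #[test], so both
    // implementations run through the identical body.
    #[test_case(create_vec_source; "vec")]
    #[test_case(create_range_source; "range")]
    fn drains_three_items<S: Source>(create_source: impl FnOnce() -> S) {
        let mut source = create_source();
        let mut count = 0;
        while source.next_item().is_some() {
            count += 1;
        }
        assert_eq!(count, 3);
    }
}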
@@ -645,9 +656,13 @@ mod tests { scheduler_controller.run().unwrap(); } - #[test] - fn test_schedule_consume_single_threaded_no_conflicts() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_no_conflicts( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -701,9 +716,13 @@ mod tests { assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); } - #[test] - fn test_schedule_consume_single_threaded_conflict() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_conflict( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -760,9 +779,13 @@ mod tests { assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); } - #[test] - fn test_schedule_consume_single_threaded_multi_batch() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_multi_batch( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -824,9 +847,13 @@ mod tests { ); } - #[test] - fn test_schedule_consume_simple_thread_selection() { - let (test_frame, mut scheduler_controller) = create_test_frame(2); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_simple_thread_selection( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(2, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -891,9 +918,13 @@ mod tests { assert_eq!(t1_actual, t1_expected); } - #[test] - fn test_schedule_consume_retryable() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_retryable( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, From d995a77e251a10c940846a9ee6a23a60281c37a8 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 10:33:51 -0600 Subject: [PATCH 09/26] Fix bytes leak on retry --- .../transaction_scheduler/prio_graph_scheduler.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 
5e401f15942fb6..7b016d41fc43c7 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -378,6 +378,11 @@ impl PrioGraphScheduler { continue; } } + // Transaction must be dropped before removing, since we + // currently have ownership of the transaction, and + // therefore may have a reference to the backing-memory + // that the container expects to be free. + drop(transaction); container.remove_by_id(id); } From 408fcdfdf95fbd6f402a1936dac733d245c3b161 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 10:48:46 -0600 Subject: [PATCH 10/26] receive_and_buffer exit condition fix --- .../transaction_scheduler/receive_and_buffer.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 259d0e8580a92e..25b49e3388f654 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -52,7 +52,8 @@ pub(crate) trait ReceiveAndBuffer { type Transaction: TransactionWithMeta + Send + Sync; type Container: StateContainer + Send + Sync; - /// Returns whether the packet receiver is still connected. + /// Returns false only if no packets were received + /// AND the receiver is disconnected. fn receive_and_buffer_packets( &mut self, container: &mut Self::Container, @@ -313,6 +314,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { // Receive packet batches. const TIMEOUT: Duration = Duration::from_millis(10); let start = Instant::now(); + let mut received_message = false; // If not leader, do a blocking-receive initially. This lets the thread // sleep when there is not work to do. 
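// Minimal sketch of the ownership issue the drop-before-remove comment above is
// about: a reference-counted buffer slot cannot be reclaimed while a
// scheduler-side clone is still alive, so the in-flight handle must be dropped
// first. Plain Arc<Vec<u8>> stands in here for the Bytes/packet buffers that the
// container actually pools.
use std::sync::Arc;

fn main() {
    // One pooled buffer slot owned by the container.
    let slot: Arc<Vec<u8>> = Arc::new(vec![0u8; 8]);

    // Scheduling hands out a clone that shares the same allocation.
    let in_flight = Arc::clone(&slot);
    assert_eq!(Arc::strong_count(&slot), 2);

    // Reclaiming the slot now would break the container's uniqueness assumption,
    // so the retried transaction's handle is dropped before remove_by_id runs.
    drop(in_flight);
    assert_eq!(Arc::strong_count(&slot), 1);
}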
@@ -328,6 +330,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { ) { match self.receiver.recv_timeout(TIMEOUT) { Ok(packet_batch_message) => { + received_message = true; self.handle_packet_batch_message( container, timing_metrics, @@ -337,13 +340,16 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { ); } Err(RecvTimeoutError::Timeout) => return true, - Err(RecvTimeoutError::Disconnected) => return false, + Err(RecvTimeoutError::Disconnected) => { + return received_message; + } } } while start.elapsed() < TIMEOUT { match self.receiver.try_recv() { Ok(packet_batch_message) => { + received_message = true; self.handle_packet_batch_message( container, timing_metrics, @@ -353,7 +359,9 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { ); } Err(TryRecvError::Empty) => return true, - Err(TryRecvError::Disconnected) => return false, + Err(TryRecvError::Disconnected) => { + return received_message; + } } } From b094b68d7fac4b9651f6bae297e85f2638209afe Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 27 Nov 2024 11:42:23 -0600 Subject: [PATCH 11/26] lock bank_forks less often --- .../receive_and_buffer.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 25b49e3388f654..7f5adfbfe1bdde 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -311,6 +311,13 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, ) -> bool { + let (root_bank, working_bank) = { + let bank_forks = self.bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + (root_bank, working_bank) + }; + // Receive packet batches. const TIMEOUT: Duration = Duration::from_millis(10); let start = Instant::now(); @@ -336,6 +343,8 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { timing_metrics, count_metrics, decision, + &root_bank, + &working_bank, packet_batch_message, ); } @@ -355,6 +364,8 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { timing_metrics, count_metrics, decision, + &root_bank, + &working_bank, packet_batch_message, ); } @@ -376,6 +387,8 @@ impl TransactionViewReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, + root_bank: &Bank, + working_bank: &Bank, packet_batch_message: BankingPacketBatch, ) { // Do not support forwarding - only add support for this if we really need it. @@ -385,12 +398,6 @@ impl TransactionViewReceiveAndBuffer { let start = Instant::now(); // Sanitize packets, generate IDs, and insert into the container. 
- let (root_bank, working_bank) = { - let bank_forks = self.bank_forks.read().unwrap(); - let root_bank = bank_forks.root_bank(); - let working_bank = bank_forks.working_bank(); - (root_bank, working_bank) - }; let alt_resolved_slot = root_bank.slot(); let sanitized_epoch = root_bank.epoch(); let transaction_account_lock_limit = working_bank.get_transaction_account_lock_limit(); From d616b259c3faccea5b3ac1c288e8dd651d79c391 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 2 Dec 2024 09:03:25 -0600 Subject: [PATCH 12/26] remove references --- .../banking_stage/transaction_scheduler/receive_and_buffer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 7f5adfbfe1bdde..be99db8842aa26 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -422,8 +422,8 @@ impl TransactionViewReceiveAndBuffer { match Self::try_handle_packet( bytes.clone(), - &root_bank, - &working_bank, + root_bank, + working_bank, alt_resolved_slot, sanitized_epoch, transaction_account_lock_limit, From 5a5647b01cd86dcb97b27e5f83993f7cc6865730 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Dec 2024 10:03:11 -0600 Subject: [PATCH 13/26] minor sleeping efficiency improvement --- .../receive_and_buffer.rs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index be99db8842aa26..c3868855425438 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -323,18 +323,19 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { let start = Instant::now(); let mut received_message = false; - // If not leader, do a blocking-receive initially. This lets the thread - // sleep when there is not work to do. - // TODO: Is it better to manually sleep instead, avoiding the locking - // overhead for wakers? But then risk not waking up when message - // received - as long as sleep is somewhat short, this should be - // fine. - if matches!( - decision, - BufferedPacketsDecision::Forward - | BufferedPacketsDecision::ForwardAndHold - | BufferedPacketsDecision::Hold - ) { + // If not leader/unknown, do a blocking-receive initially. This lets + // the thread sleep until a message is received, or until the timeout. + // Additionally, only sleep if the container is empty. + if container.is_empty() + && matches!( + decision, + BufferedPacketsDecision::Forward | BufferedPacketsDecision::ForwardAndHold + ) + { + // TODO: Is it better to manually sleep instead, avoiding the locking + // overhead for wakers? But then risk not waking up when message + // received - as long as sleep is somewhat short, this should be + // fine. 
match self.receiver.recv_timeout(TIMEOUT) { Ok(packet_batch_message) => { received_message = true; From a4050ca01f90fcba1c02287fb9b3d76e75e27f97 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 5 Dec 2024 16:36:36 -0600 Subject: [PATCH 14/26] impl TransactionData for Arc> --- transaction-view/src/transaction_data.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/transaction-view/src/transaction_data.rs b/transaction-view/src/transaction_data.rs index d6152c8069d3e8..df20f96b65a39c 100644 --- a/transaction-view/src/transaction_data.rs +++ b/transaction-view/src/transaction_data.rs @@ -17,3 +17,10 @@ impl TransactionData for bytes::Bytes { self.as_ref() } } + +impl TransactionData for std::sync::Arc> { + #[inline] + fn data(&self) -> &[u8] { + self.as_ref() + } +} From cd92b92b3a2d67eab18d278905cae95b6eb127f1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 5 Dec 2024 17:00:53 -0600 Subject: [PATCH 15/26] SharedBytes = Arc> --- .../receive_and_buffer.rs | 31 +++-- .../transaction_state_container.rs | 119 +++++------------- 2 files changed, 45 insertions(+), 105 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index c3868855425438..4597d4c27b6eab 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -3,7 +3,7 @@ use { scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, transaction_state::TransactionState, transaction_state_container::{ - StateContainer, SuccessfulInsert, TransactionViewStateContainer, + SharedBytes, StateContainer, SuccessfulInsert, TransactionViewStateContainer, }, }, crate::{ @@ -22,7 +22,6 @@ use { transaction_version::TransactionVersion, transaction_view::SanitizedTransactionView, }, arrayvec::ArrayVec, - bytes::Bytes, core::time::Duration, crossbeam_channel::{RecvTimeoutError, TryRecvError}, solana_accounts_db::account_locks::validate_account_locks, @@ -301,7 +300,7 @@ pub(crate) struct TransactionViewReceiveAndBuffer { } impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { - type Transaction = RuntimeTransaction>; + type Transaction = RuntimeTransaction>; type Container = TransactionViewStateContainer; fn receive_and_buffer_packets( @@ -416,13 +415,10 @@ impl TransactionViewReceiveAndBuffer { num_received += 1; // Reserve free-space to copy packet into, run sanitization checks, and insert. - if container.try_insert_with(|mut bytes| { - // Copy packet data into the buffer, and freeze. 
- bytes.extend_from_slice(packet_data); - let bytes = bytes.freeze(); - - match Self::try_handle_packet( - bytes.clone(), + if container.try_insert_with_data( + packet_data, + |bytes| match Self::try_handle_packet( + bytes, root_bank, working_bank, alt_resolved_slot, @@ -431,14 +427,14 @@ impl TransactionViewReceiveAndBuffer { ) { Ok(state) => { num_buffered += 1; - Ok(SuccessfulInsert { state, bytes }) + Ok(SuccessfulInsert { state }) } Err(()) => { num_dropped_on_receive += 1; - Err(bytes.try_into_mut().expect("no leaks")) + Err(()) } - } - }) { + }, + ) { num_dropped_on_capacity += 1; }; } @@ -460,15 +456,16 @@ impl TransactionViewReceiveAndBuffer { } fn try_handle_packet( - bytes: Bytes, + bytes: SharedBytes, root_bank: &Bank, working_bank: &Bank, alt_resolved_slot: Slot, sanitized_epoch: Epoch, transaction_account_lock_limit: usize, - ) -> Result>>, ()> { + ) -> Result>>, ()> + { // Parsing and basic sanitization checks - let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes.clone()) else { + let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes) else { return Err(()); }; diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 68a46b9d94492e..14ed0ae23cc05b 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -8,7 +8,6 @@ use { scheduler_messages::TransactionId, }, agave_transaction_view::resolved_transaction_view::ResolvedTransactionView, - bytes::{Bytes, BytesMut}, itertools::MinMaxResult, min_max_heap::MinMaxHeap, slab::Slab, @@ -197,50 +196,26 @@ impl TransactionStateContainer { } } +pub type SharedBytes = Arc>; + /// A wrapper around `TransactionStateContainer` that allows re-uses /// pre-allocated `Bytes` to copy packet data into and use for serialization. /// This is used to avoid allocations in parsing transactions. pub struct TransactionViewStateContainer { - inner: TransactionStateContainer>>, - bytes_buffer: Box<[MaybeBytes]>, -} - -enum MaybeBytes { - None, - Bytes(Bytes), - BytesMut(BytesMut), -} - -impl MaybeBytes { - fn reserve_space(&mut self) -> BytesMut { - match core::mem::replace(self, MaybeBytes::None) { - MaybeBytes::BytesMut(bytes) => bytes, - _ => unreachable!("invalid state"), - } - } - - fn freeze(&mut self, bytes: Bytes) { - debug_assert!(matches!(self, MaybeBytes::None)); - *self = MaybeBytes::Bytes(bytes); - } - - fn free_space(&mut self, mut bytes: BytesMut) { - debug_assert!(matches!(self, MaybeBytes::None)); - bytes.clear(); - *self = MaybeBytes::BytesMut(bytes); - } + inner: TransactionStateContainer>>, + bytes_buffer: Box<[SharedBytes]>, } pub(crate) struct SuccessfulInsert { - pub state: TransactionState>>, - pub bytes: Bytes, + pub state: TransactionState>>, } impl TransactionViewStateContainer { /// Returns true if packet was dropped due to capacity limits. - pub(crate) fn try_insert_with( + pub(crate) fn try_insert_with_data( &mut self, - f: impl FnOnce(BytesMut) -> Result, + data: &[u8], + f: impl FnOnce(SharedBytes) -> Result, ) -> bool { if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() { @@ -255,58 +230,38 @@ impl TransactionViewStateContainer { // Get the vacant space in the bytes buffer. 
let bytes_entry = &mut self.bytes_buffer[transaction_id]; - let bytes = bytes_entry.reserve_space(); + // Assert the entry is unique, then copy the packet data. + { + assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique"); + let bytes = Arc::make_mut(bytes_entry); + + // Clear and copy the packet data into the bytes buffer. + bytes.clear(); + bytes.extend_from_slice(data); + } // Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. - match f(bytes) { - Ok(SuccessfulInsert { state, bytes }) => { + match f(Arc::clone(bytes_entry)) { + Ok(SuccessfulInsert { state }) => { let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); vacant_entry.insert(state); - bytes_entry.freeze(bytes); // Push the transaction into the queue. - self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) - } - Err(bytes) => { - bytes_entry.free_space(bytes); - false + self.inner + .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) } - } - } - - // This is re-implemented since we need it to call `remove_by_id` on this - // struct rather than `inner`. This is important because we need to return - // the `Bytes` to the pool. - /// Returns true if packet was dropped due to capacity limits. - fn push_id_into_queue_with_remaining_capacity( - &mut self, - priority_id: TransactionPriorityId, - remaining_capacity: usize, - ) -> bool { - if remaining_capacity == 0 { - let popped_id = self.inner.priority_queue.push_pop_min(priority_id); - self.remove_by_id(popped_id.id); - true - } else { - self.inner.priority_queue.push(priority_id); - false + Err(_) => false, } } } -impl StateContainer>> +impl StateContainer>> for TransactionViewStateContainer { fn with_capacity(capacity: usize) -> Self { let inner = TransactionStateContainer::with_capacity(capacity); let bytes_buffer = (0..inner.id_to_transaction_state.capacity()) - .map(|_| { - MaybeBytes::BytesMut({ - let mut bytes = BytesMut::zeroed(PACKET_DATA_SIZE); - bytes.clear(); - bytes - }) - }) + .map(|_| Arc::new(Vec::with_capacity(PACKET_DATA_SIZE))) .collect::>() .into_boxed_slice(); Self { @@ -334,7 +289,8 @@ impl StateContainer>> fn get_mut_transaction_state( &mut self, id: TransactionId, - ) -> Option<&mut TransactionState>>> { + ) -> Option<&mut TransactionState>>> + { self.inner.get_mut_transaction_state(id) } @@ -342,32 +298,19 @@ impl StateContainer>> fn get_transaction_ttl( &self, id: TransactionId, - ) -> Option<&SanitizedTransactionTTL>>> { + ) -> Option<&SanitizedTransactionTTL>>> + { self.inner.get_transaction_ttl(id) } #[inline] fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { - self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + self.inner.push_id_into_queue(priority_id) } + #[inline] fn remove_by_id(&mut self, id: TransactionId) { - // Remove the entry from the map: - // 1. If it was unprocessed, this will drop the `Bytes` held. - // 2. If it was scheduled, this just marks the entry as removed. - let _ = self.inner.id_to_transaction_state.remove(id); - - // Clear the bytes buffer. - let bytes_entry = &mut self.bytes_buffer[id]; - let MaybeBytes::Bytes(bytes) = core::mem::replace(bytes_entry, MaybeBytes::None) else { - unreachable!("invalid state"); - }; - - // Return the `Bytes` to the pool. 
- let bytes = bytes - .try_into_mut() - .expect("all `Bytes` instances must be dropped"); - bytes_entry.free_space(bytes); + self.inner.remove_by_id(id); } #[inline] From eb5bafa7e45e51b2a3fddc50e5765b2a5a34a687 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 10:27:48 -0600 Subject: [PATCH 16/26] only warn about forwarding if not already using Sdk --- core/src/banking_stage.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0f400bfa6f0f43..2c8531035e199f 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -574,14 +574,15 @@ impl BankingStage { )); } - let transaction_struct = if enable_forwarding { - warn!( + let transaction_struct = + if enable_forwarding && !matches!(transaction_struct, TransactionStructure::Sdk) { + warn!( "Forwarding only supported for `Sdk` transaction struct. Overriding to use `Sdk`." ); - TransactionStructure::Sdk - } else { - transaction_struct - }; + TransactionStructure::Sdk + } else { + transaction_struct + }; match transaction_struct { TransactionStructure::Sdk => { From 6a9b598169b3d3d9b28f7d8200e4098d0aeb0473 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 14:18:05 -0600 Subject: [PATCH 17/26] fix bug on should_forward --- .../banking_stage/transaction_scheduler/transaction_state.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index c4b0bfc5c0b564..e03d8d35d489b0 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -59,10 +59,10 @@ impl TransactionState { priority: u64, cost: u64, ) -> Self { - let should_forward = !packet + let should_forward = packet .as_ref() .map(|packet| { - packet.original_packet().meta().forwarded() + !packet.original_packet().meta().forwarded() && packet.original_packet().meta().is_from_staked_node() }) .unwrap_or_default(); From 7cbcdc53af44b1ebf20a2f836fa904e679d8e344 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 14:26:15 -0600 Subject: [PATCH 18/26] test_initialize_should_forward --- .../transaction_state.rs | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index e03d8d35d489b0..0d08f7edc7fce4 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -2,6 +2,7 @@ use { crate::banking_stage::{ immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::MaxAge, }, + solana_sdk::packet::{self}, std::sync::Arc, }; @@ -61,10 +62,7 @@ impl TransactionState { ) -> Self { let should_forward = packet .as_ref() - .map(|packet| { - !packet.original_packet().meta().forwarded() - && packet.original_packet().meta().is_from_staked_node() - }) + .map(|packet| initialize_should_forward(packet.original_packet().meta())) .unwrap_or_default(); Self::Unprocessed { transaction_ttl, @@ -211,10 +209,15 @@ impl TransactionState { } } +fn initialize_should_forward(meta: &packet::Meta) -> bool { + !meta.forwarded() && meta.is_from_staked_node() +} + #[cfg(test)] mod tests { use { super::*, + packet::PacketFlags, 
solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -374,4 +377,23 @@ mod tests { )); assert_eq!(transaction_ttl.max_age, MaxAge::MAX); } + + #[test] + fn test_initialize_should_forward() { + let meta = packet::Meta::default(); + assert!(!initialize_should_forward(&meta)); + + let mut meta = packet::Meta::default(); + meta.flags.set(PacketFlags::FORWARDED, true); + assert!(!initialize_should_forward(&meta)); + + let mut meta = packet::Meta::default(); + meta.set_from_staked_node(true); + assert!(initialize_should_forward(&meta)); + + let mut meta = packet::Meta::default(); + meta.flags.set(PacketFlags::FORWARDED, true); + meta.set_from_staked_node(true); + assert!(!initialize_should_forward(&meta)); + } } From d2be83759858498dfb72a03f6eac17a9abbce6db Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 14:31:16 -0600 Subject: [PATCH 19/26] remove option --- .../transaction_scheduler/prio_graph_scheduler.rs | 2 +- .../transaction_scheduler/receive_and_buffer.rs | 2 +- .../transaction_scheduler/transaction_state_container.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 7b016d41fc43c7..726d21284ea23e 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -726,7 +726,7 @@ mod tests { const TEST_TRANSACTION_COST: u64 = 5000; container.insert_new_transaction( transaction_ttl, - Some(packet), + packet, compute_unit_price, TEST_TRANSACTION_COST, ); diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 4597d4c27b6eab..5848d6697e0d03 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -258,7 +258,7 @@ impl SanitizedTransactionReceiveAndBuffer { max_age, }; - if container.insert_new_transaction(transaction_ttl, Some(packet), priority, cost) { + if container.insert_new_transaction(transaction_ttl, packet, priority, cost) { saturating_add_assign!(num_dropped_on_capacity, 1); } saturating_add_assign!(num_buffered, 1); diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 14ed0ae23cc05b..d0ef22f3cdea06 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -158,7 +158,7 @@ impl TransactionStateContainer { pub(crate) fn insert_new_transaction( &mut self, transaction_ttl: SanitizedTransactionTTL, - packet: Option>, + packet: Arc, priority: u64, cost: u64, ) -> bool { @@ -170,7 +170,7 @@ impl TransactionStateContainer { let transaction_id = entry.key(); entry.insert(TransactionState::new( transaction_ttl, - packet, + Some(packet), priority, cost, )); @@ -381,7 +381,7 @@ mod tests { ) { for priority in 0..num as u64 { let (transaction_ttl, packet, priority, cost) = test_transaction(priority); - container.insert_new_transaction(transaction_ttl, Some(packet), priority, cost); + container.insert_new_transaction(transaction_ttl, packet, priority, cost); } } From 26635cd9227e846881a0f9794e5ca08ffdd2bd95 Mon Sep 
17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 14:36:34 -0600 Subject: [PATCH 20/26] remove impl TransactionData for Bytes --- Cargo.lock | 1 - programs/sbf/Cargo.lock | 1 - svm/examples/Cargo.lock | 1 - transaction-view/Cargo.toml | 1 - transaction-view/src/transaction_data.rs | 7 ------- 5 files changed, 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97b21f370420d4..6c00f0459e93cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,7 +238,6 @@ version = "2.2.0" dependencies = [ "agave-transaction-view", "bincode", - "bytes", "criterion", "solana-sdk", "solana-sdk-ids", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 82c40c0af73754..9c4f0caff1c9db 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -77,7 +77,6 @@ dependencies = [ name = "agave-transaction-view" version = "2.2.0" dependencies = [ - "bytes", "solana-sdk", "solana-sdk-ids", "solana-svm-transaction", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 3e4748d18d7a81..170160ad1bb0d4 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -77,7 +77,6 @@ dependencies = [ name = "agave-transaction-view" version = "2.2.0" dependencies = [ - "bytes", "solana-sdk", "solana-sdk-ids", "solana-svm-transaction", diff --git a/transaction-view/Cargo.toml b/transaction-view/Cargo.toml index 49885c0fd7c7bd..cb4a43aa7753bd 100644 --- a/transaction-view/Cargo.toml +++ b/transaction-view/Cargo.toml @@ -10,7 +10,6 @@ license = { workspace = true } edition = { workspace = true } [dependencies] -bytes = { workspace = true } solana-sdk = { workspace = true } solana-sdk-ids = { workspace = true } solana-svm-transaction = { workspace = true } diff --git a/transaction-view/src/transaction_data.rs b/transaction-view/src/transaction_data.rs index df20f96b65a39c..323c085660fa85 100644 --- a/transaction-view/src/transaction_data.rs +++ b/transaction-view/src/transaction_data.rs @@ -11,13 +11,6 @@ impl TransactionData for &[u8] { } } -impl TransactionData for bytes::Bytes { - #[inline] - fn data(&self) -> &[u8] { - self.as_ref() - } -} - impl TransactionData for std::sync::Arc> { #[inline] fn data(&self) -> &[u8] { From 10bd87f8517504a3f892b70b2abbec21c063bcc5 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 15:21:56 -0600 Subject: [PATCH 21/26] get_vacant_map_entry --- .../transaction_state_container.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index d0ef22f3cdea06..1edb4f22636e30 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -10,7 +10,7 @@ use { agave_transaction_view::resolved_transaction_view::ResolvedTransactionView, itertools::MinMaxResult, min_max_heap::MinMaxHeap, - slab::Slab, + slab::{Slab, VacantEntry}, solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, }, @@ -166,7 +166,7 @@ impl TransactionStateContainer { // the next vacant entry. i.e. get the size before we insert. 
let remaining_capacity = self.remaining_capacity(); let priority_id = { - let entry = self.id_to_transaction_state.vacant_entry(); + let entry = self.get_vacant_map_entry(); let transaction_id = entry.key(); entry.insert(TransactionState::new( transaction_ttl, @@ -194,6 +194,11 @@ impl TransactionStateContainer { false } } + + fn get_vacant_map_entry(&mut self) -> VacantEntry> { + assert!(self.id_to_transaction_state.len() < self.id_to_transaction_state.capacity()); + self.id_to_transaction_state.vacant_entry() + } } pub type SharedBytes = Arc>; @@ -217,15 +222,11 @@ impl TransactionViewStateContainer { data: &[u8], f: impl FnOnce(SharedBytes) -> Result, ) -> bool { - if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() - { - return true; - } // Get remaining capacity before inserting. let remaining_capacity = self.remaining_capacity(); // Get a vacant entry in the slab. - let vacant_entry = self.inner.id_to_transaction_state.vacant_entry(); + let vacant_entry = self.inner.get_vacant_map_entry(); let transaction_id = vacant_entry.key(); // Get the vacant space in the bytes buffer. @@ -240,7 +241,7 @@ impl TransactionViewStateContainer { bytes.extend_from_slice(data); } - // Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. + // Attempt to insert the transaction. match f(Arc::clone(bytes_entry)) { Ok(SuccessfulInsert { state }) => { let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); From 87690b6e2b36d7f228f6945fa24eb2c18bdfab44 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 15:29:58 -0600 Subject: [PATCH 22/26] Safety comment --- .../transaction_scheduler/transaction_state_container.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 1edb4f22636e30..0cdfba5710a286 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -233,6 +233,14 @@ impl TransactionViewStateContainer { let bytes_entry = &mut self.bytes_buffer[transaction_id]; // Assert the entry is unique, then copy the packet data. { + // The strong count must be 1 here. These are only cloned into the + // inner container below, wrapped by a `ResolveTransactionView`, + // which does not expose the backing memory (the `Arc`), or + // implement `Clone`. + // This could only fail if there is a bug in the container that the + // entry in the slab was not cleared. However, since we share + // indexing between the slab and our `bytes_buffer`, we know that + // `vacant_entry` is not occupied. 
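// Stand-alone sketch of the recycling invariant described in the safety comment
// above: a pre-allocated Arc<Vec<u8>> slot is only rewritten while its strong
// count is 1, and Arc::make_mut then mutates the existing allocation in place
// instead of allocating per packet. recycle_slot and the sizes here are
// illustrative; the real pool sizes each slot to PACKET_DATA_SIZE.
use std::sync::Arc;

fn recycle_slot(slot: &mut Arc<Vec<u8>>, data: &[u8]) {
    // Mirrors the container's invariant: no reader may still hold a clone.
    assert_eq!(Arc::strong_count(slot), 1, "entry must be unique");

    // Unique ownership means make_mut does not clone; the buffer is reused.
    let buf = Arc::make_mut(slot);
    buf.clear();
    buf.extend_from_slice(data);
}

fn main() {
    let mut slot: Arc<Vec<u8>> = Arc::new(Vec::with_capacity(1232));
    let capacity_before = slot.capacity();

    recycle_slot(&mut slot, b"first packet");
    recycle_slot(&mut slot, b"second packet");

    // Both writes went through the original allocation.
    assert_eq!(slot.as_slice(), b"second packet");
    assert_eq!(slot.capacity(), capacity_before);
}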
assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique"); let bytes = Arc::make_mut(bytes_entry); From dfd3cc1b8b690e50006699e0f32590afed80e30c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 15:39:15 -0600 Subject: [PATCH 23/26] remove SuccessfulInsert type --- .../transaction_scheduler/receive_and_buffer.rs | 6 ++---- .../transaction_state_container.rs | 13 +++++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 5848d6697e0d03..3923c9623d7486 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -2,9 +2,7 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, transaction_state::TransactionState, - transaction_state_container::{ - SharedBytes, StateContainer, SuccessfulInsert, TransactionViewStateContainer, - }, + transaction_state_container::{SharedBytes, StateContainer, TransactionViewStateContainer}, }, crate::{ banking_stage::{ @@ -427,7 +425,7 @@ impl TransactionViewReceiveAndBuffer { ) { Ok(state) => { num_buffered += 1; - Ok(SuccessfulInsert { state }) + Ok(state) } Err(()) => { num_dropped_on_receive += 1; diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 0cdfba5710a286..bbd594e11f1f34 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -211,16 +211,17 @@ pub struct TransactionViewStateContainer { bytes_buffer: Box<[SharedBytes]>, } -pub(crate) struct SuccessfulInsert { - pub state: TransactionState>>, -} - impl TransactionViewStateContainer { /// Returns true if packet was dropped due to capacity limits. pub(crate) fn try_insert_with_data( &mut self, data: &[u8], - f: impl FnOnce(SharedBytes) -> Result, + f: impl FnOnce( + SharedBytes, + ) -> Result< + TransactionState>>, + (), + >, ) -> bool { // Get remaining capacity before inserting. let remaining_capacity = self.remaining_capacity(); @@ -251,7 +252,7 @@ impl TransactionViewStateContainer { // Attempt to insert the transaction. 
match f(Arc::clone(bytes_entry)) { - Ok(SuccessfulInsert { state }) => { + Ok(state) => { let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); vacant_entry.insert(state); From c6dffd7fafd78b335ff266c26264ea512d45b013 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 15:42:19 -0600 Subject: [PATCH 24/26] type aliases --- .../receive_and_buffer.rs | 7 ++++--- .../transaction_state_container.rs | 21 +++++++------------ 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 3923c9623d7486..f320ddfb9b7c5d 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -2,7 +2,9 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, transaction_state::TransactionState, - transaction_state_container::{SharedBytes, StateContainer, TransactionViewStateContainer}, + transaction_state_container::{ + SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer, + }, }, crate::{ banking_stage::{ @@ -460,8 +462,7 @@ impl TransactionViewReceiveAndBuffer { alt_resolved_slot: Slot, sanitized_epoch: Epoch, transaction_account_lock_limit: usize, - ) -> Result>>, ()> - { + ) -> Result { // Parsing and basic sanitization checks let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes) else { return Err(()); diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index bbd594e11f1f34..94aabd683f3f3a 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -202,12 +202,14 @@ impl TransactionStateContainer { } pub type SharedBytes = Arc>; +pub(crate) type RuntimeTransactionView = RuntimeTransaction>; +pub(crate) type TransactionViewState = TransactionState; /// A wrapper around `TransactionStateContainer` that allows re-uses /// pre-allocated `Bytes` to copy packet data into and use for serialization. /// This is used to avoid allocations in parsing transactions. pub struct TransactionViewStateContainer { - inner: TransactionStateContainer>>, + inner: TransactionStateContainer, bytes_buffer: Box<[SharedBytes]>, } @@ -216,12 +218,7 @@ impl TransactionViewStateContainer { pub(crate) fn try_insert_with_data( &mut self, data: &[u8], - f: impl FnOnce( - SharedBytes, - ) -> Result< - TransactionState>>, - (), - >, + f: impl FnOnce(SharedBytes) -> Result, ()>, ) -> bool { // Get remaining capacity before inserting. 
let remaining_capacity = self.remaining_capacity(); @@ -265,9 +262,7 @@ impl TransactionViewStateContainer { } } -impl StateContainer>> - for TransactionViewStateContainer -{ +impl StateContainer for TransactionViewStateContainer { fn with_capacity(capacity: usize) -> Self { let inner = TransactionStateContainer::with_capacity(capacity); let bytes_buffer = (0..inner.id_to_transaction_state.capacity()) @@ -299,8 +294,7 @@ impl StateContainer>> fn get_mut_transaction_state( &mut self, id: TransactionId, - ) -> Option<&mut TransactionState>>> - { + ) -> Option<&mut TransactionViewState> { self.inner.get_mut_transaction_state(id) } @@ -308,8 +302,7 @@ impl StateContainer>> fn get_transaction_ttl( &self, id: TransactionId, - ) -> Option<&SanitizedTransactionTTL>>> - { + ) -> Option<&SanitizedTransactionTTL> { self.inner.get_transaction_ttl(id) } From 61909725c4cd485285ba0f65900f876fc2a13419 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Dec 2024 15:45:44 -0600 Subject: [PATCH 25/26] remove explicit drop --- .../transaction_scheduler/prio_graph_scheduler.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 726d21284ea23e..8edebc1f80c200 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -378,11 +378,6 @@ impl PrioGraphScheduler { continue; } } - // Transaction must be dropped before removing, since we - // currently have ownership of the transaction, and - // therefore may have a reference to the backing-memory - // that the container expects to be free. - drop(transaction); container.remove_by_id(id); } From f2c9d4417ffec814d60c4b571cc1b096ce0a69b7 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 10 Dec 2024 09:08:46 -0600 Subject: [PATCH 26/26] Remove timeout from testing --- .../receive_and_buffer.rs | 43 +++++++++++-------- .../scheduler_controller.rs | 18 ++++++-- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index f320ddfb9b7c5d..bcb3f55f9778da 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -51,15 +51,15 @@ pub(crate) trait ReceiveAndBuffer { type Transaction: TransactionWithMeta + Send + Sync; type Container: StateContainer + Send + Sync; - /// Returns false only if no packets were received - /// AND the receiver is disconnected. + /// Return Err if the receiver is disconnected AND no packets were + /// received. Otherwise return Ok(num_received). 
fn receive_and_buffer_packets( &mut self, container: &mut Self::Container, timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool; + ) -> Result; } pub(crate) struct SanitizedTransactionReceiveAndBuffer { @@ -81,7 +81,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool { + ) -> Result { let remaining_queue_capacity = container.remaining_capacity(); const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); @@ -111,7 +111,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us); }); - match received_packet_results { + let num_received = match received_packet_results { Ok(receive_packet_results) => { let num_received_packets = receive_packet_results.deserialized_packets.len(); @@ -137,12 +137,13 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { ); }); } + num_received_packets } - Err(RecvTimeoutError::Timeout) => {} - Err(RecvTimeoutError::Disconnected) => return false, - } + Err(RecvTimeoutError::Timeout) => 0, + Err(RecvTimeoutError::Disconnected) => return Err(()), + }; - true + Ok(num_received) } } @@ -309,7 +310,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool { + ) -> Result { let (root_bank, working_bank) = { let bank_forks = self.bank_forks.read().unwrap(); let root_bank = bank_forks.root_bank(); @@ -320,6 +321,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { // Receive packet batches. const TIMEOUT: Duration = Duration::from_millis(10); let start = Instant::now(); + let mut num_received = 0; let mut received_message = false; // If not leader/unknown, do a blocking-receive initially. This lets @@ -338,7 +340,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { match self.receiver.recv_timeout(TIMEOUT) { Ok(packet_batch_message) => { received_message = true; - self.handle_packet_batch_message( + num_received += self.handle_packet_batch_message( container, timing_metrics, count_metrics, @@ -348,9 +350,9 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { packet_batch_message, ); } - Err(RecvTimeoutError::Timeout) => return true, + Err(RecvTimeoutError::Timeout) => return Ok(num_received), Err(RecvTimeoutError::Disconnected) => { - return received_message; + return received_message.then_some(num_received).ok_or(()); } } } @@ -359,7 +361,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { match self.receiver.try_recv() { Ok(packet_batch_message) => { received_message = true; - self.handle_packet_batch_message( + num_received += self.handle_packet_batch_message( container, timing_metrics, count_metrics, @@ -369,18 +371,19 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { packet_batch_message, ); } - Err(TryRecvError::Empty) => return true, + Err(TryRecvError::Empty) => return Ok(num_received), Err(TryRecvError::Disconnected) => { - return received_message; + return received_message.then_some(num_received).ok_or(()); } } } - true + Ok(num_received) } } impl TransactionViewReceiveAndBuffer { + /// Return number of received packets. 
fn handle_packet_batch_message( &mut self, container: &mut TransactionViewStateContainer, @@ -390,10 +393,10 @@ impl TransactionViewReceiveAndBuffer { root_bank: &Bank, working_bank: &Bank, packet_batch_message: BankingPacketBatch, - ) { + ) -> usize { // Do not support forwarding - only add support for this if we really need it. if matches!(decision, BufferedPacketsDecision::Forward) { - return; + return 0; } let start = Instant::now(); @@ -453,6 +456,8 @@ impl TransactionViewReceiveAndBuffer { ); saturating_add_assign!(count_metrics.num_dropped_on_receive, num_dropped_on_receive); }); + + num_received } fn try_handle_packet( diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 56e95be61ac1c2..ecdcc7dc473e90 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -107,7 +107,7 @@ impl SchedulerController { self.process_transactions(&decision)?; self.receive_completed()?; - if !self.receive_and_buffer_packets(&decision) { + if self.receive_and_buffer_packets(&decision).is_err() { break; } // Report metrics only if there is data. @@ -421,7 +421,10 @@ impl SchedulerController { } /// Returns whether the packet receiver is still connected. - fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { + fn receive_and_buffer_packets( + &mut self, + decision: &BufferedPacketsDecision, + ) -> Result { self.receive_and_buffer.receive_and_buffer_packets( &mut self.container, &mut self.timing_metrics, @@ -625,7 +628,16 @@ mod tests { .make_consume_or_forward_decision(); assert!(matches!(decision, BufferedPacketsDecision::Consume(_))); assert!(scheduler_controller.receive_completed().is_ok()); - assert!(scheduler_controller.receive_and_buffer_packets(&decision)); + + // Time is not a reliable way for deterministic testing. + // Loop here until no more packets are received, this avoids parallel + // tests from inconsistently timing out and not receiving + // from the channel. + while scheduler_controller + .receive_and_buffer_packets(&decision) + .map(|n| n > 0) + .unwrap_or_default() + {} assert!(scheduler_controller.process_transactions(&decision).is_ok()); }
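// Stand-alone sketch of the receive contract this last patch settles on: block
// only when there is nothing buffered, then drain with try_recv until a short
// deadline, and report disconnection as an error only if nothing at all was
// received. The Vec<u8> messages, `idle` flag, and buffer are stand-ins for the
// real packet batches, leader-state decision, and transaction container.
use {
    crossbeam_channel::{Receiver, RecvTimeoutError, TryRecvError},
    std::time::{Duration, Instant},
};

const TIMEOUT: Duration = Duration::from_millis(10);

/// Returns Err(()) only if the channel is disconnected AND nothing was received
/// in this call; otherwise returns Ok(number_of_messages_received).
fn receive_and_buffer(
    receiver: &Receiver<Vec<u8>>,
    buffer: &mut Vec<Vec<u8>>,
    idle: bool,
) -> Result<usize, ()> {
    let start = Instant::now();
    let mut num_received = 0;

    // Only block when there is no buffered work to schedule.
    if idle && buffer.is_empty() {
        match receiver.recv_timeout(TIMEOUT) {
            Ok(msg) => {
                num_received += 1;
                buffer.push(msg);
            }
            Err(RecvTimeoutError::Timeout) => return Ok(num_received),
            Err(RecvTimeoutError::Disconnected) => {
                return (num_received > 0).then_some(num_received).ok_or(());
            }
        }
    }

    // Drain without blocking until the per-call deadline expires.
    while start.elapsed() < TIMEOUT {
        match receiver.try_recv() {
            Ok(msg) => {
                num_received += 1;
                buffer.push(msg);
            }
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => {
                return (num_received > 0).then_some(num_received).ok_or(());
            }
        }
    }

    Ok(num_received)
}

fn main() {
    let (sender, receiver) = crossbeam_channel::unbounded();
    let mut buffer = Vec::new();

    sender.send(vec![1u8]).unwrap();
    sender.send(vec![2u8]).unwrap();
    drop(sender);

    // Messages queued before the disconnect are still reported as progress.
    assert_eq!(receive_and_buffer(&receiver, &mut buffer, true), Ok(2));
    // Once nothing is left and the sender is gone, the caller can shut down.
    assert_eq!(receive_and_buffer(&receiver, &mut buffer, true), Err(()));
}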