From 4e927a973c6197b113488b8c757bb933c3387979 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 5 Dec 2024 17:00:53 -0600 Subject: [PATCH] SharedBytes = Arc> --- .../receive_and_buffer.rs | 31 +++-- .../transaction_state_container.rs | 119 +++++------------- 2 files changed, 45 insertions(+), 105 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 7e540041cc56ad..ba6a7d9d278785 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -3,7 +3,7 @@ use { scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, transaction_state::TransactionState, transaction_state_container::{ - StateContainer, SuccessfulInsert, TransactionViewStateContainer, + SharedBytes, StateContainer, SuccessfulInsert, TransactionViewStateContainer, }, }, crate::{ @@ -22,7 +22,6 @@ use { transaction_version::TransactionVersion, transaction_view::SanitizedTransactionView, }, arrayvec::ArrayVec, - bytes::Bytes, core::time::Duration, crossbeam_channel::{RecvTimeoutError, TryRecvError}, solana_accounts_db::account_locks::validate_account_locks, @@ -301,7 +300,7 @@ pub(crate) struct TransactionViewReceiveAndBuffer { } impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { - type Transaction = RuntimeTransaction>; + type Transaction = RuntimeTransaction>; type Container = TransactionViewStateContainer; fn receive_and_buffer_packets( @@ -416,13 +415,10 @@ impl TransactionViewReceiveAndBuffer { num_received += 1; // Reserve free-space to copy packet into, run sanitization checks, and insert. - if container.try_insert_with(|mut bytes| { - // Copy packet data into the buffer, and freeze. - bytes.extend_from_slice(packet_data); - let bytes = bytes.freeze(); - - match Self::try_handle_packet( - bytes.clone(), + if container.try_insert_with_data( + packet_data, + |bytes| match Self::try_handle_packet( + bytes, root_bank, working_bank, alt_resolved_slot, @@ -431,14 +427,14 @@ impl TransactionViewReceiveAndBuffer { ) { Ok(state) => { num_buffered += 1; - Ok(SuccessfulInsert { state, bytes }) + Ok(SuccessfulInsert { state }) } Err(()) => { num_dropped_on_receive += 1; - Err(bytes.try_into_mut().expect("no leaks")) + Err(()) } - } - }) { + }, + ) { num_dropped_on_capacity += 1; }; } @@ -460,15 +456,16 @@ impl TransactionViewReceiveAndBuffer { } fn try_handle_packet( - bytes: Bytes, + bytes: SharedBytes, root_bank: &Bank, working_bank: &Bank, alt_resolved_slot: Slot, sanitized_epoch: Epoch, transaction_account_lock_limit: usize, - ) -> Result>>, ()> { + ) -> Result>>, ()> + { // Parsing and basic sanitization checks - let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes.clone()) else { + let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes) else { return Err(()); }; diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 68a46b9d94492e..14ed0ae23cc05b 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -8,7 +8,6 @@ use { scheduler_messages::TransactionId, }, agave_transaction_view::resolved_transaction_view::ResolvedTransactionView, - bytes::{Bytes, BytesMut}, itertools::MinMaxResult, min_max_heap::MinMaxHeap, slab::Slab, @@ -197,50 +196,26 @@ impl TransactionStateContainer { } } +pub type SharedBytes = Arc>; + /// A wrapper around `TransactionStateContainer` that allows re-uses /// pre-allocated `Bytes` to copy packet data into and use for serialization. /// This is used to avoid allocations in parsing transactions. pub struct TransactionViewStateContainer { - inner: TransactionStateContainer>>, - bytes_buffer: Box<[MaybeBytes]>, -} - -enum MaybeBytes { - None, - Bytes(Bytes), - BytesMut(BytesMut), -} - -impl MaybeBytes { - fn reserve_space(&mut self) -> BytesMut { - match core::mem::replace(self, MaybeBytes::None) { - MaybeBytes::BytesMut(bytes) => bytes, - _ => unreachable!("invalid state"), - } - } - - fn freeze(&mut self, bytes: Bytes) { - debug_assert!(matches!(self, MaybeBytes::None)); - *self = MaybeBytes::Bytes(bytes); - } - - fn free_space(&mut self, mut bytes: BytesMut) { - debug_assert!(matches!(self, MaybeBytes::None)); - bytes.clear(); - *self = MaybeBytes::BytesMut(bytes); - } + inner: TransactionStateContainer>>, + bytes_buffer: Box<[SharedBytes]>, } pub(crate) struct SuccessfulInsert { - pub state: TransactionState>>, - pub bytes: Bytes, + pub state: TransactionState>>, } impl TransactionViewStateContainer { /// Returns true if packet was dropped due to capacity limits. - pub(crate) fn try_insert_with( + pub(crate) fn try_insert_with_data( &mut self, - f: impl FnOnce(BytesMut) -> Result, + data: &[u8], + f: impl FnOnce(SharedBytes) -> Result, ) -> bool { if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() { @@ -255,58 +230,38 @@ impl TransactionViewStateContainer { // Get the vacant space in the bytes buffer. let bytes_entry = &mut self.bytes_buffer[transaction_id]; - let bytes = bytes_entry.reserve_space(); + // Assert the entry is unique, then copy the packet data. + { + assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique"); + let bytes = Arc::make_mut(bytes_entry); + + // Clear and copy the packet data into the bytes buffer. + bytes.clear(); + bytes.extend_from_slice(data); + } // Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. - match f(bytes) { - Ok(SuccessfulInsert { state, bytes }) => { + match f(Arc::clone(bytes_entry)) { + Ok(SuccessfulInsert { state }) => { let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); vacant_entry.insert(state); - bytes_entry.freeze(bytes); // Push the transaction into the queue. - self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) - } - Err(bytes) => { - bytes_entry.free_space(bytes); - false + self.inner + .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) } - } - } - - // This is re-implemented since we need it to call `remove_by_id` on this - // struct rather than `inner`. This is important because we need to return - // the `Bytes` to the pool. - /// Returns true if packet was dropped due to capacity limits. - fn push_id_into_queue_with_remaining_capacity( - &mut self, - priority_id: TransactionPriorityId, - remaining_capacity: usize, - ) -> bool { - if remaining_capacity == 0 { - let popped_id = self.inner.priority_queue.push_pop_min(priority_id); - self.remove_by_id(popped_id.id); - true - } else { - self.inner.priority_queue.push(priority_id); - false + Err(_) => false, } } } -impl StateContainer>> +impl StateContainer>> for TransactionViewStateContainer { fn with_capacity(capacity: usize) -> Self { let inner = TransactionStateContainer::with_capacity(capacity); let bytes_buffer = (0..inner.id_to_transaction_state.capacity()) - .map(|_| { - MaybeBytes::BytesMut({ - let mut bytes = BytesMut::zeroed(PACKET_DATA_SIZE); - bytes.clear(); - bytes - }) - }) + .map(|_| Arc::new(Vec::with_capacity(PACKET_DATA_SIZE))) .collect::>() .into_boxed_slice(); Self { @@ -334,7 +289,8 @@ impl StateContainer>> fn get_mut_transaction_state( &mut self, id: TransactionId, - ) -> Option<&mut TransactionState>>> { + ) -> Option<&mut TransactionState>>> + { self.inner.get_mut_transaction_state(id) } @@ -342,32 +298,19 @@ impl StateContainer>> fn get_transaction_ttl( &self, id: TransactionId, - ) -> Option<&SanitizedTransactionTTL>>> { + ) -> Option<&SanitizedTransactionTTL>>> + { self.inner.get_transaction_ttl(id) } #[inline] fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { - self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + self.inner.push_id_into_queue(priority_id) } + #[inline] fn remove_by_id(&mut self, id: TransactionId) { - // Remove the entry from the map: - // 1. If it was unprocessed, this will drop the `Bytes` held. - // 2. If it was scheduled, this just marks the entry as removed. - let _ = self.inner.id_to_transaction_state.remove(id); - - // Clear the bytes buffer. - let bytes_entry = &mut self.bytes_buffer[id]; - let MaybeBytes::Bytes(bytes) = core::mem::replace(bytes_entry, MaybeBytes::None) else { - unreachable!("invalid state"); - }; - - // Return the `Bytes` to the pool. - let bytes = bytes - .try_into_mut() - .expect("all `Bytes` instances must be dropped"); - bytes_entry.free_space(bytes); + self.inner.remove_by_id(id); } #[inline]