Skip to content

Commit

Permalink
SharedBytes = Arc<Vec<u8>>
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Dec 5, 2024
1 parent f38e093 commit 4e927a9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 105 deletions.
31 changes: 14 additions & 17 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
transaction_state::TransactionState,
transaction_state_container::{
StateContainer, SuccessfulInsert, TransactionViewStateContainer,
SharedBytes, StateContainer, SuccessfulInsert, TransactionViewStateContainer,
},
},
crate::{
Expand All @@ -22,7 +22,6 @@ use {
transaction_version::TransactionVersion, transaction_view::SanitizedTransactionView,
},
arrayvec::ArrayVec,
bytes::Bytes,
core::time::Duration,
crossbeam_channel::{RecvTimeoutError, TryRecvError},
solana_accounts_db::account_locks::validate_account_locks,
Expand Down Expand Up @@ -301,7 +300,7 @@ pub(crate) struct TransactionViewReceiveAndBuffer {
}

impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
type Transaction = RuntimeTransaction<ResolvedTransactionView<Bytes>>;
type Transaction = RuntimeTransaction<ResolvedTransactionView<SharedBytes>>;
type Container = TransactionViewStateContainer;

fn receive_and_buffer_packets(
Expand Down Expand Up @@ -416,13 +415,10 @@ impl TransactionViewReceiveAndBuffer {
num_received += 1;

// Reserve free-space to copy packet into, run sanitization checks, and insert.
if container.try_insert_with(|mut bytes| {
// Copy packet data into the buffer, and freeze.
bytes.extend_from_slice(packet_data);
let bytes = bytes.freeze();

match Self::try_handle_packet(
bytes.clone(),
if container.try_insert_with_data(
packet_data,
|bytes| match Self::try_handle_packet(
bytes,
root_bank,
working_bank,
alt_resolved_slot,
Expand All @@ -431,14 +427,14 @@ impl TransactionViewReceiveAndBuffer {
) {
Ok(state) => {
num_buffered += 1;
Ok(SuccessfulInsert { state, bytes })
Ok(SuccessfulInsert { state })
}
Err(()) => {
num_dropped_on_receive += 1;
Err(bytes.try_into_mut().expect("no leaks"))
Err(())
}
}
}) {
},
) {
num_dropped_on_capacity += 1;
};
}
Expand All @@ -460,15 +456,16 @@ impl TransactionViewReceiveAndBuffer {
}

fn try_handle_packet(
bytes: Bytes,
bytes: SharedBytes,
root_bank: &Bank,
working_bank: &Bank,
alt_resolved_slot: Slot,
sanitized_epoch: Epoch,
transaction_account_lock_limit: usize,
) -> Result<TransactionState<RuntimeTransaction<ResolvedTransactionView<Bytes>>>, ()> {
) -> Result<TransactionState<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>, ()>
{
// Parsing and basic sanitization checks
let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes.clone()) else {
let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes) else {
return Err(());
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use {
scheduler_messages::TransactionId,
},
agave_transaction_view::resolved_transaction_view::ResolvedTransactionView,
bytes::{Bytes, BytesMut},
itertools::MinMaxResult,
min_max_heap::MinMaxHeap,
slab::Slab,
Expand Down Expand Up @@ -197,50 +196,26 @@ impl<Tx: TransactionWithMeta> TransactionStateContainer<Tx> {
}
}

pub type SharedBytes = Arc<Vec<u8>>;

/// A wrapper around `TransactionStateContainer` that allows re-uses
/// pre-allocated `Bytes` to copy packet data into and use for serialization.
/// This is used to avoid allocations in parsing transactions.
pub struct TransactionViewStateContainer {
inner: TransactionStateContainer<RuntimeTransaction<ResolvedTransactionView<Bytes>>>,
bytes_buffer: Box<[MaybeBytes]>,
}

enum MaybeBytes {
None,
Bytes(Bytes),
BytesMut(BytesMut),
}

impl MaybeBytes {
fn reserve_space(&mut self) -> BytesMut {
match core::mem::replace(self, MaybeBytes::None) {
MaybeBytes::BytesMut(bytes) => bytes,
_ => unreachable!("invalid state"),
}
}

fn freeze(&mut self, bytes: Bytes) {
debug_assert!(matches!(self, MaybeBytes::None));
*self = MaybeBytes::Bytes(bytes);
}

fn free_space(&mut self, mut bytes: BytesMut) {
debug_assert!(matches!(self, MaybeBytes::None));
bytes.clear();
*self = MaybeBytes::BytesMut(bytes);
}
inner: TransactionStateContainer<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>,
bytes_buffer: Box<[SharedBytes]>,
}

pub(crate) struct SuccessfulInsert {
pub state: TransactionState<RuntimeTransaction<ResolvedTransactionView<Bytes>>>,
pub bytes: Bytes,
pub state: TransactionState<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>,
}

impl TransactionViewStateContainer {
/// Returns true if packet was dropped due to capacity limits.
pub(crate) fn try_insert_with(
pub(crate) fn try_insert_with_data(
&mut self,
f: impl FnOnce(BytesMut) -> Result<SuccessfulInsert, BytesMut>,
data: &[u8],
f: impl FnOnce(SharedBytes) -> Result<SuccessfulInsert, ()>,
) -> bool {
if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity()
{
Expand All @@ -255,58 +230,38 @@ impl TransactionViewStateContainer {

// Get the vacant space in the bytes buffer.
let bytes_entry = &mut self.bytes_buffer[transaction_id];
let bytes = bytes_entry.reserve_space();
// Assert the entry is unique, then copy the packet data.
{
assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique");
let bytes = Arc::make_mut(bytes_entry);

// Clear and copy the packet data into the bytes buffer.
bytes.clear();
bytes.extend_from_slice(data);
}

// Attempt to insert the transaction, storing the frozen bytes back into bytes buffer.
match f(bytes) {
Ok(SuccessfulInsert { state, bytes }) => {
match f(Arc::clone(bytes_entry)) {
Ok(SuccessfulInsert { state }) => {
let priority_id = TransactionPriorityId::new(state.priority(), transaction_id);
vacant_entry.insert(state);
bytes_entry.freeze(bytes);

// Push the transaction into the queue.
self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity)
}
Err(bytes) => {
bytes_entry.free_space(bytes);
false
self.inner
.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity)
}
}
}

// This is re-implemented since we need it to call `remove_by_id` on this
// struct rather than `inner`. This is important because we need to return
// the `Bytes` to the pool.
/// Returns true if packet was dropped due to capacity limits.
fn push_id_into_queue_with_remaining_capacity(
&mut self,
priority_id: TransactionPriorityId,
remaining_capacity: usize,
) -> bool {
if remaining_capacity == 0 {
let popped_id = self.inner.priority_queue.push_pop_min(priority_id);
self.remove_by_id(popped_id.id);
true
} else {
self.inner.priority_queue.push(priority_id);
false
Err(_) => false,
}
}
}

impl StateContainer<RuntimeTransaction<ResolvedTransactionView<Bytes>>>
impl StateContainer<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>
for TransactionViewStateContainer
{
fn with_capacity(capacity: usize) -> Self {
let inner = TransactionStateContainer::with_capacity(capacity);
let bytes_buffer = (0..inner.id_to_transaction_state.capacity())
.map(|_| {
MaybeBytes::BytesMut({
let mut bytes = BytesMut::zeroed(PACKET_DATA_SIZE);
bytes.clear();
bytes
})
})
.map(|_| Arc::new(Vec::with_capacity(PACKET_DATA_SIZE)))
.collect::<Vec<_>>()
.into_boxed_slice();
Self {
Expand Down Expand Up @@ -334,40 +289,28 @@ impl StateContainer<RuntimeTransaction<ResolvedTransactionView<Bytes>>>
fn get_mut_transaction_state(
&mut self,
id: TransactionId,
) -> Option<&mut TransactionState<RuntimeTransaction<ResolvedTransactionView<Bytes>>>> {
) -> Option<&mut TransactionState<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>>
{
self.inner.get_mut_transaction_state(id)
}

#[inline]
fn get_transaction_ttl(
&self,
id: TransactionId,
) -> Option<&SanitizedTransactionTTL<RuntimeTransaction<ResolvedTransactionView<Bytes>>>> {
) -> Option<&SanitizedTransactionTTL<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>>
{
self.inner.get_transaction_ttl(id)
}

#[inline]
fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity())
self.inner.push_id_into_queue(priority_id)
}

#[inline]
fn remove_by_id(&mut self, id: TransactionId) {
// Remove the entry from the map:
// 1. If it was unprocessed, this will drop the `Bytes` held.
// 2. If it was scheduled, this just marks the entry as removed.
let _ = self.inner.id_to_transaction_state.remove(id);

// Clear the bytes buffer.
let bytes_entry = &mut self.bytes_buffer[id];
let MaybeBytes::Bytes(bytes) = core::mem::replace(bytes_entry, MaybeBytes::None) else {
unreachable!("invalid state");
};

// Return the `Bytes` to the pool.
let bytes = bytes
.try_into_mut()
.expect("all `Bytes` instances must be dropped");
bytes_entry.free_space(bytes);
self.inner.remove_by_id(id);
}

#[inline]
Expand Down

0 comments on commit 4e927a9

Please sign in to comment.