
TransactionView: ReceiveAndBuffer #3820

Open. apfitzge wants to merge 26 commits into master from new_receive_and_buffer_2024_11_26.

Conversation
Problem

  • Need to ingest new tx type into banking stage

Summary of Changes

  • Implement a new TransactionViewReceiveAndBuffer to allow banking stage to ingest TransactionView-based transactions
  • Add argument to BankingStage::new to switch between transaction representations
  • Hook up CLI for validator and relevant benchmarks
  • Run all BankingStage and SchedulerController tests using both options

Fixes #

@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch 4 times, most recently from 2876a3f to c868ff5 Compare November 27, 2024 17:17
type Transaction = RuntimeTransaction<ResolvedTransactionView<Bytes>>;
type Container = TransactionViewStateContainer;

fn receive_and_buffer_packets(
apfitzge (Author):

NOTE: this does not run `bank.check_transactions` because I prioritized safety around not leaking `Bytes` in the container, which is much trickier if we do batching.
We should probably follow this up with an immediate call to clean, so that we check age on the transactions we actually inserted, which is likely better anyway. However, I'd like to profile with and without that change first.
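A minimal stand-alone sketch of that follow-up idea (the `BufferedTx` / `Container` / `clean` names are hypothetical stand-ins, not the PR's actual types): insert first, then age-check only what was actually buffered.

```rust
// `BufferedTx` / `Container` / `clean` are hypothetical stand-ins for the
// real container types; the point is only the insert-then-clean ordering.
struct BufferedTx {
    id: usize,
    max_age_slot: u64,
}

struct Container {
    txs: Vec<BufferedTx>,
}

impl Container {
    // Age-check only what was actually inserted: drop transactions whose age
    // bound has passed and return how many were removed.
    fn clean(&mut self, current_slot: u64) -> usize {
        let before = self.txs.len();
        self.txs.retain(|tx| tx.max_age_slot >= current_slot);
        before - self.txs.len()
    }
}
```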

@apfitzge apfitzge marked this pull request as ready for review December 2, 2024 18:50
@apfitzge apfitzge requested a review from a team as a code owner December 2, 2024 18:50
@apfitzge apfitzge self-assigned this Dec 2, 2024
@apfitzge apfitzge requested a review from bw-solana December 2, 2024 20:23
if container.is_empty()
&& matches!(
decision,
BufferedPacketsDecision::Forward | BufferedPacketsDecision::ForwardAndHold
@apfitzge apfitzge Dec 3, 2024

Removed the Hold branch here. Sometimes after a leader slot, we do not have a bank ready in Consume for our next leader slot; we should still avoid the recv_timeout in that case. Also added a check that the container is not empty: if the container has items, we should try to process them instead of sleeping waiting for packets.
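The resulting condition can be illustrated with a self-contained sketch (the function name and the stand-alone enum are illustrative; in the PR this logic lives inline in the controller):

```rust
// Illustrative sketch of the condition above (names hypothetical): only block
// waiting for packets when there is nothing buffered AND we are in a
// forwarding state. Hold no longer qualifies, and a non-empty container means
// we should process what we already have instead of sleeping for packets.
#[allow(dead_code)]
#[derive(Debug)]
enum BufferedPacketsDecision {
    Consume,
    Forward,
    ForwardAndHold,
    Hold,
}

fn should_block_on_receive(
    container_is_empty: bool,
    decision: &BufferedPacketsDecision,
) -> bool {
    container_is_empty
        && matches!(
            decision,
            BufferedPacketsDecision::Forward | BufferedPacketsDecision::ForwardAndHold
        )
}
```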

@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch from f87bd2f to 2d9cc88 Compare December 3, 2024 17:54
@pgarg66 pgarg66 left a comment

Looks good from SVM point of view.

@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch from 2d9cc88 to e310dd7 Compare December 5, 2024 17:06
@apfitzge apfitzge marked this pull request as draft December 5, 2024 22:06
@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch from 4e927a9 to a8c7a41 Compare December 6, 2024 14:00
@apfitzge apfitzge marked this pull request as ready for review December 6, 2024 15:29
@bw-solana bw-solana left a comment

No major qualms with what you have. Left a few comments to consider

core/src/banking_stage.rs (outdated; resolved)
core/src/banking_stage.rs (resolved)
validator/src/main.rs (resolved)
};

// Receive packet batches.
const TIMEOUT: Duration = Duration::from_millis(10);


I'm thinking this will need to be profiled and tweaked, e.g. pull for a certain amount of time or up to some packet limit, to make sure we don't spend all our time receiving and cleaning (and thus less time actually scheduling to feed the workers).
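One way to realize that suggestion, sketched here with hypothetical names rather than the PR's code, is to bound the pull loop by both a time budget and a packet limit:

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

// Pull packets until EITHER the time budget or the packet limit is hit, so
// the thread cannot spend all of its time receiving.
fn drain_bounded(
    rx: &Receiver<Vec<u8>>,
    max_time: Duration,
    max_packets: usize,
) -> Vec<Vec<u8>> {
    let start = Instant::now();
    let mut out = Vec::new();
    while out.len() < max_packets && start.elapsed() < max_time {
        match rx.try_recv() {
            Ok(packet) => out.push(packet),
            Err(_) => break, // channel empty or disconnected: stop pulling
        }
    }
    out
}
```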

apfitzge (Author):

Yeah, I want to make these more flexible instead of just constants.

After this PR lands, I have #4064 to hoist all the prio-graph constants into a config.
I am hoping to do something similar for the controller's timeouts.

timing_metrics: &mut SchedulerTimingMetrics,
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool {


would be nice to add a comment or something indicating what this return bool signifies

apfitzge (Author):

There's a comment on the trait fn; I guess my editor will show me that comment if I hover this function.
Do you prefer I just copy the description to all (only 2) implementations of this trait?


If it's in the trait, I think what you have is fine

@jstarry jstarry left a comment

Still reviewing but posting some feedback along the way

Comment on lines 62 to 66
let should_forward = !packet
.as_ref()
.map(|packet| {
packet.original_packet().meta().forwarded()
&& packet.original_packet().meta().is_from_staked_node()
})
.unwrap_or_default();
Suggested change:
let should_forward = packet
.as_ref()
.map(|packet| {
!packet.original_packet().meta().forwarded()
&& packet.original_packet().meta().is_from_staked_node()
})
.unwrap_or_default();

I think we should add some unit tests for this
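The semantic difference between the two formulations can be shown with a stand-alone sketch (the `Meta` type here is a stand-in for the real packet metadata, not the PR's code):

```rust
// `Meta` is a stand-in for the real packet metadata; only the boolean logic
// is the point here.
#[derive(Clone, Copy)]
struct Meta {
    forwarded: bool,
    from_staked_node: bool,
}

// Original: the `!` negates the whole Option chain, so a missing packet, or
// an unforwarded packet from an UNstaked node, still yields `true`.
fn should_forward_original(packet: Option<Meta>) -> bool {
    !packet
        .map(|m| m.forwarded && m.from_staked_node)
        .unwrap_or_default()
}

// Suggested: forward only packets that are present, not yet forwarded, and
// from a staked node.
fn should_forward_suggested(packet: Option<Meta>) -> bool {
    packet
        .map(|m| !m.forwarded && m.from_staked_node)
        .unwrap_or_default()
}
```

Unit tests over exactly these cases (None, unstaked, staked-and-fresh) would pin the intended behavior down.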


&mut self,
transaction_ttl: SanitizedTransactionTTL<Tx>,
packet: Arc<ImmutableDeserializedPacket>,
packet: Option<Arc<ImmutableDeserializedPacket>>,

nit: we never pass None here, how about Option wrapping before passing to TransactionState::new?

apfitzge (Author):

Yeah, I was originally calling this from the other variant of ReceiveAndBuffer with None, but found a better/safer way.


Comment on lines 14 to 19
impl TransactionData for bytes::Bytes {
#[inline]
fn data(&self) -> &[u8] {
self.as_ref()
}
}

This doesn't seem to be used anymore?

apfitzge (Author):

Yeah, I was being lazy about leaving it in, since we will eventually move to Bytes. But it's better to remove it now and add it back when that happens.


bytes.extend_from_slice(data);
}

// Attempt to insert the transaction, storing the frozen bytes back into bytes buffer.

What do you mean by "storing the frozen bytes back into [the] bytes buffer"? Aren't they already in there?

apfitzge (Author):

Forgot to remove the comment when I updated the code to not use Bytes. Originally, this had a state transition in an enum where we needed to move the frozen bytes back into bytes_buffer.

It's 100% simpler this way, and I can just delete this comment since it is now entirely wrong.

Comment on lines 220 to 223
if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity()
{
return true;
}

My understanding of the code is that it's not actually possible for this condition to be hit, is that right? Every time we insert into the container, we will remove an element if the container is at capacity. So this extra check is just a sanity check? Should we add this sanity check to TransactionStateContainer::insert_new_transaction too?

apfitzge (Author):

It's a sanity check to make sure we're not expanding the Slab. I don't think it makes sense to just fail silently like this; I'll create a helper method and assert that this condition is never hit.
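A sketch of what such a helper-plus-assert might look like (types and names here are hypothetical, not the PR's code):

```rust
// Hypothetical helper: insert through a guard that asserts the pre-allocated
// capacity is never exceeded, instead of silently reporting "full".
struct FixedCapacityStore<T> {
    items: Vec<T>,
}

impl<T> FixedCapacityStore<T> {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
        }
    }

    // Returns the index of the inserted item; panics if the store would have
    // to grow, which the surrounding code treats as a logic error.
    fn insert(&mut self, item: T) -> usize {
        assert!(
            self.items.len() < self.items.capacity(),
            "store must never grow past its pre-allocated capacity"
        );
        self.items.push(item);
        self.items.len() - 1
    }
}
```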


let bytes_entry = &mut self.bytes_buffer[transaction_id];
// Assert the entry is unique, then copy the packet data.
{
assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique");

Can you add a safety comment here explaining that the only possible clone is stored in the slab container and is wrapped in a RuntimeTransaction<ResolvedTransactionView<...>> type which doesn't implement Clone and doesn't allow the underlying Arc to be referenced?
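The uniqueness invariant under discussion can be demonstrated in isolation; this is an illustrative stand-in, not the PR's code:

```rust
use std::sync::Arc;

// A buffer entry may only be recycled while no clone of its Arc is alive
// elsewhere; `Arc::get_mut` then gives safe in-place mutation.
fn recycle_entry(entry: &mut Arc<Vec<u8>>, data: &[u8]) {
    assert_eq!(Arc::strong_count(entry), 1, "entry must be unique");
    // Cannot fail: the assert above proved there are no other strong refs
    // (and no weak refs are ever handed out in this sketch).
    let buf = Arc::get_mut(entry).expect("unique by the assert above");
    buf.clear();
    buf.extend_from_slice(data);
}
```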


Comment on lines 209 to 211
pub(crate) struct SuccessfulInsert {
pub state: TransactionState<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>,
}

nit: doesn't seem like we need this struct. Maybe just a type alias instead?


// currently have ownership of the transaction, and
// therefore may have a reference to the backing-memory
// that the container expects to be free.
drop(transaction);

Is this drop actually necessary before the remove? I think it should get dropped when it goes out of scope right after the remove, and we still have a mutable reference to the container, so there won't be any inserts into the container mixed in.

apfitzge (Author):

👍 Artifact of the fact that I did the clearing in remove_by_id, which required mutability of the Arc/Bytes.

No longer necessary!
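The scoping point can be verified with a tiny stand-alone example: once the cloned handle is dropped (explicitly, or by leaving scope), the backing allocation is unique again.

```rust
use std::sync::Arc;

// Returns the strong counts (before_drop, after_drop) of the backing buffer.
fn demo_unique_after_drop() -> (usize, usize) {
    let backing: Arc<Vec<u8>> = Arc::new(vec![1, 2, 3]);
    let view = Arc::clone(&backing); // stands in for the transaction's reference
    let before = Arc::strong_count(&backing);
    drop(view); // equivalently: let `view` fall out of scope after the remove
    let after = Arc::strong_count(&backing);
    (before, after)
}
```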


Comment on lines 580 to 581
warn!(
"Forwarding only supported for `Sdk` transaction struct. Overriding to use `Sdk`."
);

Let's skip the warning log if transaction_struct was already set to sdk


@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch from 6ccd99a to 19fb288 Compare December 9, 2024 23:13
fn receive_and_buffer_packets(
&mut self,
container: &mut Self::Container,
timing_metrics: &mut SchedulerTimingMetrics,
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool;
) -> Result<usize, ()>;
apfitzge (Author):

This change was to improve the testing, which was seeing some inconsistent failures due to timing out in this call.

Those tests now loop until this call returns Ok(0), or an error.

Natural question: why did this only pop up for TransactionView?

The receive_and_buffer timeout for the view type is done differently than for the sdk types.
The sdk types have a receive timeout, i.e. a time limit to receive packets, and then deserialize all received packets regardless of how long that takes.

The view type has an overall timeout, so that all operations together, receive AND parsing, do not take longer than the specified time.

Since receiving takes almost no time/work, the sdk variant would consistently receive all the batches sent, and then take however long was needed to deserialize them; whether or not the OS scheduler preempts the thread doesn't matter. If the view variant of the test gets preempted during parsing, it can time out and miss the next message.
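The difference between the two timeout policies can be sketched in a self-contained way; this illustrative function (not the PR's code) applies one overall deadline to receive AND parse, the view-style policy described above:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

// One overall deadline covers receive AND parse: time spent "parsing" shrinks
// the budget left for the next receive, so a preempted parse can cause the
// next receive to time out, which is exactly the flakiness described above.
fn receive_and_parse_with_deadline(rx: &Receiver<Vec<u8>>, deadline: Duration) -> usize {
    let start = Instant::now();
    let mut parsed = 0;
    while let Some(remaining) = deadline.checked_sub(start.elapsed()) {
        match rx.recv_timeout(remaining) {
            Ok(bytes) => {
                // Stand-in for parsing the packet into a transaction view.
                parsed += usize::from(!bytes.is_empty());
            }
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    parsed
}
```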

@apfitzge apfitzge force-pushed the new_receive_and_buffer_2024_11_26 branch from 05779cd to f2c9d44 Compare December 13, 2024 16:26