-
Notifications
You must be signed in to change notification settings - Fork 256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TransactionView: ReceiveAndBuffer #3820
base: master
Are you sure you want to change the base?
TransactionView: ReceiveAndBuffer #3820
Conversation
2876a3f
to
c868ff5
Compare
type Transaction = RuntimeTransaction<ResolvedTransactionView<Bytes>>; | ||
type Container = TransactionViewStateContainer; | ||
|
||
fn receive_and_buffer_packets( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: this does not run the bank.check_transactions
because I prioritized safety around not leaking Bytes
in the container, which is much trickier if we do batching.
We should probably follow this up with a call to clean immediately after, so that we check age on transactions we actually inserted, which is likely better anyway. However, I'd like to profile with and without that change first.
if container.is_empty() | ||
&& matches!( | ||
decision, | ||
BufferedPacketsDecision::Forward | BufferedPacketsDecision::ForwardAndHold |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed Hold
branch here. Sometimes after a leader slot, we do not have a bank ready in Consume
for our next leader slot. We should still avoid the recv_timeout
in that case. Also added a check that container is not empty - if container has items, then we should try to process them instead of sleeping for packets
f87bd2f
to
2d9cc88
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good from SVM point of view.
2d9cc88
to
e310dd7
Compare
4e927a9
to
a8c7a41
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No major qualms with what you have. Left a few comments to consider
}; | ||
|
||
// Receive packet batches. | ||
const TIMEOUT: Duration = Duration::from_millis(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking this will need to be profiled and tweaked. E.g. pull for a certain amount of time or up to some packet limit to make sure we don't spend all our time receiving and cleaning (and thus less time on actually scheduling to feed the workers)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Want to make these more flexible instead of just constants.
After this PR lands, I have #4064 to hoist all the prio-graph constants into a config.
I am hoping to do something similar for the controller's timeouts.
timing_metrics: &mut SchedulerTimingMetrics, | ||
count_metrics: &mut SchedulerCountMetrics, | ||
decision: &BufferedPacketsDecision, | ||
) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to add a comment or something indicating what this return bool signifies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's a comment on the trait fn. I guess my editor will show me that comment if I hover this function.
Do you prefer I just copy the description to all (only 2) implementations of this trait?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's in the trait, I think what you have is fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still reviewing but posting some feedback along the way
let should_forward = !packet | ||
.as_ref() | ||
.map(|packet| { | ||
packet.original_packet().meta().forwarded() | ||
&& packet.original_packet().meta().is_from_staked_node() | ||
}) | ||
.unwrap_or_default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let should_forward = !packet | |
.as_ref() | |
.map(|packet| { | |
packet.original_packet().meta().forwarded() | |
&& packet.original_packet().meta().is_from_staked_node() | |
}) | |
.unwrap_or_default(); | |
let should_forward = packet | |
.as_ref() | |
.map(|packet| { | |
!packet.original_packet().meta().forwarded() | |
&& packet.original_packet().meta().is_from_staked_node() | |
}) | |
.unwrap_or_default(); |
I think we should add some unit tests for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
&mut self, | ||
transaction_ttl: SanitizedTransactionTTL<Tx>, | ||
packet: Arc<ImmutableDeserializedPacket>, | ||
packet: Option<Arc<ImmutableDeserializedPacket>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we never pass None
here, how about Option
wrapping before passing to TransactionState::new
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah - was originally calling this from the other variant of ReceiveAndBuffer
with None
, but found a better/safer way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl TransactionData for bytes::Bytes { | ||
#[inline] | ||
fn data(&self) -> &[u8] { | ||
self.as_ref() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be used anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah was lazy and about leaving it because we will eventually move to Bytes. But, better to do that when that happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bytes.extend_from_slice(data); | ||
} | ||
|
||
// Attempt to insert the transaction, storing the frozen bytes back into bytes buffer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "storing the frozen bytes back into [the] bytes buffer"? Aren't they already in there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to remove the comment when I updated the code to not use Bytes
. Originally, this had a state transition in an enum where we need to move the frozen bytes back into bytes_buffer
.
It's 100% simpler this way, and I can just delete this comment since it is just entirely wrong.
if self.inner.id_to_transaction_state.len() == self.inner.id_to_transaction_state.capacity() | ||
{ | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding of the code is that it's not actually possible for this condition to be hit, is that right? Every time we insert into the container, we will remove an element if the container is at capacity. So this extra check is just a sanity check? Should we add this sanity check to TransactionStateContainer::insert_new_transaction
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a sanity check to make sure we're not expanding the Slab
. I don't think it makes sense to just fail silently like this - I'll create a helper method and assert that this condition is never hit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let bytes_entry = &mut self.bytes_buffer[transaction_id]; | ||
// Assert the entry is unique, then copy the packet data. | ||
{ | ||
assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a safety comment here explaining that the only possible clone is stored in the slab container and is wrapped in a RuntimeTransaction<ResolvedTransactionView<...>>
type which doesn't implement Clone
and doesn't allow the underlying Arc
to be referenced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub(crate) struct SuccessfulInsert { | ||
pub state: TransactionState<RuntimeTransaction<ResolvedTransactionView<SharedBytes>>>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: doesn't seem like we need this struct. Maybe just a type alias instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// currently have ownership of the transaction, and | ||
// therefore may have a reference to the backing-memory | ||
// that the container expects to be free. | ||
drop(transaction); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this drop actually necessary before the remove still? I think it should get dropped when it goes out of scope right after the remove and we have a mutable reference to container
still so there won't be any inserts into the container mixed in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 artifact of the fact I did the clear
-ing in remove_by_id
which required mutability of the Arc/Bytes.
No longer necessary!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/banking_stage.rs
Outdated
warn!( | ||
"Forwarding only supported for `Sdk` transaction struct. Overriding to use `Sdk`." | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's skip the warning log if transaction_struct
was already set to sdk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
6ccd99a
to
19fb288
Compare
fn receive_and_buffer_packets( | ||
&mut self, | ||
container: &mut Self::Container, | ||
timing_metrics: &mut SchedulerTimingMetrics, | ||
count_metrics: &mut SchedulerCountMetrics, | ||
decision: &BufferedPacketsDecision, | ||
) -> bool; | ||
) -> Result<usize, ()>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change was to improve the testing which was seeing some inconsistent failures, due to timing out in this call.
Those tests now loop until this call returns Ok(0) - or an error.
Natural question - Why did this only pop up for TransactionView
?
The receive_and_buffer timeout for view is done differently than for sdk types.
The sdk types have a receive timeout, i.e. a time limit to receive packets, and then will deserialize all packets it received regardless of the time that takes.
The view type has an overall timeout so that it does not take longer than specified time for all operations - receive AND parsing.
Since receiving takes almost no time/work, this consistently would receive all the batches sent in the sdk
variant. and then it would take however long to deserialize them - whether or not OS scheduler decides to rug the thread doesn't matter. If the view variant of the test gets rugged during parsing, then it can time out and not receive the next message.
2b8807d
to
d579b84
Compare
d579b84
to
05779cd
Compare
05779cd
to
f2c9d44
Compare
Problem
Summary of Changes
TransactionViewReceiveAndBuffer
to allow banking stage to ingestTransactionView
-based transactionsFixes #