Skip to content

Commit

Permalink
Remove timeout from testing
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Dec 11, 2024
1 parent 2db5605 commit d579b84
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 22 deletions.
43 changes: 24 additions & 19 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ pub(crate) trait ReceiveAndBuffer {
type Transaction: TransactionWithMeta + Send + Sync;
type Container: StateContainer<Self::Transaction> + Send + Sync;

/// Returns false only if no packets were received
/// AND the receiver is disconnected.
/// Return Err if the receiver is disconnected AND no packets were
/// received. Otherwise return Ok(num_received).
fn receive_and_buffer_packets(
&mut self,
container: &mut Self::Container,
timing_metrics: &mut SchedulerTimingMetrics,
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool;
) -> Result<usize, ()>;
}

pub(crate) struct SanitizedTransactionReceiveAndBuffer {
Expand All @@ -81,7 +81,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
timing_metrics: &mut SchedulerTimingMetrics,
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool {
) -> Result<usize, ()> {
let remaining_queue_capacity = container.remaining_capacity();

const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10);
Expand Down Expand Up @@ -111,7 +111,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us);
});

match received_packet_results {
let num_received = match received_packet_results {
Ok(receive_packet_results) => {
let num_received_packets = receive_packet_results.deserialized_packets.len();

Expand All @@ -137,12 +137,13 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer {
);
});
}
num_received_packets
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return false,
}
Err(RecvTimeoutError::Timeout) => 0,
Err(RecvTimeoutError::Disconnected) => return Err(()),
};

true
Ok(num_received)
}
}

Expand Down Expand Up @@ -309,7 +310,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
timing_metrics: &mut SchedulerTimingMetrics,
count_metrics: &mut SchedulerCountMetrics,
decision: &BufferedPacketsDecision,
) -> bool {
) -> Result<usize, ()> {
let (root_bank, working_bank) = {
let bank_forks = self.bank_forks.read().unwrap();
let root_bank = bank_forks.root_bank();
Expand All @@ -320,6 +321,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
// Receive packet batches.
const TIMEOUT: Duration = Duration::from_millis(10);
let start = Instant::now();
let mut num_received = 0;
let mut received_message = false;

// If not leader/unknown, do a blocking-receive initially. This lets
Expand All @@ -338,7 +340,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
match self.receiver.recv_timeout(TIMEOUT) {
Ok(packet_batch_message) => {
received_message = true;
self.handle_packet_batch_message(
num_received += self.handle_packet_batch_message(
container,
timing_metrics,
count_metrics,
Expand All @@ -348,9 +350,9 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
packet_batch_message,
);
}
Err(RecvTimeoutError::Timeout) => return true,
Err(RecvTimeoutError::Timeout) => return Ok(num_received),
Err(RecvTimeoutError::Disconnected) => {
return received_message;
return received_message.then_some(num_received).ok_or(());
}
}
}
Expand All @@ -359,7 +361,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
match self.receiver.try_recv() {
Ok(packet_batch_message) => {
received_message = true;
self.handle_packet_batch_message(
num_received += self.handle_packet_batch_message(
container,
timing_metrics,
count_metrics,
Expand All @@ -369,18 +371,19 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer {
packet_batch_message,
);
}
Err(TryRecvError::Empty) => return true,
Err(TryRecvError::Empty) => return Ok(num_received),
Err(TryRecvError::Disconnected) => {
return received_message;
return received_message.then_some(num_received).ok_or(());
}
}
}

true
Ok(num_received)
}
}

impl TransactionViewReceiveAndBuffer {
/// Return number of received packets.
fn handle_packet_batch_message(
&mut self,
container: &mut TransactionViewStateContainer,
Expand All @@ -390,10 +393,10 @@ impl TransactionViewReceiveAndBuffer {
root_bank: &Bank,
working_bank: &Bank,
packet_batch_message: BankingPacketBatch,
) {
) -> usize {
// Do not support forwarding - only add support for this if we really need it.
if matches!(decision, BufferedPacketsDecision::Forward) {
return;
return 0;
}

let start = Instant::now();
Expand Down Expand Up @@ -453,6 +456,8 @@ impl TransactionViewReceiveAndBuffer {
);
saturating_add_assign!(count_metrics.num_dropped_on_receive, num_dropped_on_receive);
});

num_received
}

fn try_handle_packet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {

self.process_transactions(&decision)?;
self.receive_completed()?;
if !self.receive_and_buffer_packets(&decision) {
if self.receive_and_buffer_packets(&decision).is_err() {
break;
}
// Report metrics only if there is data.
Expand Down Expand Up @@ -421,7 +421,10 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
}

/// Returns whether the packet receiver is still connected.
fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
fn receive_and_buffer_packets(
&mut self,
decision: &BufferedPacketsDecision,
) -> Result<usize, ()> {
self.receive_and_buffer.receive_and_buffer_packets(
&mut self.container,
&mut self.timing_metrics,
Expand Down Expand Up @@ -618,7 +621,16 @@ mod tests {
.make_consume_or_forward_decision();
assert!(matches!(decision, BufferedPacketsDecision::Consume(_)));
assert!(scheduler_controller.receive_completed().is_ok());
assert!(scheduler_controller.receive_and_buffer_packets(&decision));

// Time is not a reliable way for deterministic testing.
// Loop here until no more packets are received, this avoids parallel
// tests from inconsistently timing out and not receiving
// from the channel.
while scheduler_controller
.receive_and_buffer_packets(&decision)
.map(|n| n > 0)
.unwrap_or_default()
{}
assert!(scheduler_controller.process_transactions(&decision).is_ok());
}

Expand Down

0 comments on commit d579b84

Please sign in to comment.