Skip to content

Commit

Permalink
test both ReceiveAndBuffer in scheduler_controller
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Nov 27, 2024
1 parent a66af83 commit 1029f10
Showing 1 changed file with 79 additions and 49 deletions.
128 changes: 79 additions & 49 deletions core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,9 @@ mod tests {
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
TransactionViewReceiveAndBuffer,
},
banking_trace::BankingPacketBatch,
banking_trace::{BankingPacketBatch, BankingPacketReceiver},
sigverify::SigverifyTracerPacketStats,
},
crossbeam_channel::{unbounded, Receiver, Sender},
Expand All @@ -456,21 +457,15 @@ mod tests {
solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry},
solana_runtime::bank::Bank,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_runtime_transaction::transaction_meta::StaticMeta,
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
fee_calculator::FeeRateGovernor,
hash::Hash,
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
system_instruction, system_transaction,
transaction::{SanitizedTransaction, Transaction},
compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash,
message::Message, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair,
signer::Signer, system_instruction, system_transaction, transaction::Transaction,
},
std::sync::{atomic::AtomicBool, Arc, RwLock},
tempfile::TempDir,
test_case::test_case,
};

fn create_channels<T>(num: usize) -> (Vec<Sender<T>>, Vec<Receiver<T>>) {
Expand All @@ -479,7 +474,7 @@ mod tests {

// Helper struct to create tests that hold channels, files, etc.
// such that our tests can be more easily set up and run.
struct TestFrame {
struct TestFrame<Tx> {
bank: Arc<Bank>,
mint_keypair: Keypair,
_ledger_path: TempDir,
Expand All @@ -488,18 +483,38 @@ mod tests {
poh_recorder: Arc<RwLock<PohRecorder>>,
banking_packet_sender: Sender<Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>>,

consume_work_receivers:
Vec<Receiver<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>>,
finished_consume_work_sender:
Sender<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
consume_work_receivers: Vec<Receiver<ConsumeWork<Tx>>>,
finished_consume_work_sender: Sender<FinishedConsumeWork<Tx>>,
}

fn test_create_sanitized_transaction_receive_and_buffer(
receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> SanitizedTransactionReceiveAndBuffer {
SanitizedTransactionReceiveAndBuffer::new(
PacketDeserializer::new(receiver),
bank_forks,
false,
)
}

fn test_create_transaction_view_receive_and_buffer(
receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> TransactionViewReceiveAndBuffer {
TransactionViewReceiveAndBuffer {
receiver,
bank_forks,
}
}

#[allow(clippy::type_complexity)]
fn create_test_frame(
fn create_test_frame<R: ReceiveAndBuffer>(
num_threads: usize,
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) -> (
TestFrame,
SchedulerController<Arc<ClusterInfo>, SanitizedTransactionReceiveAndBuffer>,
TestFrame<R::Transaction>,
SchedulerController<Arc<ClusterInfo>, R>,
) {
let GenesisConfigInfo {
mut genesis_config,
Expand Down Expand Up @@ -527,7 +542,8 @@ mod tests {
let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone());

let (banking_packet_sender, banking_packet_receiver) = unbounded();
let packet_deserializer = PacketDeserializer::new(banking_packet_receiver);
let receive_and_buffer =
create_receive_and_buffer(banking_packet_receiver, bank_forks.clone());

let (consume_work_senders, consume_work_receivers) = create_channels(num_threads);
let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
Expand All @@ -544,12 +560,6 @@ mod tests {
finished_consume_work_sender,
};

let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new(
packet_deserializer,
bank_forks.clone(),
false,
);

let scheduler_controller = SchedulerController::new(
decision_maker,
receive_and_buffer,
Expand Down Expand Up @@ -601,10 +611,7 @@ mod tests {
// In the tests, the decision will not become stale, so it is more convenient
// to receive first and then schedule.
fn test_receive_then_schedule(
scheduler_controller: &mut SchedulerController<
Arc<ClusterInfo>,
SanitizedTransactionReceiveAndBuffer,
>,
scheduler_controller: &mut SchedulerController<Arc<ClusterInfo>, impl ReceiveAndBuffer>,
) {
let decision = scheduler_controller
.decision_maker
Expand All @@ -615,10 +622,13 @@ mod tests {
assert!(scheduler_controller.process_transactions(&decision).is_ok());
}

#[test]
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
#[should_panic(expected = "batch id 0 is not being tracked")]
fn test_unexpected_batch_id() {
let (test_frame, scheduler_controller) = create_test_frame(1);
fn test_unexpected_batch_id<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, scheduler_controller) = create_test_frame(1, create_receive_and_buffer);
let TestFrame {
finished_consume_work_sender,
..
Expand All @@ -639,9 +649,13 @@ mod tests {
scheduler_controller.run().unwrap();
}

#[test]
fn test_schedule_consume_single_threaded_no_conflicts() {
let (test_frame, mut scheduler_controller) = create_test_frame(1);
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_single_threaded_no_conflicts<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(1, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
Expand Down Expand Up @@ -695,9 +709,13 @@ mod tests {
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
}

#[test]
fn test_schedule_consume_single_threaded_conflict() {
let (test_frame, mut scheduler_controller) = create_test_frame(1);
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_single_threaded_conflict<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(1, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
Expand Down Expand Up @@ -754,9 +772,13 @@ mod tests {
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
}

#[test]
fn test_schedule_consume_single_threaded_multi_batch() {
let (test_frame, mut scheduler_controller) = create_test_frame(1);
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_single_threaded_multi_batch<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(1, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
Expand Down Expand Up @@ -818,9 +840,13 @@ mod tests {
);
}

#[test]
fn test_schedule_consume_simple_thread_selection() {
let (test_frame, mut scheduler_controller) = create_test_frame(2);
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_simple_thread_selection<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(2, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
Expand Down Expand Up @@ -885,9 +911,13 @@ mod tests {
assert_eq!(t1_actual, t1_expected);
}

#[test]
fn test_schedule_consume_retryable() {
let (test_frame, mut scheduler_controller) = create_test_frame(1);
#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_retryable<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(1, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
Expand Down

0 comments on commit 1029f10

Please sign in to comment.