Skip to content

Commit

Permalink
Add Multi Producer stress test.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Aug 31, 2024
1 parent 410059f commit eaf0a65
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,4 +785,88 @@ mod tests {
result.sort();
assert_eq!(vec![1, 1, 3, 3, 5, 5, 11, 12, 21, 22, 31, 32], result);
}

#[test]
fn stress_test() {
#[derive(Debug)]
struct StressEvent {
i: Sequence,
a: i64,
b: i64,
s: String,
}

let (s_seq, r_seq) = mpsc::channel();
let (s_error, r_error) = mpsc::channel();
let num_events = 250_000;
let producers = 4;
let consumers = 3;

let mut processors: Vec<_> = (0..consumers).into_iter().map(|pid| {
let s_error = s_error.clone();
let s_seq = s_seq.clone();
let mut prev_seq = -1;
Some(move |e: &StressEvent, sequence, _| {
if e.a != e.i - 5
|| e.b != e.i + 7
|| e.s != format!("Blackbriar {}", e.i).to_string()
|| sequence != prev_seq + 1 {
s_error.send(1).expect("Should send.");
}
else {
prev_seq = sequence;
let sequence_seen_by_pid = sequence*consumers + pid;
s_seq.send(sequence_seen_by_pid).expect("Should send.");
}
})
}).collect();

// Drop unused Senders.
drop(s_seq);
drop(s_error);

let factory = || {
StressEvent {
i: -1,
a: -1,
b: -1,
s: "".to_string()
}
};

let producer = build_multi_producer(1 << 16, factory, BusySpin)
.handle_events_with(processors[0].take().unwrap())
.handle_events_with(processors[1].take().unwrap())
.handle_events_with(processors[2].take().unwrap())
.build();

thread::scope(|s| {
for _ in 0..producers {
let mut producer = producer.clone();
s.spawn(move || {
for i in 0..num_events {
producer.publish(|e| {
e.i = i;
e.a = i - 5;
e.b = i + 7;
e.s = format!("Blackbriar {}", i).to_string();
});
}
});
}
drop(producer); // Drop excess producer not used.
});

let expected_sequence_reads = consumers*num_events*producers;
let errors: Vec<_> = r_error.iter().collect();
let mut seen_sequences: Vec<_> = r_seq.iter().collect();

assert!(errors.is_empty());
assert_eq!(expected_sequence_reads as usize, seen_sequences.len());
// Assert that each consumer saw each sequence number.
seen_sequences.sort();
for seq_seen_by_pid in 0..expected_sequence_reads {
assert_eq!(seq_seen_by_pid, seen_sequences[seq_seen_by_pid as usize]);
}
}
}

0 comments on commit eaf0a65

Please sign in to comment.