Skip to content

Commit

Permalink
Add tests of full/missing slots for MultiProducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Jun 23, 2024
1 parent a25da9e commit b8595ee
Showing 1 changed file with 89 additions and 5 deletions.
94 changes: 89 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ mod tests {
}

#[test]
fn full_ringbuffer() {
fn spsc_full_ringbuffer() {
let (s, r) = mpsc::channel();
let barrier = Arc::new(AtomicBool::new(true));
let processor = {
Expand Down Expand Up @@ -292,7 +292,43 @@ mod tests {
}

#[test]
fn insufficient_space_for_batch_publication() {
fn mpsc_full_ringbuffer() {
let (s, r) = mpsc::channel();
let barrier = Arc::new(AtomicBool::new(true));
let processor = {
let barrier = Arc::clone(&barrier);
move |e: &Event, _, _| {
while barrier.load(Relaxed) { /* Wait. */ }
s.send(e.num).expect("Should be able to send.");
}
};
let mut producer1 = build_multi_producer(4, factory(), BusySpinWithSpinLoopHint)
.handle_events_with(processor)
.build();

let mut producer2 = producer1.clone();

for i in 0..4 {
producer1.try_publish(|e| e.num = i).expect("Should publish");
}

// Now ring buffer is full.
assert_eq!(RingBufferFull, producer1.try_publish(|e| e.num = 4).err().unwrap());
// And it is full also as seen from second producer.
assert_eq!(RingBufferFull, producer2.try_publish(|e| e.num = 4).err().unwrap());
// Until the processor continues reading events.
barrier.store(false, Relaxed);
producer1.publish(|e| e.num = 4);
producer2.publish(|e| e.num = 5);

drop(producer1);
drop(producer2);
let result: Vec<_> = r.iter().collect();
assert_eq!(result, [0, 1, 2, 3, 4, 5]);
}

#[test]
fn spsc_insufficient_space_for_batch_publication() {
let (s, r) = mpsc::channel();
let barrier = Arc::new(AtomicBool::new(true));
let processor = {
Expand All @@ -309,14 +345,62 @@ mod tests {
for i in 0..2 {
producer.publish(|e| e.num = i);
}
assert_eq!(MissingFreeSlots(2), producer.try_batch_publish(4, |_iter| {} ).err().unwrap());
assert_eq!(MissingFreeSlots(2), producer.try_batch_publish( 4, |_iter| {} ).err().unwrap());
assert_eq!(MissingFreeSlots(100), producer.try_batch_publish(102, |_iter| {} ).err().unwrap());

barrier.store(false, Relaxed);
drop(producer);
producer.try_batch_publish(2, |iter| {
for e in iter {
e.num = 2;
}
}).expect("Batch publication should now succeed.");

drop(producer);
let result: Vec<_> = r.iter().collect();
assert_eq!(result, [0, 1]);
assert_eq!(result, [0, 1, 2, 2]);
}

#[test]
fn mpsc_insufficient_space_for_batch_publication() {
let (s, r) = mpsc::channel();
let barrier = Arc::new(AtomicBool::new(true));
let processor = {
let barrier = Arc::clone(&barrier);
move |e: &Event, _, _| {
while barrier.load(Relaxed) { /* Wait. */ }
s.send(e.num).expect("Should be able to send.");
}
};
let mut producer1 = build_multi_producer(8, factory(), BusySpin)
.handle_events_with(processor)
.build();
let mut producer2 = producer1.clone();

for i in 0..2 {
producer1.publish(|e| e.num = i);
}
assert_eq!(MissingFreeSlots(2), producer1.try_batch_publish( 8, |_iter| {} ).err().unwrap());
assert_eq!(MissingFreeSlots(100), producer1.try_batch_publish(106, |_iter| {} ).err().unwrap());
assert_eq!(MissingFreeSlots(2), producer2.try_batch_publish( 8, |_iter| {} ).err().unwrap());
assert_eq!(MissingFreeSlots(100), producer2.try_batch_publish(106, |_iter| {} ).err().unwrap());

barrier.store(false, Relaxed);
producer1.try_batch_publish(2, |iter| {
for e in iter {
e.num = 2;
}
}).expect("Batch publication should now succeed.");
producer2.try_batch_publish(2, |iter| {
for e in iter {
e.num = 3;
}
}).expect("Batch publication should now succeed.");

drop(producer1);
drop(producer2);
let mut result: Vec<_> = r.iter().collect();
result.sort();
assert_eq!(result, [0, 1, 2, 2, 3, 3]);
}

#[test]
Expand Down

0 comments on commit b8595ee

Please sign in to comment.