diff --git a/src/lib.rs b/src/lib.rs index b44164e..c516757 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -261,7 +261,7 @@ mod tests { } #[test] - fn full_ringbuffer() { + fn spsc_full_ringbuffer() { let (s, r) = mpsc::channel(); let barrier = Arc::new(AtomicBool::new(true)); let processor = { @@ -292,7 +292,43 @@ mod tests { } #[test] - fn insufficient_space_for_batch_publication() { + fn mpsc_full_ringbuffer() { + let (s, r) = mpsc::channel(); + let barrier = Arc::new(AtomicBool::new(true)); + let processor = { + let barrier = Arc::clone(&barrier); + move |e: &Event, _, _| { + while barrier.load(Relaxed) { /* Wait. */ } + s.send(e.num).expect("Should be able to send."); + } + }; + let mut producer1 = build_multi_producer(4, factory(), BusySpinWithSpinLoopHint) + .handle_events_with(processor) + .build(); + + let mut producer2 = producer1.clone(); + + for i in 0..4 { + producer1.try_publish(|e| e.num = i).expect("Should publish"); + } + + // Now ring buffer is full. + assert_eq!(RingBufferFull, producer1.try_publish(|e| e.num = 4).err().unwrap()); + // And it is full also as seen from second producer. + assert_eq!(RingBufferFull, producer2.try_publish(|e| e.num = 4).err().unwrap()); + // Until the processor continues reading events. + barrier.store(false, Relaxed); + producer1.publish(|e| e.num = 4); + producer2.publish(|e| e.num = 5); + + drop(producer1); + drop(producer2); + let result: Vec<_> = r.iter().collect(); + assert_eq!(result, [0, 1, 2, 3, 4, 5]); + } + + #[test] + fn spsc_insufficient_space_for_batch_publication() { let (s, r) = mpsc::channel(); let barrier = Arc::new(AtomicBool::new(true)); let processor = { @@ -309,14 +345,62 @@ mod tests { for i in 0..2 { producer.publish(|e| e.num = i); } - assert_eq!(MissingFreeSlots(2), producer.try_batch_publish(4, |_iter| {} ).err().unwrap()); + assert_eq!(MissingFreeSlots(2), producer.try_batch_publish( 4, |_iter| {} ).err().unwrap()); assert_eq!(MissingFreeSlots(100), producer.try_batch_publish(102, |_iter| {} ).err().unwrap()); barrier.store(false, Relaxed); - drop(producer); + producer.try_batch_publish(2, |iter| { + for e in iter { + e.num = 2; + } + }).expect("Batch publication should now succeed."); + drop(producer); let result: Vec<_> = r.iter().collect(); - assert_eq!(result, [0, 1]); + assert_eq!(result, [0, 1, 2, 2]); + } + + #[test] + fn mpsc_insufficient_space_for_batch_publication() { + let (s, r) = mpsc::channel(); + let barrier = Arc::new(AtomicBool::new(true)); + let processor = { + let barrier = Arc::clone(&barrier); + move |e: &Event, _, _| { + while barrier.load(Relaxed) { /* Wait. */ } + s.send(e.num).expect("Should be able to send."); + } + }; + let mut producer1 = build_multi_producer(8, factory(), BusySpin) + .handle_events_with(processor) + .build(); + let mut producer2 = producer1.clone(); + + for i in 0..2 { + producer1.publish(|e| e.num = i); + } + assert_eq!(MissingFreeSlots(2), producer1.try_batch_publish( 8, |_iter| {} ).err().unwrap()); + assert_eq!(MissingFreeSlots(100), producer1.try_batch_publish(106, |_iter| {} ).err().unwrap()); + assert_eq!(MissingFreeSlots(2), producer2.try_batch_publish( 8, |_iter| {} ).err().unwrap()); + assert_eq!(MissingFreeSlots(100), producer2.try_batch_publish(106, |_iter| {} ).err().unwrap()); + + barrier.store(false, Relaxed); + producer1.try_batch_publish(2, |iter| { + for e in iter { + e.num = 2; + } + }).expect("Batch publication should now succeed."); + producer2.try_batch_publish(2, |iter| { + for e in iter { + e.num = 3; + } + }).expect("Batch publication should now succeed."); + + drop(producer1); + drop(producer2); + let mut result: Vec<_> = r.iter().collect(); + result.sort(); + assert_eq!(result, [0, 1, 2, 2, 3, 3]); } #[test]