diff --git a/src/barrier.rs b/src/barrier.rs index 831d9c4..08eb896 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -3,13 +3,11 @@ use crate::Sequence; /// Indicates no sequence number has been claimed (yet). pub const NONE: Sequence = -1; +#[doc(hidden)] pub trait Barrier { - /// Creates a new barrier of the specified size. - fn new(size: usize) -> Self; - /// Gets the sequence number of the barrier with relaxed memory ordering. /// /// Note, to establish proper happens-before relationships (and thus proper synchronization), /// the caller must issue a [`std::sync::atomic::fence`] with [`Ordering::Acquire`]. - fn get_relaxed(&self, lower_bound: Sequence) -> Sequence; + fn get_after(&self, lower_bound: Sequence) -> Sequence; } diff --git a/src/builder.rs b/src/builder.rs index 77912c2..14c3c51 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,19 +1,42 @@ //! Module for building the Disruptor and adding event handlers. +//! +//! # Examples +//! +//! ``` +//!# use disruptor::build_single_producer; +//!# use disruptor::Producer; +//!# use disruptor::BusySpin; +//!# use disruptor::RingBufferFull; +//!# +//! // The example data entity on the ring buffer. +//! struct Event { +//! price: f64 +//! } +//! let factory = || { Event { price: 0.0 }}; +//!# let processor1 = |e: &Event, _, _| {}; +//!# let processor2 = |e: &Event, _, _| {}; +//!# let processor3 = |e: &Event, _, _| {}; +//! let mut producer = disruptor::build_single_producer(8, factory, BusySpin) +//! .pined_at_core(1).thread_named("my_processor").handle_events_with(processor1) +//! .handle_events_with(processor2) // Not pinned and getting a generic name. +//! .and_then() +//! .pined_at_core(2).handle_events_with(processor3) // Pined but with a generic name. +//! .build(); +//! ``` use std::{marker::PhantomData, sync::{atomic::{fence, AtomicI64, Ordering}, Arc}, thread}; use core_affinity::CoreId; use crossbeam_utils::CachePadded; -use crate::{affinity::{cpu_has_core_else_panic, set_affinity_if_defined}, barrier::{Barrier, NONE}, multi_producer::MultiProducer, single_producer::SingleProducer, Sequence}; +use crate::{affinity::{cpu_has_core_else_panic, set_affinity_if_defined}, barrier::{Barrier, NONE}, Sequence}; use crate::consumer::{Consumer, ConsumerBarrier}; use crate::cursor::Cursor; -use crate::multi_producer::MultiProducerBarrier; -use crate::producer::{Producer, ProducerBarrier}; +use crate::producer::{Producer, single::{SingleProducer, SingleProducerBarrier}, multi::{MultiProducer, MultiProducerBarrier}, ProducerBarrier}; use crate::ringbuffer::RingBuffer; -use crate::single_producer::SingleProducerBarrier; use crate::wait_strategies::WaitStrategy; -/// Create builder for a [`SingleProducer`]. Use this if you only need to publish -/// events from one thread. +/// Build a single producer Disruptor. Use this if you only need to publish events from one thread. +/// +/// For using a producer see [`Producer`]. pub fn build_single_producer(size: usize, event_factory: F, wait_strategy: W) -> Builder> where @@ -21,11 +44,13 @@ where E: 'static, W: 'static + WaitStrategy, { - Builder::new(size, event_factory, wait_strategy) + let producer_barrier = SingleProducerBarrier::new(); + Builder::new(size, event_factory, wait_strategy, producer_barrier) } -/// Create builder for a [`MultiProducer`]. Use this if you need to publish events -/// from many threads. +/// Build a multi producer Disruptor. Use this if you need to publish events from many threads. +/// +/// For using a producer see [`Producer`]. pub fn build_multi_producer(size: usize, event_factory: F, wait_strategy: W) -> Builder> where @@ -33,10 +58,13 @@ where E: 'static, W: 'static + WaitStrategy, { - Builder::new(size, event_factory, wait_strategy) + let producer_barrier = MultiProducerBarrier::new(size); + Builder::new(size, event_factory, wait_strategy, producer_barrier) } /// Adds a dependency on all previously added event handlers. +/// +/// See [`Builder`] for examples of usage (they have the same methods). pub struct DependencyChain where PR: Producer @@ -47,6 +75,30 @@ where } /// Builder used for configuring and constructing a Disruptor. +/// +/// # Examples +/// +/// ``` +///# use disruptor::build_single_producer; +///# use disruptor::Producer; +///# use disruptor::BusySpin; +///# use disruptor::RingBufferFull; +///# +/// // The example data entity on the ring buffer. +/// struct Event { +/// price: f64 +/// } +/// let factory = || { Event { price: 0.0 }}; +///# let processor1 = |e: &Event, _, _| {}; +///# let processor2 = |e: &Event, _, _| {}; +///# let processor3 = |e: &Event, _, _| {}; +/// let mut producer = disruptor::build_single_producer(8, factory, BusySpin) +/// .pined_at_core(1).thread_named("my_processor").handle_events_with(processor1) +/// .handle_events_with(processor2) // Not pinned and getting a generic name. +/// .and_then() +/// .pined_at_core(2).handle_events_with(processor3) // Pined but with a generic name. +/// .build(); +/// ``` pub struct Builder where PR: Producer @@ -97,14 +149,14 @@ where W: 'static + WaitStrategy, PR: Producer, { - fn new(size: usize, event_factory: F, wait_strategy: W) -> Self + fn new(size: usize, event_factory: F, wait_strategy: W, producer_barrier: P) -> Self where F: FnMut() -> E { let ring_buffer = Box::into_raw(Box::new(RingBuffer::new(size, event_factory))); - let producer_barrier = Arc::new(P::new(size)); + let producer_barrier = Arc::new(producer_barrier); let shutdown_at_sequence = Arc::new(CachePadded::new(AtomicI64::new(NONE))); - let consumer_barrier = Some(ConsumerBarrier::new(0)); + let consumer_barrier = Some(ConsumerBarrier::new()); Builder { ring_buffer, @@ -148,7 +200,7 @@ where /// events after all previous consumers have read them. pub fn and_then(mut self) -> DependencyChain { let dependent_barrier = Arc::new(self.consumer_barrier.take().unwrap()); - let consumer_barrier = Some(ConsumerBarrier::new(0)); + let consumer_barrier = Some(ConsumerBarrier::new()); DependencyChain { builder: self, dependent_barrier, @@ -204,7 +256,7 @@ where /// events after all previous consumers have read them. pub fn and_then(mut self) -> DependencyChain { let dependent_barrier = Arc::new(self.consumer_barrier.take().unwrap()); - let consumer_barrier = Some(ConsumerBarrier::new(0)); + let consumer_barrier = Some(ConsumerBarrier::new()); DependencyChain { builder: self.builder, dependent_barrier, @@ -249,14 +301,14 @@ where let ring_buffer = wrapper.unwrap(); let mut sequence = 0; loop { - let mut available = barrier.get_relaxed(sequence); + let mut available = barrier.get_after(sequence); while available < sequence { // If publisher(s) are done publishing events we're done. if shutdown_at_sequence.load(Ordering::Relaxed) == sequence { return; } wait_strategy.wait_for(sequence); - available = barrier.get_relaxed(sequence); + available = barrier.get_after(sequence); } fence(Ordering::Acquire); diff --git a/src/consumer.rs b/src/consumer.rs index dc8f4c0..7e9f000 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,44 +1,55 @@ use std::{sync::Arc, thread::JoinHandle}; use crate::{barrier::Barrier, cursor::Cursor, Sequence}; +#[doc(hidden)] +pub struct Consumer { + join_handle: Option>, +} + +impl Consumer { + pub(crate) fn new(join_handle: JoinHandle<()>) -> Self { + Self { + join_handle: Some(join_handle), + } + } + + pub(crate) fn join(&mut self) { + if let Some(h) = self.join_handle.take() { h.join().expect("Consumer should not panic.") } + } +} + +#[doc(hidden)] pub struct ConsumerBarrier { cursors: Vec> } impl ConsumerBarrier { + pub(crate) fn new() -> Self { + Self { + cursors: vec![] + } + } + pub(crate) fn add(&mut self, cursor: Arc) { self.cursors.push(cursor); } -} -impl Barrier for ConsumerBarrier { - fn new(_size: usize) -> Self { - Self { - cursors: vec![] - } + /// Gets the available `Sequence` of the slowest consumer. + /// + /// Note, to establish proper happens-before relationships (and thus proper synchronization), + /// the caller must issue a [`std::sync::atomic::fence`] with + /// [`Ordering::Acquire`](std::sync::atomic::Ordering::Acquire). + pub(crate) fn get(&self) -> Sequence { + self.get_after(0) } +} +impl Barrier for ConsumerBarrier { /// Gets the available `Sequence` of the slowest consumer. - fn get_relaxed(&self, _lower_bound: Sequence) -> Sequence { + fn get_after(&self, _lower_bound: Sequence) -> Sequence { self.cursors.iter().fold(i64::MAX, |min_sequence, cursor| { let sequence = cursor.relaxed_value(); std::cmp::min(sequence, min_sequence) }) } } - -pub(crate) struct Consumer { - join_handle: Option>, -} - -impl Consumer { - pub(crate) fn new(join_handle: JoinHandle<()>) -> Self { - Self { - join_handle: Some(join_handle), - } - } - - pub(crate) fn join(&mut self) { - if let Some(h) = self.join_handle.take() { h.join().expect("Consumer should not panic.") } - } -} diff --git a/src/lib.rs b/src/lib.rs index 5b2e636..874751f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ //! It also owns and manages the processing thread(s) for the convenience of the library users. //! //! When the Disruptor is created, you choose whether publication to the Disruptor will happen from -//! one or multiple threads via **Producer** handles. +//! one or multiple threads via [`Producer`] handles. //! In any case, when the last Producer goes out of scope, all events published are processed and //! then the processing thread(s) will be stopped and the entire Disruptor will be dropped. //! @@ -52,24 +52,21 @@ /// The type of Sequence numbers in the Ring Buffer. pub type Sequence = i64; -pub use wait_strategies::BusySpin; -pub use wait_strategies::BusySpinWithSpinLoopHint; pub use builder::build_single_producer; pub use builder::build_multi_producer; pub use producer::Producer; pub use producer::RingBufferFull; +pub use wait_strategies::BusySpin; +pub use wait_strategies::BusySpinWithSpinLoopHint; mod affinity; mod barrier; mod consumer; mod cursor; mod ringbuffer; - -pub mod producer; -pub mod single_producer; -pub mod multi_producer; -pub mod wait_strategies; -pub mod builder; +mod producer; +mod builder; +mod wait_strategies; #[cfg(test)] mod tests { diff --git a/src/producer.rs b/src/producer.rs index bdebeb9..f9c5fb8 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1,27 +1,16 @@ //! Module with different producer handles for publishing into the Disruptor. //! //! Both publishing from a single thread (fastest) and from multiple threads is supported. -//! -//! A `Producer` has two methods for publication: -//! 1. `try_publish` and -//! 2. `publish` -//! -//! It is recommended to use `try_publish` and handle the [`RingBufferFull`] error as appropriate in -//! the application. -//! -//! Note, that a [`RingBufferFull`] error indicates that the consumer logic cannot keep up with the -//! data ingestion rate and that latency is increasing. Therefore, the safe route is to panic the -//! application instead of sending latent data out. (Of course appropriate action should be taken to -//! make e.g. prices indicative in a price engine or cancel all open orders in a trading -//! application before panicking.) -use std::sync::{atomic::AtomicI64, Arc}; +pub mod single; +pub mod multi; +use std::sync::{atomic::AtomicI64, Arc}; use crossbeam_utils::CachePadded; - use crate::{barrier::Barrier, consumer::{Consumer, ConsumerBarrier}, ringbuffer::RingBuffer, Sequence}; /// Barrier for producers. +#[doc(hidden)] pub trait ProducerBarrier : Barrier { /// Claim the next Sequence (for publication). fn next(&self) -> Sequence; @@ -38,13 +27,22 @@ pub trait ProducerBarrier : Barrier { #[derive(Debug)] pub struct RingBufferFull; -pub(crate) trait ProducerImpl { - fn next_sequence(&mut self) -> Result; - - fn apply_update(&mut self, update: F) -> Result - where - F: FnOnce(&mut E); - +/// Producer used for publishing into the Disruptor. +/// +/// A `Producer` has two methods for publication: +/// 1. `try_publish` and +/// 2. `publish` +/// +/// It is recommended to use `try_publish` and handle the [`RingBufferFull`] error as appropriate in +/// the application. +/// +/// Note, that a [`RingBufferFull`] error indicates that the consumer logic cannot keep up with the +/// data ingestion rate and that latency is increasing. Therefore, the safe route is to panic the +/// application instead of sending latent data out. (Of course appropriate action should be taken to +/// make e.g. prices indicative in a price engine or cancel all open orders in a trading +/// application before panicking.) +pub trait Producer { + #[doc(hidden)] fn new( shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, @@ -52,10 +50,7 @@ pub(crate) trait ProducerImpl { consumers: Vec, consumer_barrier: ConsumerBarrier, ) -> Self; -} -/// Producer used for publishing into the Disruptor. -pub trait Producer : ProducerImpl { /// Publish an Event into the Disruptor. /// /// Returns a `Result` with the published sequence number or a [RingBufferFull] in case the @@ -64,7 +59,6 @@ pub trait Producer : ProducerImpl { /// # Examples /// /// ``` - ///# use disruptor::build_single_producer; ///# use disruptor::Producer; ///# use disruptor::BusySpin; ///# use disruptor::RingBufferFull; @@ -76,7 +70,7 @@ pub trait Producer : ProducerImpl { ///# fn main() -> Result<(), RingBufferFull> { /// let factory = || { Event { price: 0.0 }}; ///# let processor = |e: &Event, _, _| {}; - ///# let mut builder = build_single_producer(8, factory, BusySpin); + ///# let mut builder = disruptor::build_single_producer(8, factory, BusySpin); ///# let mut producer = builder.handle_events_with(processor).build(); /// producer.try_publish(|e| { e.price = 42.0; })?; ///# Ok(()) @@ -84,14 +78,8 @@ pub trait Producer : ProducerImpl { /// ``` /// /// See also [`Self::publish`]. - #[inline] fn try_publish(&mut self, update: F) -> Result - where - F: FnOnce(&mut E) - { - self.next_sequence()?; - self.apply_update(update) - } + where F: FnOnce(&mut E); /// Publish an Event into the Disruptor. /// @@ -100,7 +88,6 @@ pub trait Producer : ProducerImpl { /// # Examples /// /// ``` - ///# use disruptor::build_single_producer; ///# use disruptor::Producer; ///# use disruptor::BusySpin; ///# use disruptor::RingBufferFull; @@ -111,18 +98,12 @@ pub trait Producer : ProducerImpl { /// } /// let factory = || { Event { price: 0.0 }}; ///# let processor = |e: &Event, _, _| {}; - ///# let mut builder = build_single_producer(8, factory, BusySpin); + ///# let mut builder = disruptor::build_single_producer(8, factory, BusySpin); ///# let mut producer = builder.handle_events_with(processor).build(); /// producer.publish(|e| { e.price = 42.0; }); /// ``` /// /// See also [`Self::try_publish`]. - #[inline] fn publish(&mut self, update: F) - where - F: FnOnce(&mut E) - { - while let Err(RingBufferFull) = self.next_sequence() { /* Empty. */ } - self.apply_update(update).expect("Ringbuffer should not be full."); - } + where F: FnOnce(&mut E); } diff --git a/src/multi_producer.rs b/src/producer/multi.rs similarity index 89% rename from src/multi_producer.rs rename to src/producer/multi.rs index 4702280..e574a92 100644 --- a/src/multi_producer.rs +++ b/src/producer/multi.rs @@ -1,92 +1,17 @@ -//! Module with a producer for multi-threaded publication into the Disruptor. - use std::{process, sync::{atomic::{fence, AtomicI32, AtomicI64, Ordering}, Arc, Mutex}}; - use crossbeam_utils::CachePadded; - -use crate::{barrier::{Barrier, NONE}, consumer::{Consumer, ConsumerBarrier}, cursor::Cursor, producer::{Producer, ProducerBarrier, ProducerImpl, RingBufferFull}, ringbuffer::RingBuffer, Sequence}; - -/// Barrier for multiple producers. -pub struct MultiProducerBarrier { - cursor: Cursor, - available: Box<[CachePadded]>, - index_mask: usize, - index_shift: usize, -} - -impl MultiProducerBarrier { - fn log2(i: usize) -> usize { - std::mem::size_of::()*8 - (i.leading_zeros() as usize) - 1 - } - - #[inline] - fn calculate_availability_index(&self, sequence: Sequence) -> usize { - sequence as usize & self.index_mask - } - - #[inline] - fn calculate_availability_flag(&self, sequence: Sequence) -> i32 { - (sequence >> self.index_shift) as i32 - } - - #[inline] - fn get_availability(&self, sequence: Sequence) -> &AtomicI32 { - let availability_index = self.calculate_availability_index(sequence); - unsafe { - self.available.get_unchecked(availability_index) - } - } - - #[inline] - fn is_published(&self, sequence: Sequence) -> bool { - let availability = self.get_availability(sequence); - let availability_flag = self.calculate_availability_flag(sequence); - availability.load(Ordering::Relaxed) == availability_flag - } -} - -impl Barrier for MultiProducerBarrier { - fn new(size: usize) -> Self { - let cursor = Cursor::new(-1); - let available = (0..size).map(|_i| { CachePadded::new(AtomicI32::new(-1)) }).collect(); - let index_mask = size - 1; - let index_shift = Self::log2(size); - - MultiProducerBarrier { cursor, available, index_mask, index_shift } - } - - #[inline] - fn get_relaxed(&self, lower_bound: Sequence) -> Sequence { - let mut highest_available = lower_bound; - loop { - if ! self.is_published(highest_available) { - return highest_available - 1; - } - highest_available += 1; - } - } -} - -impl ProducerBarrier for MultiProducerBarrier { - #[inline] - fn next(&self) -> Sequence { - self.cursor.next() - } - - #[inline] - fn publish(&self, sequence: Sequence) { - let availability = self.get_availability(sequence); - let availability_flag = self.calculate_availability_flag(sequence); - availability.store(availability_flag, Ordering::Release); - } -} +use crate::{barrier::{Barrier, NONE}, consumer::{Consumer, ConsumerBarrier}, producer::ProducerBarrier, ringbuffer::RingBuffer, Producer, RingBufferFull, Sequence}; +use crate::cursor::Cursor; struct SharedProducer { consumers: Vec, counter: AtomicI64, } -/// See also [`crate::single_producer::SingleProducer`] for single-threaded publication. +/// Producer for publishing to the Disruptor from multiple threads. +/// +/// See also [SingleProducer](crate::single_producer::SingleProducer) for single-threaded publication and +/// [`Producer`] for how to use a Producer. pub struct MultiProducer { shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, @@ -100,9 +25,7 @@ pub struct MultiProducer { sequence_clear_of_consumers: Sequence, } -impl Producer for MultiProducer {} - -impl ProducerImpl for MultiProducer { +impl Producer for MultiProducer { fn new( shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, @@ -119,59 +42,21 @@ impl ProducerImpl for MultiProducer { } #[inline] - fn next_sequence(&mut self) -> Result { - // We get the last produced sequence number and increment it for the next publisher. - // `sequence` is now exclusive for this producer. - // We need to store it, because the ring buffer could be full (and the producer barrier has - // already increased its publication counter so we *must* eventually use it for publication). - if self.claimed_sequence == NONE { - let next_sequence = self.producer_barrier.next(); - self.claimed_sequence = next_sequence; - } - - let sequence = self.claimed_sequence; - if self.sequence_clear_of_consumers < sequence { - let ring_buffer = self.ring_buffer(); - // We have to check where the consumer is in case we're about to - // publish into the slot currently being read by the consumer. - // (Consumer is an entire ring buffer behind the producer). - let wrap_point = ring_buffer.wrap_point(sequence); - let lowest_sequence_being_read = self.consumer_barrier.get_relaxed(sequence) + 1; - // `<=` because a producer can claim a sequence number that a consumer is still using - // before the wrap_point. (Compare with the single-threaded Producer that cannot claim - // a sequence number beyond the wrap_point). - if lowest_sequence_being_read <= wrap_point { - return Err(RingBufferFull); - } - fence(Ordering::Acquire); - - // We can now continue until we get right behind the consumer's current - // position without checking where it actually is. - self.sequence_clear_of_consumers = lowest_sequence_being_read + ring_buffer.size() - 1; - } - - Ok(sequence) + fn try_publish(&mut self, update: F) -> Result + where + F: FnOnce(&mut E) + { + self.next_sequence()?; + self.apply_update(update) } - /// Precondition: `sequence` is available for publication. #[inline] - fn apply_update(&mut self, update: F) -> Result + fn publish(&mut self, update: F) where F: FnOnce(&mut E) { - let sequence = self.claimed_sequence; - // SAFETY: Now, we have exclusive access to the element at `sequence` and a producer - // can now update the data. - let ring_buffer = self.ring_buffer(); - unsafe { - let element = &mut *ring_buffer.get(sequence); - update(element); - } - // Make publication available by publishing `sequence`. - self.producer_barrier.publish(sequence); - // sequence is now used - replace it with None. - self.claimed_sequence = NONE; - Ok(sequence) + while let Err(RingBufferFull) = self.next_sequence() { /* Empty. */ } + self.apply_update(update).expect("Ringbuffer should not be full."); } } @@ -225,7 +110,7 @@ impl Drop for MultiProducer { } impl MultiProducer { - pub(crate) fn new( + fn new( shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, producer_barrier: Arc

, @@ -255,12 +140,144 @@ impl MultiProducer { } } + #[inline] + fn next_sequence(&mut self) -> Result { + // We get the last produced sequence number and increment it for the next publisher. + // `sequence` is now exclusive for this producer. + // We need to store it, because the ring buffer could be full (and the producer barrier has + // already increased its publication counter so we *must* eventually use it for publication). + if self.claimed_sequence == NONE { + let next_sequence = self.producer_barrier.next(); + self.claimed_sequence = next_sequence; + } + + let sequence = self.claimed_sequence; + if self.sequence_clear_of_consumers < sequence { + let ring_buffer = self.ring_buffer(); + // We have to check where the consumer is in case we're about to + // publish into the slot currently being read by the consumer. + // (Consumer is an entire ring buffer behind the producer). + let wrap_point = ring_buffer.wrap_point(sequence); + let lowest_sequence_being_read = self.consumer_barrier.get() + 1; + // `<=` because a producer can claim a sequence number that a consumer is still using + // before the wrap_point. (Compare with the single-threaded Producer that cannot claim + // a sequence number beyond the wrap_point). + if lowest_sequence_being_read <= wrap_point { + return Err(RingBufferFull); + } + fence(Ordering::Acquire); + + // We can now continue until we get right behind the consumer's current + // position without checking where it actually is. + self.sequence_clear_of_consumers = lowest_sequence_being_read + ring_buffer.size() - 1; + } + + Ok(sequence) + } + + /// Precondition: `sequence` is available for publication. + #[inline] + fn apply_update(&mut self, update: F) -> Result + where + F: FnOnce(&mut E) + { + let sequence = self.claimed_sequence; + // SAFETY: Now, we have exclusive access to the element at `sequence` and a producer + // can now update the data. + let ring_buffer = self.ring_buffer(); + unsafe { + let element = &mut *ring_buffer.get(sequence); + update(element); + } + // Make publication available by publishing `sequence`. + self.producer_barrier.publish(sequence); + // sequence is now used - replace it with None. + self.claimed_sequence = NONE; + Ok(sequence) + } + #[inline] fn ring_buffer(&self) -> &RingBuffer { unsafe { &*self.ring_buffer } } } +/// Barrier for multiple producers. +#[doc(hidden)] +pub struct MultiProducerBarrier { + cursor: Cursor, + available: Box<[CachePadded]>, + index_mask: usize, + index_shift: usize, +} + +impl MultiProducerBarrier { + pub(crate) fn new(size: usize) -> Self { + let cursor = Cursor::new(-1); + let available = (0..size).map(|_i| { CachePadded::new(AtomicI32::new(-1)) }).collect(); + let index_mask = size - 1; + let index_shift = Self::log2(size); + + Self { cursor, available, index_mask, index_shift } + } + + fn log2(i: usize) -> usize { + std::mem::size_of::()*8 - (i.leading_zeros() as usize) - 1 + } + + #[inline] + fn calculate_availability_index(&self, sequence: Sequence) -> usize { + sequence as usize & self.index_mask + } + + #[inline] + fn calculate_availability_flag(&self, sequence: Sequence) -> i32 { + (sequence >> self.index_shift) as i32 + } + + #[inline] + fn get_availability(&self, sequence: Sequence) -> &AtomicI32 { + let availability_index = self.calculate_availability_index(sequence); + unsafe { + self.available.get_unchecked(availability_index) + } + } + + #[inline] + fn is_published(&self, sequence: Sequence) -> bool { + let availability = self.get_availability(sequence); + let availability_flag = self.calculate_availability_flag(sequence); + availability.load(Ordering::Relaxed) == availability_flag + } +} + +impl Barrier for MultiProducerBarrier { + #[inline] + fn get_after(&self, lower_bound: Sequence) -> Sequence { + let mut highest_available = lower_bound; + loop { + if ! self.is_published(highest_available) { + return highest_available - 1; + } + highest_available += 1; + } + } +} + +impl ProducerBarrier for MultiProducerBarrier { + #[inline] + fn next(&self) -> Sequence { + self.cursor.next() + } + + #[inline] + fn publish(&self, sequence: Sequence) { + let availability = self.get_availability(sequence); + let availability_flag = self.calculate_availability_flag(sequence); + availability.store(availability_flag, Ordering::Release); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/single_producer.rs b/src/producer/single.rs similarity index 77% rename from src/single_producer.rs rename to src/producer/single.rs index ca23d86..a0d7851 100644 --- a/src/single_producer.rs +++ b/src/producer/single.rs @@ -1,13 +1,13 @@ -//! Module with a producer for single threaded publication into the Disruptor. - -use std::sync::{atomic::{fence, AtomicI64, Ordering}, Arc}; +use std::sync::atomic::Ordering; +use crate::{barrier::{Barrier, NONE}, cursor::Cursor, producer::ProducerBarrier}; use crossbeam_utils::CachePadded; - -use crate::{barrier::{Barrier, NONE}, consumer::{Consumer, ConsumerBarrier}, cursor::Cursor, producer::{Producer, ProducerBarrier, ProducerImpl, RingBufferFull}, ringbuffer::RingBuffer, Sequence}; +use crate::{consumer::{Consumer, ConsumerBarrier}, ringbuffer::RingBuffer, Sequence}; +use super::*; /// Producer for publishing to the Disruptor from a single thread. /// -/// See also [`crate::multi_producer::MultiProducer`] for multi-threaded publication. +/// See also [MultiProducer](crate::multi_producer::MultiProducer) for multi-threaded publication and +/// [`Producer`] for how to use a Producer. pub struct SingleProducer { shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, @@ -23,9 +23,25 @@ pub struct SingleProducer { unsafe impl Send for SingleProducer {} -impl Producer for SingleProducer {} +impl Producer for SingleProducer where P: ProducerBarrier { + #[inline] + fn try_publish(&mut self, update: F) -> Result + where + F: FnOnce(&mut E) + { + self.next_sequence()?; + self.apply_update(update) + } + + #[inline] + fn publish(&mut self, update: F) + where + F: FnOnce(&mut E) + { + while let Err(RingBufferFull) = self.next_sequence() { /* Empty. */ } + self.apply_update(update).expect("Ringbuffer should not be full."); + } -impl ProducerImpl for SingleProducer { fn new( shutdown_at_sequence: Arc>, ring_buffer: *mut RingBuffer, @@ -40,6 +56,28 @@ impl ProducerImpl for SingleProducer { consumers, consumer_barrier) } +} + +impl SingleProducer where P: ProducerBarrier { + fn new( + shutdown_at_sequence: Arc>, + ring_buffer: *mut RingBuffer, + producer_barrier: Arc

, + consumers: Vec, + consumer_barrier: ConsumerBarrier, + ) -> Self + { + let sequence_clear_of_consumers = unsafe { (*ring_buffer).size() - 1}; + Self { + shutdown_at_sequence, + ring_buffer, + producer_barrier, + consumers, + consumer_barrier, + sequence: 0, + sequence_clear_of_consumers, + } + } #[inline] fn next_sequence(&mut self) -> Result { @@ -51,12 +89,10 @@ impl ProducerImpl for SingleProducer { // (The slowest consumer is an entire ring buffer behind the producer). let ring_buffer = self.ring_buffer(); let wrap_point = ring_buffer.wrap_point(sequence); - // TODO: Change interface so we don't need sequence. - let lowest_sequence_being_read = self.consumer_barrier.get_relaxed(sequence) + 1; + let lowest_sequence_being_read = self.consumer_barrier.get() + 1; if lowest_sequence_being_read == wrap_point { return Err(RingBufferFull); } - fence(Ordering::Acquire); // We can now continue until we get right behind the slowest consumer's current // position without checking where it actually is. @@ -86,28 +122,6 @@ impl ProducerImpl for SingleProducer { self.sequence += 1; Ok(sequence) } -} - -impl SingleProducer { - pub(crate) fn new( - shutdown_at_sequence: Arc>, - ring_buffer: *mut RingBuffer, - producer_barrier: Arc

, - consumers: Vec, - consumer_barrier: ConsumerBarrier, - ) -> Self - { - let sequence_clear_of_consumers = unsafe { (*ring_buffer).size() - 1}; - Self { - shutdown_at_sequence, - ring_buffer, - producer_barrier, - consumers, - consumer_barrier, - sequence: 0, - sequence_clear_of_consumers, - } - } #[inline] fn ring_buffer(&self) -> &RingBuffer { @@ -129,19 +143,22 @@ impl Drop for SingleProducer { } /// Barrier for a single producer. +#[doc(hidden)] pub struct SingleProducerBarrier { cursor: Cursor } -impl Barrier for SingleProducerBarrier { - fn new(_size: usize) -> Self { - SingleProducerBarrier { +impl SingleProducerBarrier { + pub(crate) fn new() -> Self { + Self { cursor: Cursor::new(NONE) } } +} +impl Barrier for SingleProducerBarrier { /// Gets the `Sequence` of the last published event. - fn get_relaxed(&self, _lower_bound: Sequence) -> Sequence { + fn get_after(&self, _lower_bound: Sequence) -> Sequence { self.cursor.relaxed_value() } } diff --git a/src/ringbuffer.rs b/src/ringbuffer.rs index c541aa5..8281ab8 100644 --- a/src/ringbuffer.rs +++ b/src/ringbuffer.rs @@ -2,7 +2,8 @@ use std::cell::UnsafeCell; use crate::Sequence; -pub(crate) struct RingBuffer { +#[doc(hidden)] +pub struct RingBuffer { buffer: Box<[UnsafeCell]>, index_mask: i64, }