Skip to content

Commit

Permalink
Improve documentation and structure.
Browse files Browse the repository at this point in the history
- Hide structs and traits not part of the api.
  • Loading branch information
nicholassm committed Apr 4, 2024
1 parent de88b74 commit bb92914
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 266 deletions.
6 changes: 2 additions & 4 deletions src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use crate::Sequence;
/// Indicates no sequence number has been claimed (yet).
pub const NONE: Sequence = -1;

#[doc(hidden)]
pub trait Barrier {
/// Creates a new barrier of the specified size.
fn new(size: usize) -> Self;

/// Gets the sequence number of the barrier with relaxed memory ordering.
///
/// Note, to establish proper happens-before relationships (and thus proper synchronization),
/// the caller must issue a [`std::sync::atomic::fence`] with [`Ordering::Acquire`].
fn get_relaxed(&self, lower_bound: Sequence) -> Sequence;
fn get_after(&self, lower_bound: Sequence) -> Sequence;
}
86 changes: 69 additions & 17 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,70 @@
//! Module for building the Disruptor and adding event handlers.
//!
//! # Examples
//!
//! ```
//!# use disruptor::build_single_producer;
//!# use disruptor::Producer;
//!# use disruptor::BusySpin;
//!# use disruptor::RingBufferFull;
//!#
//! // The example data entity on the ring buffer.
//! struct Event {
//! price: f64
//! }
//! let factory = || { Event { price: 0.0 }};
//!# let processor1 = |e: &Event, _, _| {};
//!# let processor2 = |e: &Event, _, _| {};
//!# let processor3 = |e: &Event, _, _| {};
//! let mut producer = disruptor::build_single_producer(8, factory, BusySpin)
//! .pined_at_core(1).thread_named("my_processor").handle_events_with(processor1)
//! .handle_events_with(processor2) // Not pinned and getting a generic name.
//! .and_then()
//! .pined_at_core(2).handle_events_with(processor3) // Pined but with a generic name.
//! .build();
//! ```
use std::{marker::PhantomData, sync::{atomic::{fence, AtomicI64, Ordering}, Arc}, thread};
use core_affinity::CoreId;
use crossbeam_utils::CachePadded;
use crate::{affinity::{cpu_has_core_else_panic, set_affinity_if_defined}, barrier::{Barrier, NONE}, multi_producer::MultiProducer, single_producer::SingleProducer, Sequence};
use crate::{affinity::{cpu_has_core_else_panic, set_affinity_if_defined}, barrier::{Barrier, NONE}, Sequence};
use crate::consumer::{Consumer, ConsumerBarrier};
use crate::cursor::Cursor;
use crate::multi_producer::MultiProducerBarrier;
use crate::producer::{Producer, ProducerBarrier};
use crate::producer::{Producer, single::{SingleProducer, SingleProducerBarrier}, multi::{MultiProducer, MultiProducerBarrier}, ProducerBarrier};
use crate::ringbuffer::RingBuffer;
use crate::single_producer::SingleProducerBarrier;
use crate::wait_strategies::WaitStrategy;

/// Create builder for a [`SingleProducer`]. Use this if you only need to publish
/// events from one thread.
/// Build a single producer Disruptor. Use this if you only need to publish events from one thread.
///
/// For using a producer see [`Producer`].
pub fn build_single_producer<E, W, F>(size: usize, event_factory: F, wait_strategy: W)
-> Builder<E, W, SingleProducerBarrier, SingleProducer<E, SingleProducerBarrier>>
where
F: FnMut() -> E,
E: 'static,
W: 'static + WaitStrategy,
{
Builder::new(size, event_factory, wait_strategy)
let producer_barrier = SingleProducerBarrier::new();
Builder::new(size, event_factory, wait_strategy, producer_barrier)
}

/// Create builder for a [`MultiProducer`]. Use this if you need to publish events
/// from many threads.
/// Build a multi producer Disruptor. Use this if you need to publish events from many threads.
///
/// For using a producer see [`Producer`].
pub fn build_multi_producer<E, W, F>(size: usize, event_factory: F, wait_strategy: W)
-> Builder<E, W, MultiProducerBarrier, MultiProducer<E, MultiProducerBarrier>>
where
F: FnMut() -> E,
E: 'static,
W: 'static + WaitStrategy,
{
Builder::new(size, event_factory, wait_strategy)
let producer_barrier = MultiProducerBarrier::new(size);
Builder::new(size, event_factory, wait_strategy, producer_barrier)
}

/// Adds a dependency on all previously added event handlers.
///
/// See [`Builder`] for examples of usage (they have the same methods).
pub struct DependencyChain<E, W, P, PR>
where
PR: Producer<E, P>
Expand All @@ -47,6 +75,30 @@ where
}

/// Builder used for configuring and constructing a Disruptor.
///
/// # Examples
///
/// ```
///# use disruptor::build_single_producer;
///# use disruptor::Producer;
///# use disruptor::BusySpin;
///# use disruptor::RingBufferFull;
///#
/// // The example data entity on the ring buffer.
/// struct Event {
/// price: f64
/// }
/// let factory = || { Event { price: 0.0 }};
///# let processor1 = |e: &Event, _, _| {};
///# let processor2 = |e: &Event, _, _| {};
///# let processor3 = |e: &Event, _, _| {};
/// let mut producer = disruptor::build_single_producer(8, factory, BusySpin)
/// .pined_at_core(1).thread_named("my_processor").handle_events_with(processor1)
/// .handle_events_with(processor2) // Not pinned and getting a generic name.
/// .and_then()
/// .pined_at_core(2).handle_events_with(processor3) // Pined but with a generic name.
/// .build();
/// ```
pub struct Builder<E, W, P, PR>
where
PR: Producer<E, P>
Expand Down Expand Up @@ -97,14 +149,14 @@ where
W: 'static + WaitStrategy,
PR: Producer<E, P>,
{
fn new<F>(size: usize, event_factory: F, wait_strategy: W) -> Self
fn new<F>(size: usize, event_factory: F, wait_strategy: W, producer_barrier: P) -> Self
where
F: FnMut() -> E
{
let ring_buffer = Box::into_raw(Box::new(RingBuffer::new(size, event_factory)));
let producer_barrier = Arc::new(P::new(size));
let producer_barrier = Arc::new(producer_barrier);
let shutdown_at_sequence = Arc::new(CachePadded::new(AtomicI64::new(NONE)));
let consumer_barrier = Some(ConsumerBarrier::new(0));
let consumer_barrier = Some(ConsumerBarrier::new());

Builder {
ring_buffer,
Expand Down Expand Up @@ -148,7 +200,7 @@ where
/// events after all previous consumers have read them.
pub fn and_then(mut self) -> DependencyChain<E, W, P, PR> {
let dependent_barrier = Arc::new(self.consumer_barrier.take().unwrap());
let consumer_barrier = Some(ConsumerBarrier::new(0));
let consumer_barrier = Some(ConsumerBarrier::new());
DependencyChain {
builder: self,
dependent_barrier,
Expand Down Expand Up @@ -204,7 +256,7 @@ where
/// events after all previous consumers have read them.
pub fn and_then(mut self) -> DependencyChain<E, W, P, PR> {
let dependent_barrier = Arc::new(self.consumer_barrier.take().unwrap());
let consumer_barrier = Some(ConsumerBarrier::new(0));
let consumer_barrier = Some(ConsumerBarrier::new());
DependencyChain {
builder: self.builder,
dependent_barrier,
Expand Down Expand Up @@ -249,14 +301,14 @@ where
let ring_buffer = wrapper.unwrap();
let mut sequence = 0;
loop {
let mut available = barrier.get_relaxed(sequence);
let mut available = barrier.get_after(sequence);
while available < sequence {
// If publisher(s) are done publishing events we're done.
if shutdown_at_sequence.load(Ordering::Relaxed) == sequence {
return;
}
wait_strategy.wait_for(sequence);
available = barrier.get_relaxed(sequence);
available = barrier.get_after(sequence);
}
fence(Ordering::Acquire);

Expand Down
57 changes: 34 additions & 23 deletions src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,55 @@
use std::{sync::Arc, thread::JoinHandle};
use crate::{barrier::Barrier, cursor::Cursor, Sequence};

#[doc(hidden)]
pub struct Consumer {
join_handle: Option<JoinHandle<()>>,
}

impl Consumer {
pub(crate) fn new(join_handle: JoinHandle<()>) -> Self {
Self {
join_handle: Some(join_handle),
}
}

pub(crate) fn join(&mut self) {
if let Some(h) = self.join_handle.take() { h.join().expect("Consumer should not panic.") }
}
}

#[doc(hidden)]
pub struct ConsumerBarrier {
cursors: Vec<Arc<Cursor>>
}

impl ConsumerBarrier {
pub(crate) fn new() -> Self {
Self {
cursors: vec![]
}
}

pub(crate) fn add(&mut self, cursor: Arc<Cursor>) {
self.cursors.push(cursor);
}
}

impl Barrier for ConsumerBarrier {
fn new(_size: usize) -> Self {
Self {
cursors: vec![]
}
/// Gets the available `Sequence` of the slowest consumer.
///
/// Note, to establish proper happens-before relationships (and thus proper synchronization),
/// the caller must issue a [`std::sync::atomic::fence`] with
/// [`Ordering::Acquire`](std::sync::atomic::Ordering::Acquire).
pub(crate) fn get(&self) -> Sequence {
self.get_after(0)
}
}

impl Barrier for ConsumerBarrier {
/// Gets the available `Sequence` of the slowest consumer.
fn get_relaxed(&self, _lower_bound: Sequence) -> Sequence {
fn get_after(&self, _lower_bound: Sequence) -> Sequence {
self.cursors.iter().fold(i64::MAX, |min_sequence, cursor| {
let sequence = cursor.relaxed_value();
std::cmp::min(sequence, min_sequence)
})
}
}

pub(crate) struct Consumer {
join_handle: Option<JoinHandle<()>>,
}

impl Consumer {
pub(crate) fn new(join_handle: JoinHandle<()>) -> Self {
Self {
join_handle: Some(join_handle),
}
}

pub(crate) fn join(&mut self) {
if let Some(h) = self.join_handle.take() { h.join().expect("Consumer should not panic.") }
}
}
15 changes: 6 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! It also owns and manages the processing thread(s) for the convenience of the library users.
//!
//! When the Disruptor is created, you choose whether publication to the Disruptor will happen from
//! one or multiple threads via **Producer** handles.
//! one or multiple threads via [`Producer`] handles.
//! In any case, when the last Producer goes out of scope, all events published are processed and
//! then the processing thread(s) will be stopped and the entire Disruptor will be dropped.
//!
Expand Down Expand Up @@ -52,24 +52,21 @@
/// The type of Sequence numbers in the Ring Buffer.
pub type Sequence = i64;

pub use wait_strategies::BusySpin;
pub use wait_strategies::BusySpinWithSpinLoopHint;
pub use builder::build_single_producer;
pub use builder::build_multi_producer;
pub use producer::Producer;
pub use producer::RingBufferFull;
pub use wait_strategies::BusySpin;
pub use wait_strategies::BusySpinWithSpinLoopHint;

mod affinity;
mod barrier;
mod consumer;
mod cursor;
mod ringbuffer;

pub mod producer;
pub mod single_producer;
pub mod multi_producer;
pub mod wait_strategies;
pub mod builder;
mod producer;
mod builder;
mod wait_strategies;

#[cfg(test)]
mod tests {
Expand Down
Loading

0 comments on commit bb92914

Please sign in to comment.