Skip to content

Commit

Permalink
Move processor logic out of builder and into consumer module.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Aug 4, 2024
1 parent 551fbc9 commit aa48cdc
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 123 deletions.
128 changes: 7 additions & 121 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
//!
//! Use [build_single_producer] or [build_multi_producer] to get started.
use std::{sync::{atomic::{fence, AtomicI64, Ordering}, Arc}, thread};
use std::sync::{atomic::AtomicI64, Arc};
use core_affinity::CoreId;
use crossbeam_utils::CachePadded;
use crate::{affinity::{cpu_has_core_else_panic, set_affinity_if_defined}, barrier::{Barrier, NONE}, Sequence};
use crate::{affinity::cpu_has_core_else_panic, barrier::{Barrier, NONE}, consumer::{start_processor, start_processor_with_state}, Sequence};
use crate::consumer::Consumer;
use crate::cursor::Cursor;
use crate::producer::{single::SingleProducerBarrier, multi::MultiProducerBarrier};
Expand Down Expand Up @@ -177,8 +177,8 @@ pub struct Shared<E, W> {
pub(crate) ring_buffer: Arc<RingBuffer<E>>,
pub(crate) consumers: Vec<Consumer>,
current_consumer_cursors: Option<Vec<Arc<Cursor>>>,
wait_strategy: W,
thread_context: ThreadContext,
pub(crate) wait_strategy: W,
pub(crate) thread_context: ThreadContext,
}

impl <E, W> Shared<E, W> {
Expand Down Expand Up @@ -216,135 +216,21 @@ impl <E, W> Shared<E, W> {
}

#[derive(Default)]
struct ThreadContext {
pub(crate) struct ThreadContext {
affinity: Option<CoreId>,
name: Option<String>,
id: usize,
}

impl ThreadContext {
fn name(&mut self) -> String {
pub(crate) fn name(&mut self) -> String {
self.name.take().or_else(|| {
self.id += 1;
Some(format!("processor-{}", self.id))
}).unwrap()
}

fn affinity(&mut self) -> Option<CoreId> {
pub(crate) fn affinity(&mut self) -> Option<CoreId> {
self.affinity.take()
}
}

fn start_processor<E, EP, W, B> (
mut event_handler: EP,
builder: &mut Shared<E, W>,
barrier: Arc<B>)
-> (Arc<Cursor>, Consumer)
where
E: 'static + Send + Sync,
EP: 'static + Send + FnMut(&E, Sequence, bool),
W: 'static + WaitStrategy,
B: 'static + Barrier + Send + Sync,
{
let consumer_cursor = Arc::new(Cursor::new(-1));// Initially, the consumer has not read slot 0 yet.
let wait_strategy = builder.wait_strategy;
let ring_buffer = Arc::clone(&builder.ring_buffer);
let shutdown_at_sequence = Arc::clone(&builder.shutdown_at_sequence);
let thread_name = builder.thread_context.name();
let affinity = builder.thread_context.affinity();
let thread_builder = thread::Builder::new().name(thread_name.clone());
let join_handle = {
let consumer_cursor = Arc::clone(&consumer_cursor);
thread_builder.spawn(move || {
set_affinity_if_defined(affinity, thread_name.as_str());
let mut sequence = 0;
while let Some(available) = wait_for_events(sequence, &shutdown_at_sequence, barrier.as_ref(), &wait_strategy) {
while available >= sequence { // Potentiel batch processing.
let end_of_batch = available == sequence;
// SAFETY: Now, we have (shared) read access to the event at `sequence`.
let event_ptr = ring_buffer.get(sequence);
let event = unsafe { & *event_ptr };
event_handler(event, sequence, end_of_batch);
// Signal to producers or later consumers that we're done processing `sequence`.
consumer_cursor.store(sequence);
// Update next sequence to read.
sequence += 1;
}
}
}).expect("Should spawn thread.")
};

let consumer = Consumer::new(join_handle);
(consumer_cursor, consumer)
}

fn start_processor_with_state<E, EP, W, B, S, IS> (
mut event_handler: EP,
builder: &mut Shared<E, W>,
barrier: Arc<B>,
initialize_state: IS)
-> (Arc<Cursor>, Consumer)
where
E: 'static + Send + Sync,
IS: 'static + Send + FnOnce() -> S,
EP: 'static + Send + FnMut(&mut S, &E, Sequence, bool),
W: 'static + WaitStrategy,
B: 'static + Barrier + Send + Sync,
{
let consumer_cursor = Arc::new(Cursor::new(-1));// Initially, the consumer has not read slot 0 yet.
let wait_strategy = builder.wait_strategy;
let ring_buffer = Arc::clone(&builder.ring_buffer);
let shutdown_at_sequence = Arc::clone(&builder.shutdown_at_sequence);
let thread_name = builder.thread_context.name();
let affinity = builder.thread_context.affinity();
let thread_builder = thread::Builder::new().name(thread_name.clone());
let join_handle = {
let consumer_cursor = Arc::clone(&consumer_cursor);
thread_builder.spawn(move || {
set_affinity_if_defined(affinity, thread_name.as_str());
let mut sequence = 0;
let mut state = initialize_state();
while let Some(available_sequence) = wait_for_events(sequence, &shutdown_at_sequence, barrier.as_ref(), &wait_strategy) {
while available_sequence >= sequence { // Potentiel batch processing.
let end_of_batch = available_sequence == sequence;
// SAFETY: Now, we have (shared) read access to the event at `sequence`.
let event_ptr = ring_buffer.get(sequence);
let event = unsafe { & *event_ptr };
event_handler(&mut state, event, sequence, end_of_batch);
// Signal to producers or later consumers that we're done processing `sequence`.
consumer_cursor.store(sequence);
// Update next sequence to read.
sequence += 1;
}
}
}).expect("Should spawn thread.")
};

let consumer = Consumer::new(join_handle);
(consumer_cursor, consumer)
}

#[inline]
fn wait_for_events<B, W>(
sequence: Sequence,
shutdown_at_sequence: &CachePadded<AtomicI64>,
barrier: &B,
wait_strategy: &W
)
-> Option<Sequence>
where
B: Barrier + Send + Sync,
W: WaitStrategy,
{
let mut available = barrier.get_after(sequence);
while available < sequence {
// If publisher(s) are done publishing events we're done when we've seen the last event.
if shutdown_at_sequence.load(Ordering::Relaxed) == sequence {
return None;
}
wait_strategy.wait_for(sequence);
available = barrier.get_after(sequence);
}
fence(Ordering::Acquire);
Some(available)
}
120 changes: 118 additions & 2 deletions src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{sync::Arc, thread::JoinHandle};
use crate::{barrier::Barrier, cursor::Cursor, Sequence};
use std::{sync::{atomic::{fence, AtomicI64, Ordering}, Arc}, thread::{self, JoinHandle}};
use crossbeam_utils::CachePadded;

use crate::{affinity::set_affinity_if_defined, barrier::Barrier, builder::Shared, cursor::Cursor, wait_strategies::WaitStrategy, Sequence};

#[doc(hidden)]
pub struct Consumer {
Expand Down Expand Up @@ -61,3 +63,117 @@ impl Barrier for MultiConsumerBarrier {
})
}
}

pub(crate) fn start_processor<E, EP, W, B> (
mut event_handler: EP,
builder: &mut Shared<E, W>,
barrier: Arc<B>)
-> (Arc<Cursor>, Consumer)
where
E: 'static + Send + Sync,
EP: 'static + Send + FnMut(&E, Sequence, bool),
W: 'static + WaitStrategy,
B: 'static + Barrier + Send + Sync,
{
let consumer_cursor = Arc::new(Cursor::new(-1));// Initially, the consumer has not read slot 0 yet.
let wait_strategy = builder.wait_strategy;
let ring_buffer = Arc::clone(&builder.ring_buffer);
let shutdown_at_sequence = Arc::clone(&builder.shutdown_at_sequence);
let thread_name = builder.thread_context.name();
let affinity = builder.thread_context.affinity();
let thread_builder = thread::Builder::new().name(thread_name.clone());
let join_handle = {
let consumer_cursor = Arc::clone(&consumer_cursor);
thread_builder.spawn(move || {
set_affinity_if_defined(affinity, thread_name.as_str());
let mut sequence = 0;
while let Some(available) = wait_for_events(sequence, &shutdown_at_sequence, barrier.as_ref(), &wait_strategy) {
while available >= sequence { // Potentiel batch processing.
let end_of_batch = available == sequence;
// SAFETY: Now, we have (shared) read access to the event at `sequence`.
let event_ptr = ring_buffer.get(sequence);
let event = unsafe { & *event_ptr };
event_handler(event, sequence, end_of_batch);
// Signal to producers or later consumers that we're done processing `sequence`.
consumer_cursor.store(sequence);
// Update next sequence to read.
sequence += 1;
}
}
}).expect("Should spawn thread.")
};

let consumer = Consumer::new(join_handle);
(consumer_cursor, consumer)
}

pub(crate) fn start_processor_with_state<E, EP, W, B, S, IS> (
mut event_handler: EP,
builder: &mut Shared<E, W>,
barrier: Arc<B>,
initialize_state: IS)
-> (Arc<Cursor>, Consumer)
where
E: 'static + Send + Sync,
IS: 'static + Send + FnOnce() -> S,
EP: 'static + Send + FnMut(&mut S, &E, Sequence, bool),
W: 'static + WaitStrategy,
B: 'static + Barrier + Send + Sync,
{
let consumer_cursor = Arc::new(Cursor::new(-1));// Initially, the consumer has not read slot 0 yet.
let wait_strategy = builder.wait_strategy;
let ring_buffer = Arc::clone(&builder.ring_buffer);
let shutdown_at_sequence = Arc::clone(&builder.shutdown_at_sequence);
let thread_name = builder.thread_context.name();
let affinity = builder.thread_context.affinity();
let thread_builder = thread::Builder::new().name(thread_name.clone());
let join_handle = {
let consumer_cursor = Arc::clone(&consumer_cursor);
thread_builder.spawn(move || {
set_affinity_if_defined(affinity, thread_name.as_str());
let mut sequence = 0;
let mut state = initialize_state();
while let Some(available_sequence) = wait_for_events(sequence, &shutdown_at_sequence, barrier.as_ref(), &wait_strategy) {
while available_sequence >= sequence { // Potentiel batch processing.
let end_of_batch = available_sequence == sequence;
// SAFETY: Now, we have (shared) read access to the event at `sequence`.
let event_ptr = ring_buffer.get(sequence);
let event = unsafe { & *event_ptr };
event_handler(&mut state, event, sequence, end_of_batch);
// Signal to producers or later consumers that we're done processing `sequence`.
consumer_cursor.store(sequence);
// Update next sequence to read.
sequence += 1;
}
}
}).expect("Should spawn thread.")
};

let consumer = Consumer::new(join_handle);
(consumer_cursor, consumer)
}

#[inline]
fn wait_for_events<B, W>(
sequence: Sequence,
shutdown_at_sequence: &CachePadded<AtomicI64>,
barrier: &B,
wait_strategy: &W
)
-> Option<Sequence>
where
B: Barrier + Send + Sync,
W: WaitStrategy,
{
let mut available = barrier.get_after(sequence);
while available < sequence {
// If publisher(s) are done publishing events we're done when we've seen the last event.
if shutdown_at_sequence.load(Ordering::Relaxed) == sequence {
return None;
}
wait_strategy.wait_for(sequence);
available = barrier.get_after(sequence);
}
fence(Ordering::Acquire);
Some(available)
}

0 comments on commit aa48cdc

Please sign in to comment.