Skip to content

Commit

Permalink
Simplify publication logic in MultiProducerBarrier.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Aug 9, 2024
1 parent e1a1ecb commit 2a2ea8f
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/producer/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ pub struct MultiProducerBarrier {
cursor: Cursor,
/// AtomicU64s each track availability of 64 slots.
/// Each bit in the AtomicU64 encodes whether the slot was published in an even or odd round.
/// This way produceres avoid coordinating directly (with the added overhead that would have).
/// This way producers avoid coordinating directly (with the added overhead that would have).
/// Note, producers can never "overtake" each other and overwrite the availability of an event
/// in the "previous" round as all producers must wait for the consumer furtherst behind (which
/// is again blocked by the slowest producer).
Expand All @@ -249,10 +249,11 @@ impl MultiProducerBarrier {
let cursor = Cursor::new(-1);
let i64_needed = size/64;
// available encodes 1 bit for each slot.
// If the bit is 1 it means that the slot was published in the latest even round
// and 0 means latest odd round. A ring buffer starts with round 0 (an even round)
// so that is why we initialize with 0.
let available = (0..i64_needed).map(|_i| { AtomicU64::new(0) }).collect();
// If the bit is 1 it means that the slot was published in the latest odd round
// and 0 means latest even round. A ring buffer starts with round 0 (an even round)
// so that is why we initialize with all 1's as nothing is published initially.
let all_ones = !0_u64;
let available = (0..i64_needed).map(|_i| { AtomicU64::new(all_ones) }).collect();
let index_mask = size - 1;
let index_shift = Self::log2(size);

Expand All @@ -277,7 +278,7 @@ impl MultiProducerBarrier {
#[inline]
fn calculate_availability_index(&self, sequence: Sequence) -> (usize, usize) {
let slot_index = self.slot_index(sequence);
let availability_index = slot_index/64;
let availability_index = slot_index >> 6; // == divide by 64.
let bit_index = slot_index - availability_index*64;
(availability_index, bit_index)
}
Expand All @@ -287,18 +288,24 @@ impl MultiProducerBarrier {
sequence as usize & self.index_mask
}

/// Calculates if we're in an even (1) or odd (0) round.
/// Calculates if we're in an even (0) or odd (1) round.
#[inline]
fn calculate_availability_flag(&self, sequence: Sequence) -> u64 {
let round = (sequence >> self.index_shift) as u64;
(round + 1)%2
round & 1
}

#[inline]
fn availability_at(&self, index: usize) -> &AtomicU64 {
// SAFETY: Index is always calculated with `calculate_availability_index` and is therefore within bounds.
unsafe { self.available.get_unchecked(index) }
}

#[inline]
fn publish_range_relaxed(&self, from: Sequence, n: i64) {
let (mut availability_index, mut bit_index) = self.calculate_availability_index(from);
let mut flip_mask = 0_u64;
let mut availability = unsafe { self.available.get_unchecked(availability_index) };
let mut availability = self.availability_at(availability_index);

for i in 0..n {
// Encode which bits need to be flipped in the next bit field, counting bit_index upwards while < 64.
Expand All @@ -314,7 +321,7 @@ impl MultiProducerBarrier {
let next_sequence = from + i + 1;
(availability_index, bit_index) = self.calculate_availability_index(next_sequence);
debug_assert!(bit_index == 0, "bit_index must be 0 because a new bit field was loaded.");
availability = unsafe { self.available.get_unchecked(availability_index) };
availability = self.availability_at(availability_index);
flip_mask = 0;
}
}
Expand All @@ -327,7 +334,7 @@ impl MultiProducerBarrier {
#[inline]
fn publish_with_ordering(&self, sequence: Sequence, ordering: Ordering) {
let (availability_index, bit_index) = self.calculate_availability_index(sequence);
let availability = unsafe { self.available.get_unchecked(availability_index) };
let availability = self.availability_at(availability_index);
let mask = 1 << bit_index;
// XOR operation will flip the bit on exactly the bit_index position - encoding that we have
// published an event in an even or odd round.
Expand All @@ -340,7 +347,7 @@ impl Barrier for MultiProducerBarrier {
fn get_after(&self, prev: Sequence) -> Sequence {
let mut availability_flag = self.calculate_availability_flag(prev);
let (mut availability_index, mut bit_index) = self.calculate_availability_index(prev);
let mut availability = unsafe { self.available.get_unchecked(availability_index).load(Ordering::Relaxed) };
let mut availability = self.availability_at(availability_index).load(Ordering::Relaxed);
// Shift bits to first relevant bit.
availability = availability >> bit_index;
let mut highest_available = prev;
Expand All @@ -360,7 +367,7 @@ impl Barrier for MultiProducerBarrier {
// Load next bit field.
(availability_index, bit_index) = self.calculate_availability_index(highest_available);
debug_assert!(bit_index == 0, "bit_index must be 0 because a new bit field was loaded.");
availability = unsafe { self.available.get_unchecked(availability_index).load(Ordering::Relaxed) };
availability = self.availability_at(availability_index).load(Ordering::Relaxed);

if availability_index == 0 {
// If we wrapped then we're now looking for the flipped bit.
Expand Down Expand Up @@ -436,5 +443,9 @@ mod tests {
barrier.publish_range_relaxed(100, 100);
// Verify published:
assert_eq!(barrier.get_after(99), 199);

barrier.publish_range_relaxed(200, 100);
// Verify published:
assert_eq!(barrier.get_after(199), 299);
}
}

0 comments on commit 2a2ea8f

Please sign in to comment.