Skip to content

Commit

Permalink
Add some basic documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
JackThomson2 committed Nov 16, 2022
1 parent 5dc2ed4 commit 927b0a0
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 8 deletions.
122 changes: 114 additions & 8 deletions src/counter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
use std::fmt;
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::cell::Cell;

use crate::utils::CachePadded;
use crate::utils::{CachePadded, make_new_padded_counter};
use crate::safe_getters::SafeGetters;

pub struct ConcurrentCounter {
cells: Vec<CachePadded::<AtomicIsize>>,
}

static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(1);

thread_local! {
static THREAD_ID: Cell<usize> = Cell::new(THREAD_COUNTER.fetch_add(1, Ordering::SeqCst));
}

fn make_new_padded_counter() -> CachePadded::<AtomicIsize> {
CachePadded {
value: AtomicIsize::new(0)
/// A sharded atomic counter
///
/// ConcurrentCounter shards cacheline aligned AtomicIsizes across a vector for faster updates in
/// a high contention scenarios.
pub struct ConcurrentCounter {
cells: Vec<CachePadded::<AtomicIsize>>,
}

impl fmt::Debug for ConcurrentCounter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConcurrentCounter")
.field("sum", &self.sum())
.field("cells", &self.cells.len())
.finish()
}
}

impl ConcurrentCounter {
/// Creates a new ConcurrentCounter with a minimum of the `count` cells. Concurrent counter
/// will align the `count` to the next power of two for better speed when doing the modulus.
///
/// # Examples
///
/// ```
/// use fast_counter::ConcurrentCounter;
///
/// let counter = ConcurrentCounter::new(10);
/// ```
#[inline]
pub fn new(count: usize) -> Self {
let count = count.next_power_of_two();
Expand All @@ -39,22 +57,94 @@ impl ConcurrentCounter {
})
}

/// Adds the value to the counter, internally with is using `add_with_ordering` with a
/// `Ordering::Relaxed` and is mainly for convenience.
///
/// ConcurrentCounter will identify a cell to add the `value` too with using a thread_local
/// which will try to aleviate the contention on a single number
///
/// # Examples
///
/// ```
/// use fast_counter::ConcurrentCounter;
///
/// let counter = ConcurrentCounter::new(10);
/// counter.add(1);
/// counter.add(-1);
/// ```
#[inline]
pub fn add(&self, value: isize) {
self.add_with_ordering(value, Ordering::Relaxed)
}

/// ConcurrentCounter will identify a cell to add the `value` too with using a thread_local
/// which will try to aleviate the contention on a single number. The cell will be updated
/// atomically using the ordering provided in `ordering`
///
/// # Examples
///
/// ```
/// use fast_counter::ConcurrentCounter;
/// use std::sync::atomic::Ordering;
///
/// let counter = ConcurrentCounter::new(10);
/// counter.add_with_ordering(1, Ordering::SeqCst);
/// counter.add_with_ordering(-1, Ordering::Relaxed);
/// ```
#[inline]
pub fn add_with_ordering(&self, value: isize, ordering: Ordering) {
let c = self.cells.safely_get(self.thread_id() & (self.cells.len() - 1));
c.value.fetch_add(value, ordering);
}

/// This will fetch the sum of the concurrent counter be iterating through each of the cells
/// and loading the values. Internally this uses `sum_with_ordering` with a `Relaxed` ordering.
///
/// Due to the fact the cells are sharded and the concurrent nature of the library this sum
/// may be slightly inaccurate. For example if used in a concurrent map and using
/// ConcurrentCounter to track the length, depending on the ordering the length may be returned
/// as a negative value.
///
/// # Examples
///
/// ```rust
/// use fast_counter::ConcurrentCounter;
///
/// let counter = ConcurrentCounter::new(10);
///
/// counter.add(1);
///
/// let sum = counter.sum();
///
/// assert_eq!(sum, 1);
/// ```
#[inline]
pub fn sum(&self) -> isize {
self.sum_with_ordering(Ordering::Relaxed)
}

/// This will fetch the sum of the concurrent counter be iterating through each of the cells
/// and loading the values with the ordering defined by `ordering`.
///
/// Due to the fact the cells are sharded and the concurrent nature of the library this sum
/// may be slightly inaccurate. For example if used in a concurrent map and using
/// ConcurrentCounter to track the length, depending on the ordering the length may be returned
/// as a negative value.
///
/// # Examples
///
/// ```rust
/// use std::sync::atomic::Ordering;
/// use fast_counter::ConcurrentCounter;
///
/// let counter = ConcurrentCounter::new(10);
///
/// counter.add(1);
///
/// let sum = counter.sum_with_ordering(Ordering::SeqCst);
///
/// assert_eq!(sum, 1);
/// ```
#[inline]
pub fn sum_with_ordering(&self, ordering: Ordering) -> isize {
self.cells.iter().map(|c| c.value.load(ordering)).sum()
Expand Down Expand Up @@ -138,4 +228,20 @@ mod tests {

assert_eq!(counter.sum(), THREAD_COUNT * WRITE_COUNT);
}

#[test]
fn debug_works_as_expected() {
const WRITE_COUNT: isize = 1_000_000;
const THREAD_COUNT: isize = 8;
// Spin up two threads that increment the counter concurrently
let counter = ConcurrentCounter::new(THREAD_COUNT as usize);

for _ in 0..WRITE_COUNT {
counter.add(1);
}

assert_eq!(counter.sum(), WRITE_COUNT);

assert_eq!(format!("Counter is: {counter:?}"), "Counter is: ConcurrentCounter { sum: 1000000, cells: 8 }")
}
}
38 changes: 38 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,41 @@
//! Fast-counter is a shareded concurrent atomic counter
//!
//! The library works by sharding the atomic numbers between multiple values, each thread will
//! attempt to read from a different cell. This helps with cache-thrashing and contention. This
//! benefit is seen when there is a greater number of threads competing over a single cell, on my
//! machine with 16 cores attempting to update the value it can be nearly 60x faster. The price you
//! pay for this is higher memory usage as we have multiple Atomic numbers which are cache-padded
//! so can be significantly higher.
//!
//! # Usage
//!
//! Usage of the library is simple, create a new ConcurrentCounter with then number of shards you
//! wish to have, internally the library will use the next power of two as the number of cells for
//! a faster modulus.
//!
//! ```rust
//! use fast_counter::ConcurrentCounter;
//!
//! let counter = ConcurrentCounter::new(10);
//!
//! counter.add(1);
//!
//! let sum = counter.sum();
//!
//! assert_eq!(sum, 1);
//! ```
//!
//! # Performance considerations
//!
//! The library will perform best when the threads are accessing their own cell consistently. This
//! can helped by making sure more than enough cells are allocated for the number of threads which
//! are going to be writing to the cell.
//!
//! Due to the sharding behaviour the time to call the `sum()` method does slow down with the
//! increase in shards, if this becomes a bottleneck it may be worth investigating running with a
//! lower shard counter
//!
#[macro_use]
mod safe_getters;
mod utils;
Expand Down
9 changes: 9 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicIsize;

/// Pads and aligns a value to the length of a cache line.
/// Code borrowed from https://github.com/ibraheemdev/seize/blob/master/src/utils.rs
#[cfg_attr(
Expand Down Expand Up @@ -34,3 +36,10 @@
pub struct CachePadded<T> {
pub value: T,
}

pub fn make_new_padded_counter() -> CachePadded::<AtomicIsize> {
CachePadded {
value: AtomicIsize::new(0)
}
}

0 comments on commit 927b0a0

Please sign in to comment.