From 927b0a0010477ee9ab939854e261cd6825e712fe Mon Sep 17 00:00:00 2001 From: Jack Thomson Date: Wed, 16 Nov 2022 11:43:59 +0000 Subject: [PATCH] Add some basic documentation --- src/counter.rs | 122 +++++++++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 38 +++++++++++++++ src/utils.rs | 9 ++++ 3 files changed, 161 insertions(+), 8 deletions(-) diff --git a/src/counter.rs b/src/counter.rs index 45e7ef8..109da6a 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -1,26 +1,44 @@ +use std::fmt; use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; use std::cell::Cell; -use crate::utils::CachePadded; +use crate::utils::{CachePadded, make_new_padded_counter}; use crate::safe_getters::SafeGetters; -pub struct ConcurrentCounter { - cells: Vec<CachePadded::<AtomicIsize>>, -} - static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(1); thread_local! { static THREAD_ID: Cell<usize> = Cell::new(THREAD_COUNTER.fetch_add(1, Ordering::SeqCst)); } -fn make_new_padded_counter() -> CachePadded::<AtomicIsize> { - CachePadded { - value: AtomicIsize::new(0) +/// A sharded atomic counter +/// +/// ConcurrentCounter shards cacheline aligned AtomicIsizes across a vector for faster updates in +/// high contention scenarios. +pub struct ConcurrentCounter { + cells: Vec<CachePadded::<AtomicIsize>>, +} + +impl fmt::Debug for ConcurrentCounter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ConcurrentCounter") + .field("sum", &self.sum()) + .field("cells", &self.cells.len()) + .finish() } } impl ConcurrentCounter { + /// Creates a new ConcurrentCounter with a minimum of `count` cells. ConcurrentCounter + /// will round `count` up to the next power of two for better speed when doing the modulus. 
+ /// + /// # Examples + /// + /// ``` + /// use fast_counter::ConcurrentCounter; + /// + /// let counter = ConcurrentCounter::new(10); + /// ``` #[inline] pub fn new(count: usize) -> Self { let count = count.next_power_of_two(); @@ -39,22 +57,94 @@ impl ConcurrentCounter { }) } + /// Adds the value to the counter; internally this uses `add_with_ordering` with + /// `Ordering::Relaxed` and is mainly for convenience. + /// + /// ConcurrentCounter will identify a cell to add the `value` to using a thread_local + /// which will try to alleviate the contention on a single number. + /// + /// # Examples + /// + /// ``` + /// use fast_counter::ConcurrentCounter; + /// + /// let counter = ConcurrentCounter::new(10); + /// counter.add(1); + /// counter.add(-1); + /// ``` #[inline] pub fn add(&self, value: isize) { self.add_with_ordering(value, Ordering::Relaxed) } + /// ConcurrentCounter will identify a cell to add the `value` to using a thread_local + /// which will try to alleviate the contention on a single number. The cell will be updated + /// atomically using the ordering provided in `ordering`. + /// + /// # Examples + /// + /// ``` + /// use fast_counter::ConcurrentCounter; + /// use std::sync::atomic::Ordering; + /// + /// let counter = ConcurrentCounter::new(10); + /// counter.add_with_ordering(1, Ordering::SeqCst); + /// counter.add_with_ordering(-1, Ordering::Relaxed); + /// ``` #[inline] pub fn add_with_ordering(&self, value: isize, ordering: Ordering) { let c = self.cells.safely_get(self.thread_id() & (self.cells.len() - 1)); c.value.fetch_add(value, ordering); } + /// This will fetch the sum of the concurrent counter by iterating through each of the cells + /// and loading the values. Internally this uses `sum_with_ordering` with a `Relaxed` ordering. + /// + /// Due to the fact the cells are sharded and the concurrent nature of the library this sum + /// may be slightly inaccurate. 
For example, if used in a concurrent map and using + /// ConcurrentCounter to track the length, depending on the ordering the length may be returned + /// as a negative value. + /// + /// # Examples + /// + /// ```rust + /// use fast_counter::ConcurrentCounter; + /// + /// let counter = ConcurrentCounter::new(10); + /// + /// counter.add(1); + /// + /// let sum = counter.sum(); + /// + /// assert_eq!(sum, 1); + /// ``` #[inline] pub fn sum(&self) -> isize { self.sum_with_ordering(Ordering::Relaxed) } + /// This will fetch the sum of the concurrent counter by iterating through each of the cells + /// and loading the values with the ordering defined by `ordering`. + /// + /// Due to the fact the cells are sharded and the concurrent nature of the library this sum + /// may be slightly inaccurate. For example, if used in a concurrent map and using + /// ConcurrentCounter to track the length, depending on the ordering the length may be returned + /// as a negative value. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::atomic::Ordering; + /// use fast_counter::ConcurrentCounter; + /// + /// let counter = ConcurrentCounter::new(10); + /// + /// counter.add(1); + /// + /// let sum = counter.sum_with_ordering(Ordering::SeqCst); + /// + /// assert_eq!(sum, 1); + /// ``` #[inline] pub fn sum_with_ordering(&self, ordering: Ordering) -> isize { self.cells.iter().map(|c| c.value.load(ordering)).sum() @@ -138,4 +228,20 @@ mod tests { assert_eq!(counter.sum(), THREAD_COUNT * WRITE_COUNT); } + + #[test] + fn debug_works_as_expected() { + const WRITE_COUNT: isize = 1_000_000; + const THREAD_COUNT: isize = 8; + // Increment the counter from this single thread; no extra threads are spawned here + let counter = ConcurrentCounter::new(THREAD_COUNT as usize); + + for _ in 0..WRITE_COUNT { + counter.add(1); + } + + assert_eq!(counter.sum(), WRITE_COUNT); + + assert_eq!(format!("Counter is: {counter:?}"), "Counter is: ConcurrentCounter { sum: 1000000, cells: 8 }") + } } diff --git a/src/lib.rs 
b/src/lib.rs index 198a662..a2ddaac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,41 @@ +//! Fast-counter is a sharded concurrent atomic counter +//! +//! The library works by sharding the atomic numbers between multiple values; each thread will +//! attempt to update a different cell. This helps with cache-thrashing and contention. This +//! benefit is seen when there is a greater number of threads competing over a single cell. On my +//! machine with 16 cores attempting to update the value it can be nearly 60x faster. The price you +//! pay for this is higher memory usage as we have multiple Atomic numbers which are cache-padded +//! so can be significantly higher. +//! +//! # Usage +//! +//! Usage of the library is simple: create a new ConcurrentCounter with the number of shards you +//! wish to have; internally the library will use the next power of two as the number of cells for +//! a faster modulus. +//! +//! ```rust +//! use fast_counter::ConcurrentCounter; +//! +//! let counter = ConcurrentCounter::new(10); +//! +//! counter.add(1); +//! +//! let sum = counter.sum(); +//! +//! assert_eq!(sum, 1); +//! ``` +//! +//! # Performance considerations +//! +//! The library will perform best when the threads are accessing their own cell consistently. This +//! can be helped by making sure more than enough cells are allocated for the number of threads which +//! are going to be writing to the cell. +//! +//! Due to the sharding behaviour the time to call the `sum()` method does slow down with the +//! increase in shards; if this becomes a bottleneck it may be worth investigating running with a +//! lower shard count. +//! + #[macro_use] mod safe_getters; mod utils; diff --git a/src/utils.rs b/src/utils.rs index 076c6fa..10f507d 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicIsize; + /// Pads and aligns a value to the length of a cache line. 
/// Code borrowed from https://github.com/ibraheemdev/seize/blob/master/src/utils.rs #[cfg_attr( @@ -34,3 +36,10 @@ pub struct CachePadded<T> { pub value: T, } + +pub fn make_new_padded_counter() -> CachePadded::<AtomicIsize> { + CachePadded { + value: AtomicIsize::new(0) + } +} +