Skip to content

Commit

Permalink
fp Map uses KVSet
Browse files Browse the repository at this point in the history
  • Loading branch information
beling committed Sep 29, 2024
1 parent 7e388b4 commit fc4d759
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 92 deletions.
2 changes: 1 addition & 1 deletion csf/src/fp/cmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<C: Coding, S: BuildSeededHasher> CMap<C, S> {
let mut value_rev_indices: Box<[u8]> = values.iter().map(|c| value_coding.len_of(*c)-1).collect();
let mut level_nr = 0u32;
while input_size != 0 {
let level_size_segments = conf.level_sizer.size_segments(
let level_size_segments = conf.level_sizer.size_segments_for_values(
|| values[0..input_size].iter().zip(value_rev_indices[0..input_size].iter()).map(|(c, ri)| value_coding.rev_fragment_of(*c, *ri) as u64),
input_size,
value_coding.bits_per_fragment());
Expand Down
4 changes: 2 additions & 2 deletions csf/src/fp/collision_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ pub trait CollisionSolver {
/// Returns triple consisted of:
/// - an array that shows indices which have assigned values and are not under collision,
/// - values (each stored at bits_per_fragment bits) assigned to successive bit ones in the array,
/// - number of bit ones in the array.
fn to_collision_and_values(self, bits_per_fragment: u8) -> (Box<[u64]>, Box<[u64]>, usize);
/// - number of bit ones in the array (number of values).
fn to_collision_and_values(self, bits_per_value: u8) -> (Box<[u64]>, Box<[u64]>, usize);

/// Constructs array for values to fill with `set_value` method.
fn construct_value_array(number_of_values: usize, bits_per_fragment: u8) -> Box<[u64]> {
Expand Down
16 changes: 16 additions & 0 deletions csf/src/fp/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
use std::hash::Hash;
use std::collections::HashMap;
use bitm::{n_lowest_bits, BitAccess, BitVec};

use crate::coding::Coding;

pub (crate) fn concatenate_values(values: &[Box<[u64]>], values_lens: &[usize], bits_per_value: u8) -> Box<[u64]> {
let mut result: Box<[u64]> = BitVec::with_zeroed_bits(values_lens.iter().sum::<usize>() * bits_per_value as usize);
let mut result_index = 0;
let mask = n_lowest_bits(bits_per_value);
for (src, src_len) in values.iter().zip(values_lens.iter()) { // TODO faster implementation that copies 64-bits at ones
for src_index in 0..*src_len {
result.init_successive_fragment(&mut result_index,
src.get_fragment_unmasked(src_index, bits_per_value) & mask,
bits_per_value);
}
}
result
}

// Returns `conf` if it is greater than `0`, or `max(1, available parallelism + conf)` otherwise.
/*pub fn threads_count(conf: isize) -> NonZeroUsize {
if conf > 0 {
Expand Down
2 changes: 1 addition & 1 deletion csf/src/fp/gocmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<C: Coding, GS: GroupSize, SS: SeedSize, S: BuildSeededHasher> GOCMap<C, GS,
let in_keys = &keys[0..input_size];
let in_values = &values[0..input_size];
let in_value_rev_indices = &value_rev_indices[0..input_size];
let suggested_level_size_segments = conf.level_sizer.size_segments(
let suggested_level_size_segments = conf.level_sizer.size_segments_for_values(
|| in_values.iter().zip(in_value_rev_indices.iter()).map(|(c, ri)| value_coding.rev_fragment_of(*c, *ri) as u64),
input_size,
value_coding.bits_per_fragment());
Expand Down
74 changes: 48 additions & 26 deletions csf/src/fp/kvset.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
use std::collections::{BTreeMap, HashMap};

use crate::bits_to_store_any_of_ref;

/// Moves all non-zeros to the begging of `values` and returns their number.
pub fn remove_zeros(values: &mut [usize]) -> usize {
let mut new_len: usize = 0usize;
for i in 0usize..values.len() {
if values[i] != 0 {
values[new_len] = values[i];
new_len += 1;
}
}
new_len
}

/// A trait for accessing and managing sets of key (of the type `K`) and value pairs
/// during construction of [`fp::Map`](super::Map) or [`fp::GOMap`](super::GOMap).
pub trait KVSet<K> {
Expand All @@ -11,28 +21,31 @@ pub trait KVSet<K> {
/// Call `f` for each key-value pair in the set, using single thread.
///
/// If `self` doesn't remember which keys are retained it uses `retained_hint` to check this.
fn for_each_key_value<F, P>(&self, f: F, retained_hint: P) where F: FnMut(&K, u8), P: FnMut(&K) -> bool;
fn for_each_key_value<F>(&self, f: F/*, retained_hint: P*/) where F: FnMut(&K, u8)/*, P: FnMut(&K) -> bool*/;

/// Returns minimal number of bits that can store any value.
fn bits_per_value(&self) -> u8;

/// Returns the (non-zero) numbers of all the different remaining values.
fn value_distribution(&self/*, retained_hint: P*/) -> Box<[usize]>;

/// Calls `map` for each key-value pair in the set, and returns outputs of these calls. Uses single thread.
///
/// If `self` doesn't remember which keys are retained it uses `retained_hint` to check this.
fn map_each_key_value<R, M, P>(&self, mut map: M, retained_hint: P) -> Vec<R>
where M: FnMut(&K, u8) -> R, P: FnMut(&K) -> bool
fn map_each_key_value<R, M>(&self, mut map: M/*, retained_hint: P*/) -> Vec<R>
where M: FnMut(&K, u8) -> R/*, P: FnMut(&K) -> bool*/
{
let mut result = Vec::with_capacity(self.kv_len());
self.for_each_key_value(|k, v| result.push(map(k, v)), retained_hint);
self.for_each_key_value(|k, v| result.push(map(k, v))/*, retained_hint*/);
result
}

/// Retains in `self` keys pointed by the `filter` and remove the rest, using single thread.
/// - `filter` shows the keys to be retained (the result of the function can be unspecified for keys removed earlier),
/// - `retained_earlier` shows the keys that have not been removed earlier,
/// - `remove_count` returns number of keys to remove.
fn retain_keys<F, P, R>(&mut self, filter: F, retained_earlier: P, remove_count: R)
where F: FnMut(&K) -> bool, P: FnMut(&K) -> bool, R: FnMut() -> usize;
fn retain_keys<F>(&mut self, filter: F/*, retained_earlier: P, remove_count: R*/)
where F: FnMut(&K) -> bool/*, P: FnMut(&K) -> bool, R: FnMut() -> usize*/;

/// Retains in `self` keys pointed by the `index_filter`
/// (or `filter` if `self` does not support `index_filter`)
Expand All @@ -45,26 +58,26 @@ pub trait KVSet<K> {
///
/// The results of `index_filter` and `filter` are unspecified for keys removed earlier.
#[inline(always)]
fn retain_keys_with_indices<IF, F, P, R>(&mut self, _index_filter: IF, filter: F, retained_earlier: P, remove_count: R)
where IF: FnMut(usize) -> bool, F: FnMut(&K) -> bool, P: FnMut(&K) -> bool, R: FnMut() -> usize
fn retain_keys_with_indices<IF, F>(&mut self, _index_filter: IF, filter: F/*, retained_earlier: P, remove_count: R*/)
where IF: FnMut(usize) -> bool, F: FnMut(&K) -> bool/*, P: FnMut(&K) -> bool, R: FnMut() -> usize*/
{
self.retain_keys(filter, retained_earlier, remove_count)
self.retain_keys(filter/*, retained_earlier, remove_count*/)
}

/// Convert `self` into the vector of retained key-value pairs.
///
/// If `self` doesn't remember which keys are retained it uses `retained_hint` to check this.
#[inline] fn into_vec<P>(self, retained_hint: P) -> Vec<(K, u8)> // TODO maybe return a pair of vectors
#[inline] fn into_vec<P>(self/*, retained_hint: P*/) -> Vec<(K, u8)> // TODO maybe return a pair of vectors
where P: FnMut(&K) -> bool, K: Clone, Self: Sized
{
self.map_each_key_value(|k, v| ((*k).clone(), v), retained_hint)
self.map_each_key_value(|k, v| ((*k).clone(), v)/*, retained_hint*/)
}
}

impl<K, S> KVSet<K> for HashMap<K, u8, S> {
/*impl<K, S> KVSet<K> for HashMap<K, u8, S> {
#[inline] fn kv_len(&self) -> usize { self.len() }
fn for_each_key_value<F, P>(&self, mut f: F, _retained_hint: P) where F: FnMut(&K, u8), P: FnMut(&K) -> bool {
fn for_each_key_value<F, P>(&self, mut f: F/*, _retained_hint: P*/) where F: FnMut(&K, u8)/*, P: FnMut(&K) -> bool*/ {
for (k, v) in self { f(k, *v) }
}
Expand All @@ -73,7 +86,7 @@ impl<K, S> KVSet<K> for HashMap<K, u8, S> {
}
fn retain_keys<F, P, R>(&mut self, mut filter: F, _retained_earlier: P, _remove_count: R)
where F: FnMut(&K) -> bool, P: FnMut(&K) -> bool, R: FnMut() -> usize
where F: FnMut(&K) -> bool/*, P: FnMut(&K) -> bool*/, R: FnMut() -> usize
{
self.retain(|k, _| filter(k));
}
Expand All @@ -82,7 +95,7 @@ impl<K, S> KVSet<K> for HashMap<K, u8, S> {
impl<K: Ord> KVSet<K> for BTreeMap<K, u8> {
#[inline] fn kv_len(&self) -> usize { self.len() }
fn for_each_key_value<F, P>(&self, mut f: F, _retained_hint: P) where F: FnMut(&K, u8), P: FnMut(&K) -> bool {
fn for_each_key_value<F, P>(&self, mut f: F/*, _retained_hint: P*/) where F: FnMut(&K, u8)/*, P: FnMut(&K) -> bool*/ {
for (k, v) in self { f(k, *v) }
}
Expand All @@ -91,11 +104,11 @@ impl<K: Ord> KVSet<K> for BTreeMap<K, u8> {
}
fn retain_keys<F, P, R>(&mut self, mut filter: F, _retained_earlier: P, _remove_count: R)
where F: FnMut(&K) -> bool, P: FnMut(&K) -> bool, R: FnMut() -> usize
where F: FnMut(&K) -> bool/*, P: FnMut(&K) -> bool*/, R: FnMut() -> usize
{
self.retain(|k, _| filter(k));
}
}
}*/

/// Implements [`KVSet`], storing keys and values in the mutable slices.
///
Expand All @@ -122,27 +135,34 @@ impl<'k, K> SlicesMutSource<'k, K> {
}
}

impl<'k, K: Sync> KVSet<K> for SlicesMutSource<'k, K> {
impl<'k, K> KVSet<K> for SlicesMutSource<'k, K> {
#[inline(always)] fn kv_len(&self) -> usize { self.len }

#[inline(always)] fn for_each_key_value<F, P>(&self, mut f: F, _retained_hint: P) where F: FnMut(&K, u8), P: FnMut(&K) -> bool {
#[inline(always)] fn for_each_key_value<F>(&self, mut f: F/*, _retained_hint: P*/) where F: FnMut(&K, u8)/*, P: FnMut(&K) -> bool*/ {
for (k, v) in self.keys[0..self.len].iter().zip(self.values[0..self.len].iter()) {
f(k, *v);
}
}

fn bits_per_value(&self) -> u8 {
#[inline] fn bits_per_value(&self) -> u8 {
self.bits_per_value
}

#[inline(always)] fn map_each_key_value<R, M, P>(&self, mut map: M, _retained_hint: P) -> Vec<R>
where M: FnMut(&K, u8) -> R, P: FnMut(&K) -> bool
fn value_distribution(&self/*, _retained_hint: P*/) -> Box<[usize]> {
let mut counts = [0usize; 256];
for v in self.values.iter() { counts[*v as usize] += 1; }
let counts_len = remove_zeros(&mut counts);
counts[0..counts_len].into_iter().cloned().collect()
}

#[inline(always)] fn map_each_key_value<R, M>(&self, mut map: M/*, _retained_hint: P*/) -> Vec<R>
where M: FnMut(&K, u8) -> R/*, P: FnMut(&K) -> bool*/
{
self.keys[0..self.len].into_iter().zip(self.values[0..self.len].into_iter()).map(|(k, v)| map(k, *v)).collect()
}

fn retain_keys<F, P, R>(&mut self, mut filter: F, _retained_earlier: P, _remove_count: R)
where F: FnMut(&K) -> bool, P: FnMut(&K) -> bool, R: FnMut() -> usize
fn retain_keys<F>(&mut self, mut filter: F/*, _retained_earlier: P, _remove_count: R*/)
where F: FnMut(&K) -> bool/*, P: FnMut(&K) -> bool, R: FnMut() -> usize*/
{
let mut i = 0usize;
while i < self.len {
Expand All @@ -156,4 +176,6 @@ impl<'k, K: Sync> KVSet<K> for SlicesMutSource<'k, K> {
}
}
}


}
51 changes: 46 additions & 5 deletions csf/src/fp/level_sizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@ use std::mem::MaybeUninit;
use fsum::FSum;
use std::fmt;
use std::fmt::Formatter;
use super::kvset::KVSet;

/// Chooses the size of level for the given sequence of retained values.
pub trait LevelSizer {

/// Returns number of 64-bit segments to use for given sequence of retained `values`.
fn size_segments<VIt, F>(&self, _values: F, values_len: usize, _bits_per_value: u8) -> usize
fn size_segments_for_values<VIt, F>(&self, _values: F, values_len: usize, _bits_per_value: u8) -> usize
where VIt: IntoIterator<Item = u64>, F: FnMut() -> VIt
{
self.max_size_segments(values_len)
}

/// Returns number of 64-bit segments to use for given sequence of retained `values`.
fn size_segments<K, KV: KVSet<K>>(&self, kv: &KV) -> usize
{
self.max_size_segments(kv.kv_len())
}


/// Returns maximal number of segment that can be returned by `size_segments` for level of size `max_level_size` or less.
fn max_size_segments(&self, max_level_size: usize) -> usize;
}
Expand Down Expand Up @@ -144,7 +152,7 @@ impl OptimalLevelSize {
}*/

impl LevelSizer for OptimalLevelSize {
fn size_segments<VIt, F>(&self, mut values: F, values_len: usize, bits_per_value: u8) -> usize
fn size_segments_for_values<VIt, F>(&self, mut values: F, values_len: usize, bits_per_value: u8) -> usize
where VIt: IntoIterator<Item = u64>, F: FnMut() -> VIt
{
let mut counts = [0u32; 256]; // TODO support bits_per_value > 8
Expand All @@ -156,6 +164,18 @@ impl LevelSizer for OptimalLevelSize {
)
}

fn size_segments<K, KV: KVSet<K>>(&self, kv: &KV) -> usize
{
let mut counts = [0u32; 256]; // TODO support bits_per_value > 8
kv.for_each_key_value(|_, v| counts[v as usize] += 1);
let bits_per_value = kv.bits_per_value();
Self::size_segments_for_dist(
&mut counts[0..(1usize<<bits_per_value)],
kv.kv_len(),
bits_per_value
)
}

fn max_size_segments(&self, max_level_size: usize) -> usize {
ceiling_div(max_level_size, 64)
}
Expand Down Expand Up @@ -186,7 +206,7 @@ impl OptimalGroupedLevelSize {
}

impl LevelSizer for OptimalGroupedLevelSize {
fn size_segments<VIt, F>(&self, mut values: F, values_len: usize, bits_per_value: u8) -> usize
fn size_segments_for_values<VIt, F>(&self, mut values: F, values_len: usize, bits_per_value: u8) -> usize
where VIt: IntoIterator<Item = u64>, F: FnMut() -> VIt
{
let divider = self.divider as usize;
Expand All @@ -202,6 +222,22 @@ impl LevelSizer for OptimalGroupedLevelSize {
}).min().unwrap()
}

fn size_segments<K, KV: KVSet<K>>(&self, kv: &KV) -> usize
{
let bits_per_value = kv.bits_per_value();
let divider = self.divider as usize;
let max_value = (1usize<<bits_per_value) - 1;
(0..divider).map(|delta| {
let mut counts = [0u32; 256]; // TODO support for bits_per_value > 8
kv.for_each_key_value(|_, v| counts[(v as usize + delta) / divider] += 1);
OptimalLevelSize::size_segments_for_dist(
&mut counts[0 ..= (max_value + delta) / divider],
kv.kv_len(),
bits_per_value // this must be unchanged as it is used to calculate memory used by a value
)
}).min().unwrap()
}

fn max_size_segments(&self, max_level_size: usize) -> usize {
ceiling_div(max_level_size, 64)
}
Expand Down Expand Up @@ -231,10 +267,15 @@ impl<LSC> ResizedLevel<LSC> {
}

impl<LSC: LevelSizer> LevelSizer for ResizedLevel<LSC> {
#[inline] fn size_segments<VIt, F>(&self, values: F, values_len: usize, bits_per_value: u8) -> usize
#[inline] fn size_segments_for_values<VIt, F>(&self, values: F, values_len: usize, bits_per_value: u8) -> usize
where VIt: IntoIterator<Item = u64>, F: FnMut() -> VIt
{
self.resized(self.level_size_chooser.size_segments(values, values_len, bits_per_value))
self.resized(self.level_size_chooser.size_segments_for_values(values, values_len, bits_per_value))
}

fn size_segments<K, KV: KVSet<K>>(&self, kv: &KV) -> usize
{
self.resized(self.level_size_chooser.size_segments(kv))
}

#[inline] fn max_size_segments(&self, max_level_size: usize) -> usize {
Expand Down
3 changes: 0 additions & 3 deletions csf/src/fp/map/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,5 @@ impl<LSC, CS: CollisionSolverBuilder, S: BuildSeededHasher> MapConf<LSC, CS, S>
pub fn lsize_cs_hash(level_size_chooser: LSC, collision_solver: CS, hash: S) -> Self {
Self { level_sizer: level_size_chooser, collision_solver, hash }
}
pub fn lsize_cs_hash_bpv(level_size_chooser: LSC, collision_solver: CS, hash: S, bits_per_value: u8) -> Self {
Self { level_sizer: level_size_chooser, collision_solver, hash }
}
}

Loading

0 comments on commit fc4d759

Please sign in to comment.