Skip to content

Commit

Permalink
Rename: AtomicBloom to ConcurrentBloom (solana-labs#34483)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Dec 21, 2023
1 parent fbc3999 commit f0ff69b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
4 changes: 2 additions & 2 deletions bloom/benches/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
bv::BitVec,
fnv::FnvHasher,
rand::Rng,
solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex},
solana_bloom::bloom::{Bloom, BloomHashIndex, ConcurrentBloom},
solana_sdk::{
hash::{hash, Hash},
signature::Signature,
Expand Down Expand Up @@ -128,7 +128,7 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) {
.collect();
let mut fail = 0;
bencher.iter(|| {
let bloom: AtomicBloom<_> = Bloom::random(1287, 0.1, 7424).into();
let bloom: ConcurrentBloom<_> = Bloom::random(1287, 0.1, 7424).into();
// Intentionally not using parallelism here, so that this and above
// benchmark only compare the bit-vector ops.
// For benchmarking the parallel code, change bellow for loop to:
Expand Down
25 changes: 14 additions & 11 deletions bloom/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,19 @@ impl<T: AsRef<[u8]>> BloomHashIndex for T {
}
}

pub struct AtomicBloom<T> {
/// Bloom filter that can be used concurrently.
/// Concurrent reads/writes are safe, but are not atomic at the struct level,
/// this means that reads may see partial writes.
pub struct ConcurrentBloom<T> {
num_bits: u64,
keys: Vec<u64>,
bits: Vec<AtomicU64>,
_phantom: PhantomData<T>,
}

impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
impl<T: BloomHashIndex> From<Bloom<T>> for ConcurrentBloom<T> {
fn from(bloom: Bloom<T>) -> Self {
AtomicBloom {
ConcurrentBloom {
num_bits: bloom.bits.len(),
keys: bloom.keys,
bits: bloom
Expand All @@ -164,7 +167,7 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
}
}

impl<T: BloomHashIndex> AtomicBloom<T> {
impl<T: BloomHashIndex> ConcurrentBloom<T> {
fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) {
let pos = key
.hash_at_index(hash_index)
Expand Down Expand Up @@ -199,15 +202,15 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
})
}

pub fn clear_for_tests(&mut self) {
pub fn clear(&self) {
self.bits.iter().for_each(|bit| {
bit.store(0u64, Ordering::Relaxed);
});
}
}

impl<T: BloomHashIndex> From<AtomicBloom<T>> for Bloom<T> {
fn from(atomic_bloom: AtomicBloom<T>) -> Self {
impl<T: BloomHashIndex> From<ConcurrentBloom<T>> for Bloom<T> {
fn from(atomic_bloom: ConcurrentBloom<T>) -> Self {
let bits: Vec<_> = atomic_bloom
.bits
.into_iter()
Expand Down Expand Up @@ -325,7 +328,7 @@ mod test {
let hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash)
.take(1200)
.collect();
let bloom: AtomicBloom<_> = Bloom::<Hash>::random(1287, 0.1, 7424).into();
let bloom: ConcurrentBloom<_> = Bloom::<Hash>::random(1287, 0.1, 7424).into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.num_bits, 6168);
assert_eq!(bloom.bits.len(), 97);
Expand Down Expand Up @@ -360,7 +363,7 @@ mod test {
let num_bits_set = bloom.num_bits_set;
assert!(num_bits_set > 2000, "# bits set: {num_bits_set}");
// Round-trip with no inserts.
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
for hash_value in &hash_values {
Expand All @@ -372,7 +375,7 @@ mod test {
assert!(bloom.contains(hash_value));
}
// Round trip, re-inserting the same hash values.
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
Expand All @@ -389,7 +392,7 @@ mod test {
let more_hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash)
.take(1000)
.collect();
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
more_hash_values.par_iter().for_each(|v| {
Expand Down
6 changes: 3 additions & 3 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use {
Rng,
},
rayon::{prelude::*, ThreadPool},
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_bloom::bloom::{Bloom, ConcurrentBloom},
solana_sdk::{
hash::{hash, Hash},
native_token::LAMPORTS_PER_SOL,
Expand Down Expand Up @@ -141,7 +141,7 @@ impl CrdsFilter {

/// A vector of crds filters that together hold a complete set of Hashes.
struct CrdsFilterSet {
filters: Vec<Option<AtomicBloom<Hash>>>,
filters: Vec<Option<ConcurrentBloom<Hash>>>,
mask_bits: u32,
}

Expand All @@ -159,7 +159,7 @@ impl CrdsFilterSet {
let k = rng.gen_range(0..indices.len());
let k = indices.swap_remove(k);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
filters[k] = Some(AtomicBloom::<Hash>::from(filter));
filters[k] = Some(ConcurrentBloom::<Hash>::from(filter));
}
Self { filters, mask_bits }
}
Expand Down
6 changes: 3 additions & 3 deletions gossip/src/push_active_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::weighted_shuffle::WeightedShuffle,
indexmap::IndexMap,
rand::Rng,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_bloom::bloom::{Bloom, ConcurrentBloom},
solana_sdk::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey},
std::collections::HashMap,
};
Expand All @@ -19,7 +19,7 @@ pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES
// Keys are gossip nodes to push messages to.
// Values are which origins the node has pruned.
#[derive(Default)]
struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ AtomicBloom<Pubkey>>);
struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ ConcurrentBloom<Pubkey>>);

impl PushActiveSet {
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -151,7 +151,7 @@ impl PushActiveSetEntry {
if self.0.contains_key(node) {
continue;
}
let bloom = AtomicBloom::from(Bloom::random(
let bloom = ConcurrentBloom::from(Bloom::random(
num_bloom_filter_items,
Self::BLOOM_FALSE_RATE,
Self::BLOOM_MAX_BITS,
Expand Down

0 comments on commit f0ff69b

Please sign in to comment.