From f0ff69b9cb60d74c0df1a91f8430494ca7900693 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 21 Dec 2023 06:59:20 -0800 Subject: [PATCH] Rename: AtomicBloom to ConcurrentBloom (#34483) --- bloom/benches/bloom.rs | 4 ++-- bloom/src/bloom.rs | 25 ++++++++++++++----------- gossip/src/crds_gossip_pull.rs | 6 +++--- gossip/src/push_active_set.rs | 6 +++--- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/bloom/benches/bloom.rs b/bloom/benches/bloom.rs index 522b45ee6b8963..a0a9ed684a3423 100644 --- a/bloom/benches/bloom.rs +++ b/bloom/benches/bloom.rs @@ -5,7 +5,7 @@ use { bv::BitVec, fnv::FnvHasher, rand::Rng, - solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex}, + solana_bloom::bloom::{Bloom, BloomHashIndex, ConcurrentBloom}, solana_sdk::{ hash::{hash, Hash}, signature::Signature, @@ -128,7 +128,7 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) { .collect(); let mut fail = 0; bencher.iter(|| { - let bloom: AtomicBloom<_> = Bloom::random(1287, 0.1, 7424).into(); + let bloom: ConcurrentBloom<_> = Bloom::random(1287, 0.1, 7424).into(); // Intentionally not using parallelism here, so that this and above // benchmark only compare the bit-vector ops. // For benchmarking the parallel code, change bellow for loop to: diff --git a/bloom/src/bloom.rs b/bloom/src/bloom.rs index d75301f6b61f23..58065bf6b275bb 100644 --- a/bloom/src/bloom.rs +++ b/bloom/src/bloom.rs @@ -141,16 +141,19 @@ impl> BloomHashIndex for T { } } -pub struct AtomicBloom { +/// Bloom filter that can be used concurrently. +/// Concurrent reads/writes are safe, but are not atomic at the struct level; +/// this means that reads may see partial writes. 
+pub struct ConcurrentBloom { num_bits: u64, keys: Vec, bits: Vec, _phantom: PhantomData, } -impl From> for AtomicBloom { +impl From> for ConcurrentBloom { fn from(bloom: Bloom) -> Self { - AtomicBloom { + ConcurrentBloom { num_bits: bloom.bits.len(), keys: bloom.keys, bits: bloom @@ -164,7 +167,7 @@ impl From> for AtomicBloom { } } -impl AtomicBloom { +impl ConcurrentBloom { fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) { let pos = key .hash_at_index(hash_index) @@ -199,15 +202,15 @@ impl AtomicBloom { }) } - pub fn clear_for_tests(&mut self) { + pub fn clear(&self) { self.bits.iter().for_each(|bit| { bit.store(0u64, Ordering::Relaxed); }); } } -impl From> for Bloom { - fn from(atomic_bloom: AtomicBloom) -> Self { +impl From> for Bloom { + fn from(atomic_bloom: ConcurrentBloom) -> Self { let bits: Vec<_> = atomic_bloom .bits .into_iter() @@ -325,7 +328,7 @@ mod test { let hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash) .take(1200) .collect(); - let bloom: AtomicBloom<_> = Bloom::::random(1287, 0.1, 7424).into(); + let bloom: ConcurrentBloom<_> = Bloom::::random(1287, 0.1, 7424).into(); assert_eq!(bloom.keys.len(), 3); assert_eq!(bloom.num_bits, 6168); assert_eq!(bloom.bits.len(), 97); @@ -360,7 +363,7 @@ mod test { let num_bits_set = bloom.num_bits_set; assert!(num_bits_set > 2000, "# bits set: {num_bits_set}"); // Round-trip with no inserts. - let bloom: AtomicBloom<_> = bloom.into(); + let bloom: ConcurrentBloom<_> = bloom.into(); assert_eq!(bloom.num_bits, 9731); assert_eq!(bloom.bits.len(), (9731 + 63) / 64); for hash_value in &hash_values { @@ -372,7 +375,7 @@ mod test { assert!(bloom.contains(hash_value)); } // Round trip, re-inserting the same hash values. 
- let bloom: AtomicBloom<_> = bloom.into(); + let bloom: ConcurrentBloom<_> = bloom.into(); hash_values.par_iter().for_each(|v| { bloom.add(v); }); @@ -389,7 +392,7 @@ mod test { let more_hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash) .take(1000) .collect(); - let bloom: AtomicBloom<_> = bloom.into(); + let bloom: ConcurrentBloom<_> = bloom.into(); assert_eq!(bloom.num_bits, 9731); assert_eq!(bloom.bits.len(), (9731 + 63) / 64); more_hash_values.par_iter().for_each(|v| { diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 191406dd6720d8..7f70e79bf0add5 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -28,7 +28,7 @@ use { Rng, }, rayon::{prelude::*, ThreadPool}, - solana_bloom::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{Bloom, ConcurrentBloom}, solana_sdk::{ hash::{hash, Hash}, native_token::LAMPORTS_PER_SOL, @@ -141,7 +141,7 @@ impl CrdsFilter { /// A vector of crds filters that together hold a complete set of Hashes. 
struct CrdsFilterSet { - filters: Vec>>, + filters: Vec>>, mask_bits: u32, } @@ -159,7 +159,7 @@ impl CrdsFilterSet { let k = rng.gen_range(0..indices.len()); let k = indices.swap_remove(k); let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); - filters[k] = Some(AtomicBloom::::from(filter)); + filters[k] = Some(ConcurrentBloom::::from(filter)); } Self { filters, mask_bits } } diff --git a/gossip/src/push_active_set.rs b/gossip/src/push_active_set.rs index 8b6dcb6f58843d..1e7e3cbb22844c 100644 --- a/gossip/src/push_active_set.rs +++ b/gossip/src/push_active_set.rs @@ -2,7 +2,7 @@ use { crate::weighted_shuffle::WeightedShuffle, indexmap::IndexMap, rand::Rng, - solana_bloom::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{Bloom, ConcurrentBloom}, solana_sdk::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey}, std::collections::HashMap, }; @@ -19,7 +19,7 @@ pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES // Keys are gossip nodes to push messages to. // Values are which origins the node has pruned. #[derive(Default)] -struct PushActiveSetEntry(IndexMap>); +struct PushActiveSetEntry(IndexMap>); impl PushActiveSet { #[cfg(debug_assertions)] @@ -151,7 +151,7 @@ impl PushActiveSetEntry { if self.0.contains_key(node) { continue; } - let bloom = AtomicBloom::from(Bloom::random( + let bloom = ConcurrentBloom::from(Bloom::random( num_bloom_filter_items, Self::BLOOM_FALSE_RATE, Self::BLOOM_MAX_BITS,