diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs
index d82145d95a..8bb141bd38 100644
--- a/crates/storage/src/bloom.rs
+++ b/crates/storage/src/bloom.rs
@@ -60,7 +60,6 @@
 //! specific set of keys without having to load and check each individual bloom
 //! filter.
 
-use std::collections::BTreeSet;
 use std::sync::{Arc, Mutex};
 
 use bloomfilter::Bloom;
@@ -68,34 +67,42 @@ use cached::{Cached, SizedCache};
 use pathfinder_common::BlockNumber;
 use pathfinder_crypto::Felt;
 
-pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
+/// Maximum number of blocks to aggregate in a single `AggregateBloom`.
+#[cfg(not(test))]
+pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = 8192;
+
+/// Make testing faster and easier by using a smaller range.
+#[cfg(test)]
+pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = 16;
 
 /// An aggregate of all Bloom filters for a given range of blocks.
 /// Before being added to `AggregateBloom`, each [`BloomFilter`] is
 /// rotated by 90 degrees (transposed).
 #[derive(Clone)]
 pub struct AggregateBloom {
-    /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in
-    /// a single array.
+    /// A [AGGREGATE_BLOOM_BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix
+    /// stored in a single array.
     bitmap: Vec<u8>,
+
     /// Starting (inclusive) block number for the range of blocks that this
     /// aggregate covers.
     pub from_block: BlockNumber,
+
     /// Ending (inclusive) block number for the range of blocks that this
     /// aggregate covers.
     pub to_block: BlockNumber,
 }
 
 impl AggregateBloom {
-    /// Maximum number of blocks to aggregate in a single `AggregateBloom`.
-    pub const BLOCK_RANGE_LEN: u64 = 8192;
-    const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8;
+    /// Number of bytes that an `AggregateBloom` block range is represented by.
+    const BLOCK_RANGE_BYTES: usize = AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize / 8;
 
-    /// Create a new `AggregateBloom` for the (`from_block`, `from_block` +
-    /// [`block_range_length`](Self::BLOCK_RANGE_LEN) - 1) range.
+    /// Create a new `AggregateBloom` for the following range:
+    ///
+    /// \[`from_block`, `from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1`\]
     pub fn new(from_block: BlockNumber) -> Self {
-        let to_block = from_block + Self::BLOCK_RANGE_LEN - 1;
-        let bitmap = vec![0; Self::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize];
+        let to_block = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1;
+        let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN];
         Self::from_parts(from_block, to_block, bitmap)
     }
 
@@ -107,7 +114,7 @@ impl AggregateBloom {
     ) -> Self {
         let bitmap = zstd::bulk::decompress(
             &compressed_bitmap,
-            AggregateBloom::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize,
+            Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN,
         )
         .expect("Decompressing aggregate Bloom filter");
 
@@ -115,9 +122,9 @@ impl AggregateBloom {
     }
 
     fn from_parts(from_block: BlockNumber, to_block: BlockNumber, bitmap: Vec<u8>) -> Self {
-        assert_eq!(from_block + Self::BLOCK_RANGE_LEN - 1, to_block);
+        assert_eq!(from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, to_block);
         assert_eq!(
-            bitmap.len() as u64,
+            bitmap.len(),
             Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN
         );
 
@@ -141,7 +148,7 @@ impl AggregateBloom {
     ///
    /// Panics if the block number is not in the range of blocks that this
    /// aggregate covers.
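Editor's note on the layout described above: the `bitmap` is a row-major bit matrix with one row per Bloom-filter bit position and one column per block in the range, eight block-columns packed into each byte. A minimal self-contained sketch of that index arithmetic (toy constants and a hypothetical `locate` helper, not part of the patch):

```rust
// Toy illustration of the row-major layout: rows are Bloom-filter bit
// positions, columns are blocks, eight block-columns per byte.
const BLOCK_RANGE_LEN: usize = 16; // one bit per block in a row (made up)
const BLOCK_RANGE_BYTES: usize = BLOCK_RANGE_LEN / 8;
const BITVEC_LEN: usize = 32; // number of rows, i.e. Bloom filter bits (made up)

/// Flat index of the byte holding (row, relative block) plus the bit shift
/// within that byte. MSB-first, matching the shift in `insert`.
fn locate(row: usize, relative_block: usize) -> (usize, u32) {
    assert!(row < BITVEC_LEN && relative_block < BLOCK_RANGE_LEN);
    let byte_idx = relative_block / 8;
    let bit_idx = relative_block % 8;
    (row * BLOCK_RANGE_BYTES + byte_idx, (7 - bit_idx) as u32)
}

fn main() {
    let mut bitmap = vec![0u8; BLOCK_RANGE_BYTES * BITVEC_LEN];
    // Mark "row 5 is set for relative block 3".
    let (byte, shift) = locate(5, 3);
    bitmap[byte] |= 1 << shift;
    assert_eq!(bitmap[5 * BLOCK_RANGE_BYTES], 0b0001_0000);
}
```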
-    pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
+    pub fn insert(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
         assert!(
             (self.from_block..=self.to_block).contains(&block_number),
             "Block number {} is not in the range {}..={}",
@@ -151,91 +158,190 @@ impl AggregateBloom {
         );
         assert_eq!(bloom.0.number_of_hash_functions(), BloomFilter::K_NUM);
 
-        let bloom = bloom.0.bit_vec().to_bytes();
-        assert_eq!(bloom.len() as u64, BloomFilter::BITVEC_BYTES);
+        let bloom_bytes = bloom.0.bit_vec().to_bytes();
+        assert_eq!(bloom_bytes.len(), BloomFilter::BITVEC_BYTES);
 
-        let relative_block_number = block_number.get() - self.from_block.get();
-        let byte_idx = (relative_block_number / 8) as usize;
-        let bit_idx = (relative_block_number % 8) as usize;
-        for (i, bloom_byte) in bloom.iter().enumerate() {
-            if *bloom_byte == 0 {
-                continue;
-            }
+        let relative_block_number = usize::try_from(block_number.get() - self.from_block.get())
+            .expect("usize can fit a u64");
 
-            let base = 8 * i;
-            for j in 0..8 {
-                let row_idx = base + j;
-                *self.bitmap_at_mut(row_idx, byte_idx) |= ((bloom_byte >> (7 - j)) & 1) << bit_idx;
-            }
-        }
+        // Column in the bitmap.
+        let byte_idx = relative_block_number / 8;
+        // Block number offset within a bitmap byte.
+        let bit_idx = relative_block_number % 8;
+
+        bloom_bytes
+            .into_iter()
+            .enumerate()
+            .filter(|(_, b)| *b != 0)
+            .for_each(|(i, bloom_byte)| {
+                let row_idx_base = 8 * i;
+
+                // Each bit (possible key index) in the Bloom filter has its own row.
+                for offset in 0..8 {
+                    let row_idx = (row_idx_base + offset) * Self::BLOCK_RANGE_BYTES;
+                    let bitmap_idx = row_idx + byte_idx;
+                    // Reverse the offsets so that the most significant bit is treated as the
+                    // first.
+                    let bit = (bloom_byte >> (7 - offset)) & 1;
+                    self.bitmap[bitmap_idx] |= bit << (7 - bit_idx);
+                }
+            });
     }
 
-    /// Returns a set of [block numbers](BlockNumber) for which the given keys
-    /// are present in the aggregate.
-    pub fn blocks_for_keys(&self, keys: &[Felt]) -> BTreeSet<BlockNumber> {
+    /// Returns a [bit array](BlockRange) where each bit position represents an
+    /// offset from the [starting block][Self::from_block] of the aggregate
+    /// filter. If the bit is set, the block at that offset contains one of the
+    /// given keys. False positives are possible.
+    ///
+    /// See [BlockRange::iter_ones].
+    pub fn blocks_for_keys(&self, keys: &[Felt]) -> BlockRange {
         if keys.is_empty() {
-            return self.all_blocks();
+            return BlockRange::FULL;
         }
 
-        let mut block_matches = BTreeSet::new();
+        let mut block_matches = BlockRange::EMPTY;
 
         for k in keys {
-            let mut row_to_check = vec![u8::MAX; Self::BLOCK_RANGE_BYTES as usize];
+            let mut matches_for_key = BlockRange::FULL;
             let indices = BloomFilter::indices_for_key(k);
 
             for row_idx in indices {
-                for (col_idx, row_byte) in row_to_check.iter_mut().enumerate() {
-                    *row_byte &= self.bitmap_at(row_idx, col_idx);
-                }
-            }
+                let row_start = row_idx * Self::BLOCK_RANGE_BYTES;
+                let row_end = row_start + Self::BLOCK_RANGE_BYTES;
 
-            for (col_idx, byte) in row_to_check.iter().enumerate() {
-                if *byte == 0 {
-                    continue;
-                }
+                let block_range = BlockRange::copy_from_slice(&self.bitmap[row_start..row_end]);
 
-                for i in 0..8 {
-                    if byte & (1 << i) != 0 {
-                        let match_number = self.from_block + col_idx as u64 * 8 + i as u64;
-                        block_matches.insert(match_number);
-                    }
-                }
+                matches_for_key &= block_range;
             }
+
+            block_matches |= matches_for_key;
         }
 
         block_matches
     }
+}
 
-    pub(super) fn all_blocks(&self) -> BTreeSet<BlockNumber> {
-        (self.from_block.get()..=self.to_block.get())
-            .map(BlockNumber::new_or_panic)
-            .collect()
+impl std::fmt::Debug for AggregateBloom {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AggregateBloom")
+            .field("from_block", &self.from_block)
+            .field("to_block", &self.to_block)
+            .field("bitmap_hash", &"...")
+            .finish()
     }
+}
 
-    fn bitmap_at(&self, row: usize, col: usize) -> u8 {
-        let idx = row * Self::BLOCK_RANGE_BYTES as usize + col;
-        self.bitmap[idx]
+/// A [`AGGREGATE_BLOOM_BLOCK_RANGE_LEN`]-sized bit array. Each bit represents
+/// an offset from the starting block of an [`AggregateBloom`].
+///
+/// Intended use is for return values of functions that check presence of keys
+/// inside an [`AggregateBloom`] filter. If a bit at position N is set, then the
+/// `aggregate_bloom.from_block + N` [block number](BlockNumber) contains the
+/// given key. False positives are possible.
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct BlockRange([u8; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+#[allow(dead_code)]
+impl BlockRange {
+    /// An empty `BlockRange`.
+    pub(crate) const EMPTY: BlockRange = BlockRange([u8::MIN; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+    /// A full `BlockRange`.
+    pub(crate) const FULL: BlockRange = BlockRange([u8::MAX; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+    /// Create a `BlockRange` from a byte slice.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the slice is not of length
+    /// [`AggregateBloom::BLOCK_RANGE_BYTES`].
+    fn copy_from_slice(s: &[u8]) -> Self {
+        assert_eq!(s.len(), AggregateBloom::BLOCK_RANGE_BYTES);
+        let mut bytes = [0; AggregateBloom::BLOCK_RANGE_BYTES];
+        bytes.copy_from_slice(s);
+        Self(bytes)
     }
 
-    fn bitmap_at_mut(&mut self, row: usize, col: usize) -> &mut u8 {
-        let idx = row * Self::BLOCK_RANGE_BYTES as usize + col;
-        &mut self.bitmap[idx]
+    /// Set the value of a bit at the given index.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the index is out of bounds of the block range.
+    fn set(&mut self, idx: usize, value: bool) {
+        assert!(idx < AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize);
+
+        let byte_idx = idx / 8;
+        let bit_idx = idx % 8;
+        if value {
+            self.0[byte_idx] |= 1 << (7 - bit_idx);
+        } else {
+            self.0[byte_idx] &= !(1 << (7 - bit_idx));
+        }
+    }
+
+    /// Create an iterator over the indices of bits that are set.
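To make the query path above concrete: for a single key, `blocks_for_keys` ANDs together the rows selected by the key's Bloom indices, and any bit still set marks a candidate block. A toy version of that row intersection (made-up sizes and a hypothetical `candidate_blocks` helper, not the crate's API):

```rust
// A minimal sketch of the transposed-bitmap query: AND the rows a key hashes
// to; surviving bits are candidate blocks (false positives possible).
const BLOCK_RANGE_BYTES: usize = 2; // 16 blocks per row (made up)

fn candidate_blocks(bitmap: &[u8], key_rows: &[usize]) -> [u8; BLOCK_RANGE_BYTES] {
    let mut acc = [u8::MAX; BLOCK_RANGE_BYTES]; // start from "all blocks match"
    for &row in key_rows {
        let start = row * BLOCK_RANGE_BYTES;
        for (a, b) in acc.iter_mut().zip(&bitmap[start..start + BLOCK_RANGE_BYTES]) {
            *a &= b; // a block survives only if every selected row has its bit set
        }
    }
    acc
}

fn main() {
    // Two rows over 16 blocks; only block 0 is set in both rows.
    let bitmap = [
        0b1100_0000, 0x00, // row 0
        0b1000_0001, 0x00, // row 1
    ];
    assert_eq!(candidate_blocks(&bitmap, &[0, 1]), [0b1000_0000, 0x00]);
}
```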
+    pub(crate) fn iter_ones(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter_val(true)
+    }
+
+    /// Create an iterator over the indices of bits that are not set.
+    pub(crate) fn iter_zeros(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter_val(false)
+    }
+
+    fn iter_val(&self, val: bool) -> impl Iterator<Item = usize> + '_ {
+        self.0
+            .iter()
+            .enumerate()
+            .flat_map(move |(byte_idx, &byte)| {
+                (0..8).filter_map(move |bit_idx| {
+                    if (byte >> (7 - bit_idx)) & 1 == val as u8 {
+                        Some(byte_idx * 8 + bit_idx)
+                    } else {
+                        None
+                    }
+                })
+            })
+    }
 }
 
-impl std::fmt::Debug for AggregateBloom {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        use std::hash::{DefaultHasher, Hash, Hasher};
+impl Default for BlockRange {
+    fn default() -> Self {
+        Self::EMPTY
+    }
+}
 
-        let mut hasher = DefaultHasher::new();
-        self.bitmap.hash(&mut hasher);
-        let bitmap_hash = hasher.finish();
+impl std::ops::BitAndAssign for BlockRange {
+    fn bitand_assign(&mut self, rhs: Self) {
+        for (a, b) in self.0.iter_mut().zip(rhs.0.iter()) {
+            *a &= b;
+        }
+    }
+}
 
-        f.debug_struct("AggregateBloom")
-            .field("from_block", &self.from_block)
-            .field("to_block", &self.to_block)
-            .field("bitmap_hash", &format!("{:#x}", bitmap_hash))
-            .finish()
+impl std::ops::BitAnd for BlockRange {
+    type Output = Self;
+
+    fn bitand(mut self, rhs: Self) -> Self::Output {
+        self &= rhs;
+        self
+    }
+}
+
+impl std::ops::BitOrAssign for BlockRange {
+    fn bitor_assign(&mut self, rhs: Self) {
+        for (a, b) in self.0.iter_mut().zip(rhs.0.iter()) {
+            *a |= b;
+        }
+    }
+}
+
+impl std::ops::BitOr for BlockRange {
+    type Output = Self;
+
+    fn bitor(mut self, rhs: Self) -> Self::Output {
+        self |= rhs;
+        self
     }
 }
 
@@ -250,14 +356,18 @@ struct CacheKey {
 pub(crate) struct AggregateBloomCache(Mutex<SizedCache<CacheKey, Arc<AggregateBloom>>>);
 
 impl AggregateBloomCache {
+    /// Create a new cache with the given size.
     pub fn with_size(size: usize) -> Self {
         Self(Mutex::new(SizedCache::with_size(size)))
     }
 
+    /// Reset the cache. Removes all entries and frees the memory.
     pub fn reset(&self) {
        self.0.lock().unwrap().cache_reset();
    }
 
+    /// Retrieve all [AggregateBloom] filters whose range of blocks overlaps
+    /// with the given range.
     pub fn get_many(
         &self,
         from_block: BlockNumber,
         to_block: BlockNumber,
@@ -269,17 +379,17 @@ impl AggregateBloomCache {
         let to_block = to_block.get();
 
         // Align to the nearest lower multiple of BLOCK_RANGE_LEN.
-        let from_block_aligned = from_block - from_block % AggregateBloom::BLOCK_RANGE_LEN;
+        let from_block_aligned = from_block - from_block % AGGREGATE_BLOOM_BLOCK_RANGE_LEN;
 
         // Align to the nearest higher multiple of BLOCK_RANGE_LEN, then subtract 1
         // (zero based indexing).
-        let to_block_aligned = to_block + AggregateBloom::BLOCK_RANGE_LEN
-            - (to_block % AggregateBloom::BLOCK_RANGE_LEN)
-            - 1;
+        let to_block_aligned = to_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN
+            - (to_block % AGGREGATE_BLOOM_BLOCK_RANGE_LEN)
+            - 1;
 
         (from_block_aligned..=to_block_aligned)
-            .step_by(AggregateBloom::BLOCK_RANGE_LEN as usize)
+            .step_by(AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
             .map(|from| {
-                let to = from + AggregateBloom::BLOCK_RANGE_LEN - 1;
+                let to = from + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1;
                 (
                     BlockNumber::new_or_panic(from),
                     BlockNumber::new_or_panic(to),
                 )
@@ -295,6 +405,7 @@ impl AggregateBloomCache {
             .collect()
     }
 
+    /// Store the given filters in the cache.
     pub fn set_many(&self, filters: &[Arc<AggregateBloom>]) {
         let mut cache = self.0.lock().unwrap();
         filters.iter().for_each(|filter| {
@@ -312,15 +423,15 @@
 pub(crate) struct BloomFilter(Bloom<Felt>);
 
 impl BloomFilter {
     // The size of the bitmap used by the Bloom filter.
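The `get_many` alignment arithmetic above is worth a standalone look: query bounds are snapped outward to whole filter ranges. A small sketch (toy range length and a hypothetical function name, mirroring the arithmetic in the patch):

```rust
// Sketch of the alignment used by `get_many`: align the lower bound down and
// the upper bound up to whole AGGREGATE_BLOOM_BLOCK_RANGE_LEN-sized ranges.
const RANGE_LEN: u64 = 16; // stands in for AGGREGATE_BLOOM_BLOCK_RANGE_LEN

fn aligned_bounds(from_block: u64, to_block: u64) -> (u64, u64) {
    // Nearest lower multiple of RANGE_LEN.
    let from_aligned = from_block - from_block % RANGE_LEN;
    // Nearest higher multiple of RANGE_LEN, minus one (zero-based indexing).
    let to_aligned = to_block + RANGE_LEN - to_block % RANGE_LEN - 1;
    (from_aligned, to_aligned)
}

fn main() {
    assert_eq!(aligned_bounds(5, 20), (0, 31)); // spans two 16-block ranges
    assert_eq!(aligned_bounds(16, 31), (16, 31)); // already aligned
}
```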
- const BITVEC_LEN: u64 = 16_384; + const BITVEC_LEN: usize = 16_384; // The size of the bitmap used by the Bloom filter (in bytes). - const BITVEC_BYTES: u64 = Self::BITVEC_LEN / 8; + const BITVEC_BYTES: usize = Self::BITVEC_LEN / 8; // The number of hash functions used by the Bloom filter. // We need this value to be able to re-create the filter with the deserialized // bitmap. const K_NUM: u32 = 12; // The maximal number of items anticipated to be inserted into the Bloom filter. - const ITEMS_COUNT: u32 = 1024; + const ITEMS_COUNT: usize = 1024; // The seed used by the hash functions of the filter. // This is a randomly generated vector of 32 bytes. const SEED: [u8; 32] = [ @@ -330,18 +441,14 @@ impl BloomFilter { ]; pub fn new() -> Self { - let bloom = Bloom::new_with_seed( - Self::BITVEC_BYTES as usize, - Self::ITEMS_COUNT as usize, - &Self::SEED, - ); + let bloom = Bloom::new_with_seed(Self::BITVEC_BYTES, Self::ITEMS_COUNT, &Self::SEED); assert_eq!(bloom.number_of_hash_functions(), Self::K_NUM); Self(bloom) } pub fn from_compressed_bytes(bytes: &[u8]) -> Self { - let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES as usize * 2) + let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES * 2) .expect("Decompressing Bloom filter"); Self::from_bytes(&bytes) } @@ -353,7 +460,7 @@ impl BloomFilter { let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap()); let bloom = Bloom::from_existing( bytes, - Self::BITVEC_BYTES * 8, + Self::BITVEC_BYTES as u64 * 8, Self::K_NUM, [(k1, k2), (k3, k4)], ); @@ -403,6 +510,17 @@ mod tests { const KEY_NOT_IN_FILTER: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec"); + macro_rules! blockrange { + ($($block:expr),* $(,)?) => {{ + let mut bits = BlockRange::EMPTY; + $( + let idx = $block.get() - BlockNumber::GENESIS.get(); + bits.set(idx as usize, true); + )* + bits + }}; + } + mod filters { use super::*; @@ -415,10 +533,10 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block); let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block]); + let expected = blockrange![from_block]; assert_eq!(block_matches, expected); } @@ -430,11 +548,11 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block, from_block + 1]); + let expected = blockrange![from_block, from_block + 1]; assert_eq!(block_matches, expected); } @@ -447,11 +565,11 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches_empty, BTreeSet::new()); + assert_eq!(block_matches_empty, BlockRange::EMPTY); } #[test] @@ -462,8 +580,8 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 
1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let compressed_bitmap = aggregate_bloom_filter.compress_bitmap(); let mut decompressed = AggregateBloom::from_existing_compressed( @@ -471,14 +589,14 @@ mod tests { aggregate_bloom_filter.to_block, compressed_bitmap, ); - decompressed.add_bloom(&bloom, from_block + 2); + decompressed.insert(&bloom, from_block + 2); let block_matches = decompressed.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block, from_block + 1, from_block + 2]); - assert_eq!(block_matches, expected,); + let expected = blockrange![from_block, from_block + 1, from_block + 2]; + assert_eq!(block_matches, expected); let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches_empty, BTreeSet::new()); + assert_eq!(block_matches_empty, BlockRange::EMPTY); } #[test] @@ -490,10 +608,10 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block); - let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; - aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos); + let invalid_insert_pos = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + aggregate_bloom_filter.insert(&bloom, invalid_insert_pos); } } @@ -509,44 +627,49 @@ mod tests { #[test] fn set_then_get_many_aligned() { - let cache = AggregateBloomCache::with_size(2); + let cache = AggregateBloomCache::with_size(3); - let first_range_start = BlockNumber::GENESIS; - let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN; - let second_range_end = second_range_start + AggregateBloom::BLOCK_RANGE_LEN - 1; + let range_start1 = BlockNumber::GENESIS; + let range_start2 = BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + let range_start3 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + + let range_end2 = range_start2 + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; let filters = vec![ - Arc::new(AggregateBloom::new(first_range_start)), - Arc::new(AggregateBloom::new(second_range_start)), + Arc::new(AggregateBloom::new(range_start1)), + Arc::new(AggregateBloom::new(range_start2)), + Arc::new(AggregateBloom::new(range_start3)), ]; cache.set_many(&filters); - let retrieved = cache.get_many(first_range_start, second_range_end); + let retrieved = cache.get_many(range_start1, range_end2); - assert_eq!(retrieved, filters); + assert_eq!(retrieved, filters[0..2]); } #[test] fn set_then_get_many_unaligned() { - let cache = AggregateBloomCache::with_size(2); + let cache = AggregateBloomCache::with_size(3); - let first_range_start = BlockNumber::GENESIS; - let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN; + let range_start1 = BlockNumber::GENESIS; + let range_start2 = BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + let range_start3 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN; let filters = vec![ - Arc::new(AggregateBloom::new(first_range_start)), - Arc::new(AggregateBloom::new(second_range_start)), + Arc::new(AggregateBloom::new(range_start1)), + Arc::new(AggregateBloom::new(range_start2)), + Arc::new(AggregateBloom::new(range_start3)), ]; - let start = first_range_start + 15; - let end = second_range_start + 15; + let start = range_start2 + 15; + let end = range_start3 + 15; cache.set_many(&filters); let retrieved = cache.get_many(start, end); - assert_eq!(retrieved, filters); + 
assert_eq!(retrieved, &filters[1..3]); } #[test] @@ -556,24 +679,57 @@ mod tests { let filters = vec![ Arc::new(AggregateBloom::new(BlockNumber::GENESIS)), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + 3 * AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), ]; cache.set_many(&filters); - let first_range_start = BlockNumber::GENESIS; - let second_range_end = BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1; + let range_start1 = BlockNumber::GENESIS; + let range_end2 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; - let retrieved = cache.get_many(first_range_start, second_range_end); + let retrieved = cache.get_many(range_start1, range_end2); assert_eq!(retrieved, filters[0..2].to_vec()); } + + #[test] + fn cache_edge_cases() { + let cache = AggregateBloomCache::with_size(2); + + let first_range_start = BlockNumber::GENESIS; + let first_range_end = first_range_start + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; + let second_range_start = first_range_end + 1; + let second_range_end = second_range_start + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; + + let filters = vec![ + Arc::new(AggregateBloom::new(first_range_start)), + Arc::new(AggregateBloom::new(second_range_start)), + ]; + + cache.set_many(&filters); + + // Edge cases around the lower bound. + let retrieved = cache.get_many(first_range_end - 1, second_range_end); + assert_eq!(retrieved, filters[..]); + let retrieved = cache.get_many(first_range_end, second_range_end); + assert_eq!(retrieved, filters[..]); + let retrieved = cache.get_many(first_range_end + 1, second_range_end); + assert_eq!(retrieved, filters[1..]); + + // Edge cases around the upper bound. + let retrieved = cache.get_many(first_range_start, first_range_end - 1); + assert_eq!(retrieved, filters[0..1]); + let retrieved = cache.get_many(first_range_start, first_range_end); + assert_eq!(retrieved, filters[0..1]); + let retrieved = cache.get_many(first_range_start, first_range_end + 1); + assert_eq!(retrieved, filters[0..=1]); + } } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index e0ec7a08d6..1993db105e 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeSet; use std::num::NonZeroUsize; use std::rc::Rc; use std::sync::Arc; @@ -16,7 +15,7 @@ use pathfinder_common::{ }; use rusqlite::types::Value; -use crate::bloom::{AggregateBloom, BloomFilter}; +use crate::bloom::{AggregateBloom, BlockRange, BloomFilter}; use crate::prelude::*; // We're using the upper 4 bits of the 32 byte representation of a felt @@ -109,7 +108,7 @@ impl Transaction<'_> { bloom.set_address(&event.from_address); } - running_event_filter.filter.add_bloom(&bloom, block_number); + running_event_filter.filter.insert(&bloom, block_number); running_event_filter.next_block = block_number + 1; // This check is the reason that blocks cannot be skipped, if they were we would @@ -490,43 +489,53 @@ impl Transaction<'_> { impl AggregateBloom { /// Returns the block numbers that match the given constraints. 
-    pub fn check(&self, constraints: &EventConstraints) -> BTreeSet<BlockNumber> {
+    pub fn check(&self, constraints: &EventConstraints) -> Vec<BlockNumber> {
         let addr_blocks = self.check_address(constraints.contract_address);
         let keys_blocks = self.check_keys(&constraints.keys);
 
-        addr_blocks.intersection(&keys_blocks).cloned().collect()
+        let block_matches = addr_blocks & keys_blocks;
+
+        block_matches
+            .iter_ones()
+            .map(|offset| self.from_block + offset as u64)
+            .collect()
     }
 
-    fn check_address(&self, address: Option<ContractAddress>) -> BTreeSet<BlockNumber> {
+    fn check_address(&self, address: Option<ContractAddress>) -> BlockRange {
         match address {
             Some(addr) => self.blocks_for_keys(&[addr.0]),
-            None => self.all_blocks(),
+            None => BlockRange::FULL,
         }
     }
 
-    fn check_keys(&self, keys: &[Vec<EventKey>]) -> BTreeSet<BlockNumber> {
-        if keys.is_empty() {
-            return self.all_blocks();
+    fn check_keys(&self, keys: &[Vec<EventKey>]) -> BlockRange {
+        if keys.is_empty() || keys.iter().any(Vec::is_empty) {
+            return BlockRange::FULL;
         }
 
-        keys.iter()
-            .enumerate()
-            .map(|(idx, key_group)| {
-                let indexed_keys: Vec<_> = key_group
-                    .iter()
-                    .map(|key| {
-                        let mut key_with_idx = key.0;
-                        key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4;
-                        key_with_idx
-                    })
-                    .collect();
+        let mut result = BlockRange::FULL;
 
-                self.blocks_for_keys(&indexed_keys)
-            })
-            .reduce(|blocks, blocks_for_key| {
-                blocks.intersection(&blocks_for_key).cloned().collect()
-            })
-            .unwrap_or_default()
+        for (idx, key_group) in keys.iter().enumerate() {
+            let indexed_keys: Vec<_> = key_group
+                .iter()
+                .map(|key| {
+                    let mut key_with_idx = key.0;
+                    key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4;
+                    key_with_idx
+                })
+                .collect();
+
+            let blocks_for_key = self.blocks_for_keys(&indexed_keys);
+
+            // No point in continuing the AND operations once the range is empty.
+            if blocks_for_key == BlockRange::EMPTY {
+                return BlockRange::EMPTY;
+            }
+
+            result &= blocks_for_key;
+        }
+
+        result
    }
 }
 
@@ -552,7 +561,7 @@ pub(crate) struct RunningEventFilter {
 /// Rebuild the [event filter](RunningEventFilter) for the range of blocks
 /// between the last stored `to_block` in the event filter table and the last
 /// overall block in the database. This is needed because the aggregate event
-/// filter for each [block range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN)
+/// filter for each [block range](crate::bloom::AGGREGATE_BLOOM_BLOCK_RANGE_LEN)
 /// is stored once the range is complete; before that it is kept in memory and
 /// can be lost upon shutdown.
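Two things in `check_keys` deserve an illustration: the key's position within an event is folded into the felt's top nibble (the `(idx as u8) << 4` line, per the "upper 4 bits" comment earlier in this file), and the match semantics are OR within a key group but AND across groups. A hedged sketch with toy `u16` stand-ins for keys (hypothetical helpers, not the crate's API):

```rust
// Mirrors `key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4`: the same
// key value at a different position becomes a different filter entry.
fn tag_with_position(key_first_byte: u8, position: usize) -> u8 {
    key_first_byte | ((position as u8) << 4)
}

// Constraint semantics: a block matches if, for every key group, at least one
// member of that group is present (OR inside a group, AND across groups).
fn matches(groups: &[Vec<u16>], block_keys: &[u16]) -> bool {
    groups
        .iter()
        .all(|group| group.iter().any(|k| block_keys.contains(k)))
}

fn main() {
    assert_eq!(tag_with_position(0x0a, 2), 0x2a);

    let groups = vec![vec![1, 2], vec![7]];
    assert!(matches(&groups, &[2, 7])); // one of {1, 2} AND 7
    assert!(!matches(&groups, &[1, 2])); // missing 7
}
```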
 pub(crate) fn rebuild_running_event_filter(
@@ -688,7 +697,7 @@ pub(crate) fn rebuild_running_event_filter(
         };
 
         let block_number = first_running_event_filter_block + block as u64;
-        filter.add_bloom(bloom, block_number);
+        filter.insert(bloom, block_number);
     }
 
     Ok(RunningEventFilter {
@@ -741,14 +750,15 @@ mod tests {
     use pathfinder_common::{transaction as common, BlockHeader, BlockTimestamp, EntryPoint, Fee};
     use pathfinder_crypto::Felt;
     use pretty_assertions_sorted::assert_eq;
+    use rstest::rstest;
 
     use super::*;
-    use crate::test_utils;
+    use crate::{test_utils, AGGREGATE_BLOOM_BLOCK_RANGE_LEN};
 
     static MAX_BLOCKS_TO_SCAN: LazyLock<NonZeroUsize> =
         LazyLock::new(|| NonZeroUsize::new(100).unwrap());
-    static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
-        LazyLock::new(|| NonZeroUsize::new(3).unwrap());
+    static MAX_EVENT_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
+        LazyLock::new(|| NonZeroUsize::new(10).unwrap());
 
     mod event_bloom {
         use pretty_assertions_sorted::assert_eq;

        use super::*;

@@ -763,8 +773,8 @@ mod event_bloom {
            let mut filter = BloomFilter::new();
            filter.set_keys(&[event_key!("0xdeadbeef")]);
            filter.set_address(&contract_address!("0x1234"));

-            aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-            aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+            aggregate.insert(&filter, BlockNumber::GENESIS);
+            aggregate.insert(&filter, BlockNumber::GENESIS + 1);

            let constraints = EventConstraints {
                from_block: None,
                to_block: None,
                contract_address: Some(contract_address!("0x1234")),
                keys: vec![],
                page_size: 1024,
                offset: 0,
            };

            assert_eq!(
                aggregate.check(&constraints),
-                BTreeSet::from_iter(vec![BlockNumber::GENESIS, BlockNumber::GENESIS + 1])
+                vec![BlockNumber::GENESIS, BlockNumber::GENESIS + 1]
            );
        }

@@ -788,8 +798,8 @@ mod event_bloom {
            let mut filter = BloomFilter::new();
            filter.set_keys(&[event_key!("0xdeadbeef")]);
            filter.set_address(&contract_address!("0x1234"));

-            aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-            aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+            aggregate.insert(&filter, BlockNumber::GENESIS);
+            aggregate.insert(&filter, BlockNumber::GENESIS + 1);

            let constraints = EventConstraints {
                from_block: None,
                to_block: None,
                contract_address: None,
                keys: vec![],
                page_size: 1024,
                offset: 0,
            };

-            assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+            assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
        }

@@ -810,8 +820,8 @@ mod event_bloom {
            let mut filter = BloomFilter::new();
            filter.set_keys(&[event_key!("0xdeadbeef")]);
            filter.set_address(&contract_address!("0x1234"));

-            aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-            aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+            aggregate.insert(&filter, BlockNumber::GENESIS);
+            aggregate.insert(&filter, BlockNumber::GENESIS + 1);

            let constraints = EventConstraints {
                from_block: None,
                to_block: None,
                contract_address: None,
                keys: vec![],
                page_size: 1024,
                offset: 0,
            };

-            assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+            assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
        }

@@ -832,8 +842,8 @@ mod event_bloom {
            let mut filter = BloomFilter::new();
            filter.set_address(&contract_address!("0x1234"));
            filter.set_keys(&[event_key!("0xdeadbeef")]);

-            aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-            aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+            aggregate.insert(&filter, BlockNumber::GENESIS);
+            aggregate.insert(&filter, BlockNumber::GENESIS + 1);

            let constraints = EventConstraints {
                from_block: None,
                to_block: None,
@@ -848,19 +858,25 @@ mod event_bloom {
                offset: 0,
            };

-            assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+            assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
        }

        #[test]
        fn no_constraints() {
+            fn all_blocks(bloom: &AggregateBloom) -> Vec<BlockNumber> {
+                (bloom.from_block.get()..=bloom.to_block.get())
+                    .map(BlockNumber::new_or_panic)
+                    .collect()
+            }
+
            let mut aggregate =
AggregateBloom::new(BlockNumber::GENESIS); let mut filter = BloomFilter::new(); filter.set_keys(&[event_key!("0xdeadbeef")]); filter.set_address(&contract_address!("0x1234")); - aggregate.add_bloom(&filter, BlockNumber::GENESIS); - aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1); + aggregate.insert(&filter, BlockNumber::GENESIS); + aggregate.insert(&filter, BlockNumber::GENESIS + 1); let constraints = EventConstraints { from_block: None, to_block: None, @@ -870,7 +886,7 @@ mod tests { offset: 0, }; - assert_eq!(aggregate.check(&constraints), aggregate.all_blocks()); + assert_eq!(aggregate.check(&constraints), all_blocks(&aggregate)); } } @@ -886,7 +902,7 @@ mod tests { from_block: Some(expected_event.block_number), to_block: Some(expected_event.block_number), contract_address: Some(expected_event.from_address), - // we're using a key which is present in _all_ events as the 2nd key + // We're using a key which is present in _all_ events as the 2nd key. keys: vec![vec![], vec![event_key!("0xdeadbeef")]], page_size: test_utils::NUM_EVENTS, offset: 0, @@ -896,7 +912,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1001,7 +1017,7 @@ mod tests { offset: 0, }, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap() .events @@ -1040,7 +1056,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1075,7 +1091,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1108,7 +1124,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); pretty_assertions_sorted::assert_eq!( @@ -1138,7 +1154,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); pretty_assertions_sorted::assert_eq!( @@ -1172,7 +1188,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1206,7 +1222,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1239,7 +1255,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1260,7 +1276,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1292,7 +1308,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1324,7 +1340,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1351,7 +1367,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1378,7 +1394,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1411,7 +1427,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ 
-1449,7 +1465,7 @@ mod tests {
             .events(
                 &constraints,
                 *MAX_BLOCKS_TO_SCAN,
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1477,7 +1493,7 @@ mod tests {
             .events(
                 &constraints,
                 *MAX_BLOCKS_TO_SCAN,
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1505,7 +1521,7 @@ mod tests {
             .events(
                 &constraints,
                 *MAX_BLOCKS_TO_SCAN,
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1533,7 +1549,7 @@ mod tests {
             .events(
                 &constraints,
                 *MAX_BLOCKS_TO_SCAN,
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1558,7 +1574,7 @@ mod tests {
             .events(
                 &constraints,
                 *MAX_BLOCKS_TO_SCAN,
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1590,7 +1606,7 @@ mod tests {
             .events(
                 &constraints,
                 1.try_into().unwrap(),
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1617,7 +1633,7 @@ mod tests {
             .events(
                 &constraints,
                 1.try_into().unwrap(),
-                *MAX_BLOOM_FILTERS_TO_LOAD,
+                *MAX_EVENT_FILTERS_TO_LOAD,
             )
             .unwrap();
         assert_eq!(
@@ -1634,30 +1650,12 @@ mod tests {
 
     #[test]
     fn crossing_event_filter_range_stores_and_updates_running() {
-        let blocks: Vec<usize> = [
-            // First event filter start.
-            BlockNumber::GENESIS,
-            BlockNumber::GENESIS + 1,
-            BlockNumber::GENESIS + 2,
-            BlockNumber::GENESIS + 3,
-            // End.
-            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1,
-            // Second event filter start.
-            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN,
-            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1,
-            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2,
-            BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3,
-            // End.
-            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1,
-            // Third event filter start.
-            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN,
-            BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1,
-        ]
-        .iter()
-        .map(|&n| n.get() as usize)
-        .collect();
+        // Two and a half ranges.
+        let n_blocks = 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN + AGGREGATE_BLOOM_BLOCK_RANGE_LEN / 2;
+        let n_blocks = usize::try_from(n_blocks).unwrap();
 
-        let (storage, _) = test_utils::setup_custom_test_storage(&blocks, 2);
+        let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1);
+        let emitted_events = test_data.events;
 
         let mut connection = storage.connection().unwrap();
         let tx = connection.transaction().unwrap();
@@ -1673,37 +1671,46 @@ mod tests {
         // Running event filter starts from next block range.
         assert_eq!(
             running_event_filter.filter.from_block,
-            2 * AggregateBloom::BLOCK_RANGE_LEN
+            2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN
+        );
+        // Lock needed in `events()`.
+        drop(running_event_filter);
+
+        let constraints = EventConstraints {
+            from_block: None,
+            to_block: None,
+            contract_address: None,
+            // We're using a key which is present in _all_ events as the 2nd key.
+            keys: vec![vec![], vec![event_key!("0xdeadbeef")]],
+            page_size: emitted_events.len(),
+            offset: 0,
+        };
+
+        let events = tx
+            .events(
+                &constraints,
+                *MAX_BLOCKS_TO_SCAN,
+                *MAX_EVENT_FILTERS_TO_LOAD,
+            )
+            .unwrap();
+        assert_eq!(
+            events,
+            PageOfEvents {
+                events: emitted_events,
+                continuation_token: None,
+            }
         );
     }
 
     #[test]
     fn event_filter_filter_load_limit() {
-        let blocks: Vec<usize> = [
-            // First event filter start.
-            BlockNumber::GENESIS,
-            BlockNumber::GENESIS + 1,
-            BlockNumber::GENESIS + 2,
-            BlockNumber::GENESIS + 3,
-            // End.
- BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1, - // Second event filter start. - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3, - // End. - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1, - // Third event filter start. - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1, - ] - .iter() - .map(|&n| n.get() as usize) - .collect(); + let n_blocks = 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN + AGGREGATE_BLOOM_BLOCK_RANGE_LEN / 2; + let n_blocks = usize::try_from(n_blocks).unwrap(); - let (storage, test_data) = test_utils::setup_custom_test_storage(&blocks, 2); + let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1); let emitted_events = test_data.events; + let events_per_block = emitted_events.len() / n_blocks; + let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); @@ -1721,20 +1728,22 @@ mod tests { .events(&constraints, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); - let first_event_filter_range = BlockNumber::GENESIS.get()..AggregateBloom::BLOCK_RANGE_LEN; - for event in events.events { - // ...but only events from the first bloom filter range are returned. - assert!( - first_event_filter_range.contains(&event.block_number.get()), - "Event block number: {} should have been in the range: {:?}", - event.block_number.get(), - first_event_filter_range - ); - } - let continue_from_block = events.continuation_token.unwrap().block_number; - assert_eq!(continue_from_block, first_event_filter_range.end); + let block_range_len = usize::try_from(AGGREGATE_BLOOM_BLOCK_RANGE_LEN).unwrap(); + assert_eq!( + events, + PageOfEvents { + // ...but only events from the first bloom filter range are returned... + events: emitted_events[..events_per_block * block_range_len].to_vec(), + // ...with a continuation token pointing to the next block range. + continuation_token: Some(ContinuationToken { + block_number: BlockNumber::new_or_panic(AGGREGATE_BLOOM_BLOCK_RANGE_LEN), + offset: 0, + }) + } + ); let constraints_with_offset = EventConstraints { + // Use the provided continuation token. from_block: Some(events.continuation_token.unwrap().block_number), to_block: None, contract_address: None, @@ -1751,19 +1760,96 @@ mod tests { 1.try_into().unwrap(), ) .unwrap(); - assert!(events.continuation_token.is_none()); - - let second_event_filter_range = - AggregateBloom::BLOCK_RANGE_LEN..(2 * AggregateBloom::BLOCK_RANGE_LEN); - let third_event_filter_range = - 2 * AggregateBloom::BLOCK_RANGE_LEN..(3 * AggregateBloom::BLOCK_RANGE_LEN); - for event in events.events { - // ...but only events from the second (loaded) and third (running) event filter - // range are returned. - assert!( - (second_event_filter_range.start..third_event_filter_range.end) - .contains(&event.block_number.get()) - ); + assert_eq!( + events, + PageOfEvents { + // ...but only events from the second (loaded) and third (running) event filter + // range are returned... + events: emitted_events[events_per_block * block_range_len..].to_vec(), + // ...without a continuation token. 
+                continuation_token: None,
+            }
+        );
+    }
+
+    #[rustfmt::skip]
+    #[rstest]
+    #[case(0, 0, 0, 0, 0)]  // 0      ..=(N    )
+    #[case(0, 0, -1, 0, 0)] // 0      ..=(N - 1)
+    #[case(0, 0, 1, 0, 2)]  // 0      ..=(N + 1)
+    #[case(1, 0, 0, 1, 1)]  // (N    )..=(2 * N)
+    #[case(1, -2, 0, 0, 1)] // (N - 2)..=(2 * N)
+    #[case(1, 1, 0, 1, 1)]  // (N + 1)..=(2 * N)
+    fn event_filter_edge_cases(
+        #[case] range_idx: usize,
+        #[case] offset_from: i32,
+        #[case] offset_to: i32,
+        #[case] range_start_idx: usize,
+        #[case] range_end_idx: usize,
+    ) {
+        use std::collections::BTreeSet;
+
+        fn contained_blocks(page: &PageOfEvents) -> BTreeSet<BlockNumber> {
+            page.events
+                .iter()
+                .map(|event| event.block_number)
+                .collect::<BTreeSet<_>>()
        }
+
+        let n_blocks = 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN;
+        let n_blocks = usize::try_from(n_blocks).unwrap();
+
+        let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1);
+        let emitted_events = test_data.events;
+
+        let mut connection = storage.connection().unwrap();
+        let tx = connection.transaction().unwrap();
+
+        let ranges = [
+            (
+                BlockNumber::GENESIS,
+                BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1,
+            ),
+            (
+                BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN,
+                BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1,
+            ),
+            (
+                BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN,
+                BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1,
+            ),
+            (
+                BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN,
+                BlockNumber::GENESIS + 4 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1,
+            ),
+        ];
+
+        let from_block = ranges[range_idx].0.get() as i32 + offset_from;
+        let to_block = ranges[range_idx].1.get() as i32 + offset_to;
+
+        let constraints = EventConstraints {
+            from_block: Some(BlockNumber::new_or_panic(u64::try_from(from_block).unwrap())),
+            to_block: Some(BlockNumber::new_or_panic(u64::try_from(to_block).unwrap())),
+            contract_address: None,
+            keys: vec![],
+            page_size: emitted_events.len(),
+            offset: 0,
+        };
+
+        let page = tx
+            .events(
+                &constraints,
+                *MAX_BLOCKS_TO_SCAN,
+                *MAX_EVENT_FILTERS_TO_LOAD,
+            )
+            .unwrap();
+        let blocks = contained_blocks(&page);
+
+        let expected = (ranges[range_start_idx].0.get()..=ranges[range_end_idx].1.get())
+            .filter(|&block| (from_block..=to_block).contains(&(block as i32)))
+            .map(BlockNumber::new_or_panic)
+            .collect::<BTreeSet<_>>();
+
+        assert_eq!(blocks, expected);
    }
 }
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index a845654307..e76f4f520c 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -561,7 +561,17 @@ fn schema_version(connection: &rusqlite::Connection) -> anyhow::Result {
 
 #[cfg(test)]
 mod tests {
+    use std::num::NonZeroUsize;
+    use std::sync::LazyLock;
+
+    use rstest::rstest;
+    use test_utils::*;
+
     use super::*;
+
+    static MAX_BLOCKS_TO_SCAN: LazyLock<NonZeroUsize> =
+        LazyLock::new(|| NonZeroUsize::new(100).unwrap());
+    static MAX_EVENT_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
+        LazyLock::new(|| NonZeroUsize::new(5).unwrap());
 
     #[test]
     fn schema_version_defaults_to_zero() {
@@ -678,22 +688,13 @@ mod tests {
 
     #[test]
     fn running_event_filter_rebuilt_after_shutdown() {
-        use std::num::NonZeroUsize;
-        use std::sync::LazyLock;
-
-        use test_utils::*;
-
-        static MAX_BLOCKS_TO_SCAN: LazyLock<NonZeroUsize> =
-            LazyLock::new(|| NonZeroUsize::new(10).unwrap());
-        static MAX_EVENT_FILTERS_TO_LOAD: LazyLock<NonZeroUsize> =
-            LazyLock::new(|| NonZeroUsize::new(3).unwrap());
-
-        let blocks = [0, 1, 2, 3, 4, 5];
+        let n_blocks = 6;
         let transactions_per_block = 2;
-        let headers = create_blocks(&blocks);
+        let headers = create_blocks(n_blocks);
         let transactions_and_receipts =
-            create_transactions_and_receipts(blocks.len(), transactions_per_block);
-        let emitted_events = extract_events(&headers, &transactions_and_receipts);
+            create_transactions_and_receipts(n_blocks, transactions_per_block);
+        let emitted_events =
+            extract_events(&headers, &transactions_and_receipts, transactions_per_block);
 
         let insert_block_data = |tx: &Transaction<'_>, idx: usize| {
             let header = &headers[idx];
@@ -798,4 +799,107 @@ mod tests {
             assert!(events_after.contains(&e));
         }
     }
+
+    #[rstest]
+    #[case::block_before_full_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, 0)]
+    #[case::full_block_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN, 1)]
+    #[case::block_after_full_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN + 1, 1)]
+    fn rebuild_running_event_filter_edge_cases(
+        #[case] n_blocks: u64,
+        #[case] expected_insert_count: u64,
+    ) {
+        let n_blocks = usize::try_from(n_blocks).unwrap();
+        let transactions_per_block = 1;
+        let headers = create_blocks(n_blocks);
+        let transactions_and_receipts =
+            create_transactions_and_receipts(n_blocks, transactions_per_block);
+        let emitted_events =
+            extract_events(&headers, &transactions_and_receipts, transactions_per_block);
+        let events_per_block = emitted_events.len() / n_blocks;
+
+        let insert_block_data = |tx: &Transaction<'_>, idx: usize| {
+            let header = &headers[idx];
+
+            tx.insert_block_header(header).unwrap();
+            tx.insert_transaction_data(
+                header.number,
+                &transactions_and_receipts
+                    [idx * transactions_per_block..(idx + 1) * transactions_per_block]
+                    .iter()
+                    .cloned()
+                    .map(|(tx, receipt, ..)| (tx, receipt))
+                    .collect::<Vec<_>>(),
+                Some(
+                    &transactions_and_receipts
+                        [idx * transactions_per_block..(idx + 1) * transactions_per_block]
+                        .iter()
+                        .cloned()
+                        .map(|(_, _, events)| events)
+                        .collect::<Vec<_>>(),
+                ),
+            )
+            .unwrap();
+        };
+
+        let db = crate::StorageBuilder::in_memory().unwrap();
+        let db_path = Arc::clone(&db.0.database_path).to_path_buf();
+
+        // Keep this around so that the in-memory database doesn't get dropped.
+        let mut rsqlite_conn = rusqlite::Connection::open(&db_path).unwrap();
+
+        let mut conn = db.connection().unwrap();
+        let tx = conn.transaction().unwrap();
+
+        for i in 0..n_blocks {
+            insert_block_data(&tx, i);
+        }
+
+        // Pretend like we shut down by dropping these.
+ tx.commit().unwrap(); + drop(conn); + drop(db); + + let db = crate::StorageBuilder::file(db_path) + .journal_mode(JournalMode::Rollback) + .migrate() + .unwrap() + .create_pool(NonZeroU32::new(5).unwrap()) + .unwrap(); + + let mut conn = db.connection().unwrap(); + let tx = conn.transaction().unwrap(); + + let to_block = BlockNumber::GENESIS + n_blocks as u64; + + let constraints = EventConstraints { + from_block: None, + to_block: Some(to_block), + contract_address: None, + keys: vec![], + page_size: 1024, + offset: 0, + }; + + let events = tx + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_EVENT_FILTERS_TO_LOAD, + ) + .unwrap() + .events; + + let inserted_event_filter_count = rsqlite_conn + .transaction() + .unwrap() + .prepare("SELECT COUNT(*) FROM event_filters") + .unwrap() + .query_row([], |row| row.get::<_, u64>(0)) + .unwrap(); + + assert_eq!(inserted_event_filter_count, expected_insert_count); + + let expected = &emitted_events[..events_per_block * n_blocks]; + assert_eq!(events, expected); + } } diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index 6cfa73ea36..0da4d72282 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -31,8 +31,9 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { } /// Migrate individual event bloom filters to the new aggregate table. We only -/// need to migrate all of the [AggregateBloom::BLOCK_RANGE_LEN] sized chunks. -/// The remainder will be reconstructed by the [crate::StorageManager] as the +/// need to migrate all of the [crate::bloom::AGGREGATE_BLOOM_BLOCK_RANGE_LEN] +/// sized chunks. The remainder will be reconstructed by the +/// [crate::StorageManager] as the /// [RunningEventFilter](crate::connection::event::RunningEventFilter). fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { let bloom_filter_count = tx @@ -74,7 +75,7 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { while let Some(bloom_filter) = bloom_filters.next().transpose()? { let current_block = BlockNumber::new_or_panic(migrated_count); - aggregate.add_bloom(&bloom_filter, current_block); + aggregate.insert(&bloom_filter, current_block); if current_block == aggregate.to_block { insert_aggregate_stmt diff --git a/crates/storage/src/test_utils.rs b/crates/storage/src/test_utils.rs index ab0a0d0a13..694ff845b9 100644 --- a/crates/storage/src/test_utils.rs +++ b/crates/storage/src/test_utils.rs @@ -25,36 +25,43 @@ pub const NUM_TRANSACTIONS: usize = NUM_BLOCKS * TRANSACTIONS_PER_BLOCK; pub const NUM_EVENTS: usize = NUM_BLOCKS * EVENTS_PER_BLOCK; /// Creates a custom set of [BlockHeader]s with arbitrary values. 
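A note ahead of the `test_utils` changes below: the helpers derive distinct `Felt` values by repeating a hex digit `i + 3` times, which is why the reworked code asserts a ceiling on the transaction count — a felt holds at most 32 bytes, so the repeated-digit string must stay within roughly 64 hex characters (and under the field modulus). A sketch of the pattern (assumes only `Felt::from_hex_str` as used in the source; the exact safe bound is an assumption, see the assert after this note):

```rust
use pathfinder_crypto::Felt;

// Each value gets one more repeated digit than the last, so all are distinct.
// Keep `n` small: once the string outgrows what a felt can hold,
// `from_hex_str` fails — hence the `n_transactions < 64` assertion below.
fn distinct_felts(n: usize) -> Vec<Felt> {
    (0..n)
        .map(|i| Felt::from_hex_str(&"4".repeat(i + 3)).unwrap())
        .collect()
}
```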
-pub(crate) fn create_blocks(block_numbers: &[usize]) -> Vec<BlockHeader> {
-    block_numbers
-        .iter()
-        .enumerate()
-        .map(|(i, &block_number)| {
+pub(crate) fn create_blocks(n_blocks: usize) -> Vec<BlockHeader> {
+    (0..n_blocks)
+        .map(|block_number| {
             let storage_commitment =
-                StorageCommitment(Felt::from_hex_str(&"b".repeat(i + 3)).unwrap());
-            let class_commitment = ClassCommitment(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap());
-            let index_as_felt = Felt::from_be_slice(&[i as u8]).unwrap();
+                StorageCommitment(Felt::from_hex_str(&"b".repeat(block_number + 3)).unwrap());
+            let class_commitment =
+                ClassCommitment(Felt::from_hex_str(&"c".repeat(block_number + 3)).unwrap());
+            let index_as_felt = Felt::from_be_slice(&[block_number as u8]).unwrap();
 
             BlockHeader::builder()
                 .number(BlockNumber::GENESIS + block_number as u64)
-                .timestamp(BlockTimestamp::new_or_panic(i as u64 + 500))
+                .timestamp(BlockTimestamp::new_or_panic(block_number as u64 + 500))
                 .calculated_state_commitment(storage_commitment, class_commitment)
-                .eth_l1_gas_price(GasPrice::from(i as u64))
+                .eth_l1_gas_price(GasPrice::from(block_number as u64))
                 .sequencer_address(SequencerAddress(index_as_felt))
                 .transaction_commitment(TransactionCommitment(index_as_felt))
                 .event_commitment(EventCommitment(index_as_felt))
-                .finalize_with_hash(BlockHash(Felt::from_hex_str(&"a".repeat(i + 3)).unwrap()))
+                .finalize_with_hash(BlockHash(
+                    Felt::from_hex_str(&"a".repeat(block_number + 3)).unwrap(),
+                ))
         })
         .collect::<Vec<_>>()
 }
 
 /// Creates a custom test set of transactions and receipts.
 pub(crate) fn create_transactions_and_receipts(
-    block_count: usize,
+    n_blocks: usize,
     transactions_per_block: usize,
 ) -> Vec<(Transaction, Receipt, Vec<Event>)> {
-    let n = block_count * transactions_per_block;
-    let transactions = (0..n).map(|i| match i % transactions_per_block {
+    let n_transactions = n_blocks * transactions_per_block;
+    assert!(
+        n_transactions < 64,
+        "Too many transactions ({} >= {}), `Felt::from_hex_str()` will overflow.",
+        n_transactions,
+        64
+    );
+    let transactions = (0..n_transactions).map(|i| match i % transactions_per_block {
         x if x < INVOKE_TRANSACTIONS_PER_BLOCK => Transaction {
             hash: TransactionHash(Felt::from_hex_str(&"4".repeat(i + 3)).unwrap()),
             variant: TransactionVariant::InvokeV0(InvokeTransactionV0 {
@@ -110,60 +117,62 @@ pub(crate) fn create_transactions_and_receipts(
         },
     });
 
-    let tx_receipt = transactions.enumerate().map(|(i, tx)| {
-        let receipt = Receipt {
-            actual_fee: Fee::ZERO,
-            execution_resources: ExecutionResources {
-                builtins: Default::default(),
-                n_steps: i as u64 + 987,
-                n_memory_holes: i as u64 + 1177,
-                data_availability: L1Gas {
-                    l1_gas: i as u128 + 124,
-                    l1_data_gas: i as u128 + 457,
-                },
-                total_gas_consumed: L1Gas {
-                    l1_gas: i as u128 + 333,
-                    l1_data_gas: i as u128 + 666,
+    transactions
+        .enumerate()
+        .map(|(i, tx)| {
+            let receipt = Receipt {
+                actual_fee: Fee::ZERO,
+                execution_resources: ExecutionResources {
+                    builtins: Default::default(),
+                    n_steps: i as u64 + 987,
+                    n_memory_holes: i as u64 + 1177,
+                    data_availability: L1Gas {
+                        l1_gas: i as u128 + 124,
+                        l1_data_gas: i as u128 + 457,
+                    },
+                    total_gas_consumed: L1Gas {
+                        l1_gas: i as u128 + 333,
+                        l1_data_gas: i as u128 + 666,
+                    },
+                    l2_gas: Default::default(),
                 },
-                l2_gas: Default::default(),
-            },
-            transaction_hash: tx.hash,
-            transaction_index: TransactionIndex::new_or_panic(i as u64 + 2311),
-            ..Default::default()
-        };
-        let events = if i % transactions_per_block < EVENTS_PER_BLOCK {
-            vec![pathfinder_common::event::Event {
-                from_address: ContractAddress::new_or_panic(
-                    Felt::from_hex_str(&"2".repeat(i + 3)).unwrap(),
-                ),
-                data: vec![EventData(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap())],
-                keys: vec![
-                    EventKey(Felt::from_hex_str(&"d".repeat(i + 3)).unwrap()),
-                    event_key!("0xdeadbeef"),
-                ],
-            }]
-        } else {
-            vec![]
-        };
-
-        (tx, receipt, events)
-    });
+                transaction_hash: tx.hash,
+                transaction_index: TransactionIndex::new_or_panic(i as u64 + 2311),
+                ..Default::default()
+            };
+            let events = if i % transactions_per_block < EVENTS_PER_BLOCK {
+                vec![pathfinder_common::event::Event {
+                    from_address: ContractAddress::new_or_panic(
+                        Felt::from_hex_str(&"2".repeat(i + 3)).unwrap(),
+                    ),
+                    data: vec![EventData(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap())],
+                    keys: vec![
+                        EventKey(Felt::from_hex_str(&"d".repeat(i + 3)).unwrap()),
+                        event_key!("0xdeadbeef"),
+                    ],
+                }]
+            } else {
+                vec![]
+            };
 
-    tx_receipt.collect::<Vec<_>>()
+            (tx, receipt, events)
+        })
+        .collect::<Vec<_>>()
 }
 
 /// Creates a set of emitted events from given blocks and transactions.
 pub(crate) fn extract_events(
     blocks: &[BlockHeader],
     transactions: &[(Transaction, Receipt, Vec<Event>)],
+    transactions_per_block: usize,
 ) -> Vec<EmittedEvent> {
     transactions
         .iter()
         .enumerate()
         .filter_map(|(i, (txn, _, events))| {
-            if i % TRANSACTIONS_PER_BLOCK < EVENTS_PER_BLOCK {
+            if i % transactions_per_block < EVENTS_PER_BLOCK {
                 let event = &events[0];
-                let block = &blocks[i / TRANSACTIONS_PER_BLOCK];
+                let block = &blocks[i / transactions_per_block];
 
                 Some(EmittedEvent {
                     data: event.data.clone(),
@@ -191,24 +200,22 @@ pub struct TestData {
 /// [BlockHeader]s starting from L2 genesis, with arbitrary other values and a
 /// set of expected emitted events.
 pub fn setup_test_storage() -> (crate::Storage, TestData) {
-    let block_numbers: Vec<usize> = (0..NUM_BLOCKS).collect();
-    setup_custom_test_storage(&block_numbers, TRANSACTIONS_PER_BLOCK)
+    setup_custom_test_storage(NUM_BLOCKS, TRANSACTIONS_PER_BLOCK)
 }
 
-// Creates an in-memory storage instance with custom block numbers (not
-// necessarily consecutive) and a custom number of transactions per block, with
-// a set of expected emitted events.
+// Creates an in-memory storage instance with N blocks and a custom number of
+// transactions per block, with a set of expected emitted events.
 pub fn setup_custom_test_storage(
-    block_numbers: &[usize],
+    n_blocks: usize,
     transactions_per_block: usize,
 ) -> (crate::Storage, TestData) {
     let storage = crate::StorageBuilder::in_memory().unwrap();
     let mut connection = storage.connection().unwrap();
     let tx = connection.transaction().unwrap();
 
-    let headers = create_blocks(block_numbers);
+    let headers = create_blocks(n_blocks);
     let transactions_and_receipts =
-        create_transactions_and_receipts(block_numbers.len(), transactions_per_block);
+        create_transactions_and_receipts(n_blocks, transactions_per_block);
 
     for (i, header) in headers.iter().enumerate() {
         tx.insert_block_header(header).unwrap();
@@ -234,7 +241,7 @@ pub fn setup_custom_test_storage(
 
     tx.commit().unwrap();
 
-    let events = extract_events(&headers, &transactions_and_receipts);
+    let events = extract_events(&headers, &transactions_and_receipts, transactions_per_block);
 
     let (transactions, receipts): (Vec<_>, Vec<_>) = transactions_and_receipts
        .into_iter()
        .map(|(transaction, receipts, _)| (transaction, receipts))
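Taken together, the new crate-internal flow looks roughly like this: build one `BloomFilter` per block, fold it into the running `AggregateBloom` with `insert`, then map `iter_ones` offsets back to block numbers the way `check` does. A sketch using only APIs shown in this diff (crate-internal visibility assumed; results can still contain false positives):

```rust
use pathfinder_common::BlockNumber;
use pathfinder_crypto::Felt;

// Assumes at most AGGREGATE_BLOOM_BLOCK_RANGE_LEN blocks are passed in.
fn example(keys_per_block: &[Vec<Felt>], query: &[Felt]) -> Vec<BlockNumber> {
    let mut aggregate = AggregateBloom::new(BlockNumber::GENESIS);

    // One Bloom filter per block, transposed into the aggregate on insert.
    for (offset, keys) in keys_per_block.iter().enumerate() {
        let mut filter = BloomFilter::new();
        for key in keys {
            filter.set(key);
        }
        aggregate.insert(&filter, BlockNumber::GENESIS + offset as u64);
    }

    // Each set bit in the result is an offset from `from_block`, exactly as
    // `check` converts them in event.rs.
    aggregate
        .blocks_for_keys(query)
        .iter_ones()
        .map(|offset| aggregate.from_block + offset as u64)
        .collect()
}
```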