From c9484964b57f09135e56aae139645b9cfd7069a1 Mon Sep 17 00:00:00 2001 From: sistemd Date: Wed, 25 Dec 2024 12:39:20 +0100 Subject: [PATCH 1/2] use bit array instead of `BTreeSet` --- crates/storage/src/bloom.rs | 272 +++++++++++++++------ crates/storage/src/connection/event.rs | 93 +++---- crates/storage/src/schema/revision_0066.rs | 2 +- 3 files changed, 244 insertions(+), 123 deletions(-) diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index d82145d95a..6d25844940 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -60,7 +60,6 @@ //! specific set of keys without having to load and check each individual bloom //! filter. -use std::collections::BTreeSet; use std::sync::{Arc, Mutex}; use bloomfilter::Bloom; @@ -88,14 +87,17 @@ pub struct AggregateBloom { impl AggregateBloom { /// Maximum number of blocks to aggregate in a single `AggregateBloom`. - pub const BLOCK_RANGE_LEN: u64 = 8192; - const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8; + #[cfg(not(test))] + pub const BLOCK_RANGE_LEN: u64 = AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + #[cfg(test)] + pub const BLOCK_RANGE_LEN: u64 = AGGREGATE_BLOOM_BLOCK_RANGE_LEN_TEST; + const BLOCK_RANGE_BYTES: usize = Self::BLOCK_RANGE_LEN as usize / 8; /// Create a new `AggregateBloom` for the (`from_block`, `from_block` + /// [`block_range_length`](Self::BLOCK_RANGE_LEN) - 1) range. pub fn new(from_block: BlockNumber) -> Self { let to_block = from_block + Self::BLOCK_RANGE_LEN - 1; - let bitmap = vec![0; Self::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize]; + let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN as usize]; Self::from_parts(from_block, to_block, bitmap) } @@ -107,7 +109,7 @@ impl AggregateBloom { ) -> Self { let bitmap = zstd::bulk::decompress( &compressed_bitmap, - AggregateBloom::BLOCK_RANGE_BYTES as usize * BloomFilter::BITVEC_LEN as usize, + AggregateBloom::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN as usize, ) .expect("Decompressing aggregate Bloom filter"); @@ -118,7 +120,7 @@ impl AggregateBloom { assert_eq!(from_block + Self::BLOCK_RANGE_LEN - 1, to_block); assert_eq!( bitmap.len() as u64, - Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN + Self::BLOCK_RANGE_BYTES as u64 * BloomFilter::BITVEC_LEN ); Self { @@ -141,7 +143,7 @@ impl AggregateBloom { /// /// Panics if the block number is not in the range of blocks that this /// aggregate covers. 
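The bitmap added here is a row-major matrix: one row per Bloom-filter bit, one column bit per block in the range. A standalone sketch of the index arithmetic with toy sizes (the constants and helper names below are illustrative, not part of this patch; the real matrix is `BloomFilter::BITVEC_LEN` rows by `BLOCK_RANGE_BYTES` columns):

    // Toy dimensions; the real matrix is BloomFilter::BITVEC_LEN rows by
    // AggregateBloom::BLOCK_RANGE_BYTES columns.
    const ROWS: usize = 8;
    const BYTES_PER_ROW: usize = 2; // Each row covers 16 blocks, one bit per block.

    /// Returns (index into the flat bitmap, mask within that byte) for the
    /// cell at (Bloom-filter bit `row`, block offset `block`).
    fn bitmap_index(row: usize, block: usize) -> (usize, u8) {
        let byte = block / 8; // Which byte of the row holds this block's column.
        let mask = 1 << (7 - (block % 8)); // Block 0 sits at the most significant bit.
        (row * BYTES_PER_ROW + byte, mask)
    }

    fn main() {
        let mut bitmap = vec![0u8; ROWS * BYTES_PER_ROW];
        // Record "bit 3 of the Bloom filter for block 5 is set".
        let (idx, mask) = bitmap_index(3, 5);
        bitmap[idx] |= mask;
        assert_eq!(bitmap[3 * BYTES_PER_ROW], 0b0000_0100);
    }

Storing each row contiguously is what lets a key lookup touch only `K_NUM` rows instead of loading every per-block filter.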
-    pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
+    pub fn insert(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
         assert!(
             (self.from_block..=self.to_block).contains(&block_number),
             "Block number {} is not in the range {}..={}",
             block_number,
             self.from_block,
             self.to_block
         );
         assert_eq!(bloom.0.number_of_hash_functions(), BloomFilter::K_NUM);
 
-        let bloom = bloom.0.bit_vec().to_bytes();
-        assert_eq!(bloom.len() as u64, BloomFilter::BITVEC_BYTES);
+        let bloom_bytes = bloom.0.bit_vec().to_bytes();
+        assert_eq!(bloom_bytes.len(), BloomFilter::BITVEC_BYTES as usize);
 
-        let relative_block_number = block_number.get() - self.from_block.get();
-        let byte_idx = (relative_block_number / 8) as usize;
-        let bit_idx = (relative_block_number % 8) as usize;
-        for (i, bloom_byte) in bloom.iter().enumerate() {
-            if *bloom_byte == 0 {
-                continue;
-            }
+        let relative_block_number = usize::try_from(block_number.get() - self.from_block.get())
+            .expect("usize can fit a u64");
 
-            let base = 8 * i;
-            for j in 0..8 {
-                let row_idx = base + j;
-                *self.bitmap_at_mut(row_idx, byte_idx) |= ((bloom_byte >> (7 - j)) & 1) << bit_idx;
-            }
-        }
+        // Column in the bitmap.
+        let byte_idx = relative_block_number / 8;
+        // Block number offset within a bitmap byte.
+        let bit_idx = relative_block_number % 8;
+
+        bloom_bytes
+            .into_iter()
+            .enumerate()
+            .filter(|(_, b)| *b != 0)
+            .for_each(|(i, bloom_byte)| {
+                let row_idx_base = 8 * i;
+
+                // Each bit (possible key index) in the Bloom filter has its own row.
+                for offset in 0..8 {
+                    let row_idx = (row_idx_base + offset) * AggregateBloom::BLOCK_RANGE_BYTES;
+                    let bitmap_idx = row_idx + byte_idx;
+                    // Reverse the offsets so that the most significant bit is
+                    // treated as the first.
+                    let bit = (bloom_byte >> (7 - offset)) & 1;
+                    self.bitmap[bitmap_idx] |= bit << (7 - bit_idx);
+                }
+            });
     }
 
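To see what the inner loop of `insert` does to a single serialized Bloom-filter byte, here is a minimal standalone sketch of the MSB-first fan-out (toy value, not part of the patch):

    fn main() {
        // One byte of a serialized Bloom filter; bits are read MSB-first,
        // so this byte has Bloom bits 0 and 2 set.
        let bloom_byte: u8 = 0b1010_0000;
        let rows: Vec<u8> = (0..8).map(|offset| (bloom_byte >> (7 - offset)) & 1).collect();
        // Row r of the aggregate receives Bloom bit r of this byte.
        assert_eq!(rows, [1, 0, 1, 0, 0, 0, 0, 0]);
    }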
-    /// Returns a set of [block numbers](BlockNumber) for which the given keys
-    /// are present in the aggregate.
-    pub fn blocks_for_keys(&self, keys: &[Felt]) -> BTreeSet<BlockNumber> {
+    /// Returns a [bit array](BlockRange) where each bit position represents an
+    /// offset from the [starting block][Self::from_block] of the aggregate
+    /// filter. If the bit is set, that block contains one of the given keys.
+    /// False positives are possible.
+    ///
+    /// See [BlockRange::iter_ones].
+    pub fn blocks_for_keys(&self, keys: &[Felt]) -> BlockRange {
         if keys.is_empty() {
-            return self.all_blocks();
+            return BlockRange::FULL;
         }
 
-        let mut block_matches = BTreeSet::new();
+        let mut block_matches = BlockRange::EMPTY;
 
         for k in keys {
-            let mut row_to_check = vec![u8::MAX; Self::BLOCK_RANGE_BYTES as usize];
+            let mut matches_for_key = BlockRange::FULL;
             let indices = BloomFilter::indices_for_key(k);
 
             for row_idx in indices {
-                for (col_idx, row_byte) in row_to_check.iter_mut().enumerate() {
-                    *row_byte &= self.bitmap_at(row_idx, col_idx);
-                }
-            }
+                let row_start = row_idx * Self::BLOCK_RANGE_BYTES;
+                let row_end = row_start + Self::BLOCK_RANGE_BYTES;
 
-            for (col_idx, byte) in row_to_check.iter().enumerate() {
-                if *byte == 0 {
-                    continue;
-                }
+                let block_range = BlockRange::copy_from_slice(&self.bitmap[row_start..row_end]);
 
-                for i in 0..8 {
-                    if byte & (1 << i) != 0 {
-                        let match_number = self.from_block + col_idx as u64 * 8 + i as u64;
-                        block_matches.insert(match_number);
-                    }
-                }
+                matches_for_key &= block_range;
             }
+
+            block_matches |= matches_for_key;
         }
 
         block_matches
     }
+}
 
-    pub(super) fn all_blocks(&self) -> BTreeSet<BlockNumber> {
-        (self.from_block.get()..=self.to_block.get())
-            .map(BlockNumber::new_or_panic)
-            .collect()
+impl std::fmt::Debug for AggregateBloom {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AggregateBloom")
+            .field("from_block", &self.from_block)
+            .field("to_block", &self.to_block)
+            .field("bitmap_hash", &"...")
+            .finish()
+    }
+}
+
+/// An [`AGGREGATE_BLOOM_BLOCK_RANGE_LEN`]-sized bit array. Each bit represents
+/// an offset from the starting block of an [`AggregateBloom`].
+///
+/// Intended for use as the return value of functions that check for the
+/// presence of keys inside an [`AggregateBloom`] filter. If the bit at
+/// position N is set, then the `aggregate_bloom.from_block + N`
+/// [block number](BlockNumber) contains one of the given keys. False
+/// positives are possible.
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct BlockRange([u8; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+#[allow(dead_code)]
+impl BlockRange {
+    /// An empty `BlockRange`.
+    pub(crate) const EMPTY: BlockRange = BlockRange([u8::MIN; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+    /// A full `BlockRange`.
+    pub(crate) const FULL: BlockRange = BlockRange([u8::MAX; AggregateBloom::BLOCK_RANGE_BYTES]);
+
+    /// Create a `BlockRange` from a byte slice.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the slice is not of length
+    /// [`AggregateBloom::BLOCK_RANGE_BYTES`].
+    fn copy_from_slice(s: &[u8]) -> Self {
+        assert_eq!(s.len(), AggregateBloom::BLOCK_RANGE_BYTES);
+        let mut bytes = [0; AggregateBloom::BLOCK_RANGE_BYTES];
+        bytes.copy_from_slice(s);
+        Self(bytes)
+    }
+
+    /// Set the value of a bit at the given index.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the index is out of bounds of the block range.
+    fn set(&mut self, idx: usize, value: bool) {
+        assert!(idx < AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize);
+
+        let byte_idx = idx / 8;
+        let bit_idx = idx % 8;
+        if value {
+            self.0[byte_idx] |= 1 << (7 - bit_idx);
+        } else {
+            self.0[byte_idx] &= !(1 << (7 - bit_idx));
+        }
+    }
+
+    /// Create an iterator over the indices of bits that are set.
+    pub(crate) fn iter_ones(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter_val(true)
     }
 
-    fn bitmap_at(&self, row: usize, col: usize) -> u8 {
-        let idx = row * Self::BLOCK_RANGE_BYTES as usize + col;
-        self.bitmap[idx]
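The query side composes rows with bitwise operators: the rows selected by one key's hash functions are AND-ed (a block must pass every hash), and the results for different keys are OR-ed. A standalone sketch with a `u16` standing in for `BlockRange` (toy width, illustrative values):

    fn main() {
        // Rows selected by one key's hash functions (one bit per block,
        // 16 blocks). A block can only match if *every* row has its bit set.
        let rows_for_key_a: [u16; 3] = [
            0b1100_0000_0000_0000,
            0b0100_0000_0000_0001,
            0b0111_0000_0000_0000,
        ];
        let matches_a = rows_for_key_a.iter().fold(u16::MAX, |acc, row| acc & row);
        assert_eq!(matches_a, 0b0100_0000_0000_0000); // Only block 1 survives the AND.

        // A second key's result; different keys are alternatives, so OR them.
        let matches_b: u16 = 0b0000_1000_0000_0000;
        assert_eq!(matches_a | matches_b, 0b0100_1000_0000_0000);
    }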
+    /// Create an iterator over the indices of bits that are not set.
+    pub(crate) fn iter_zeros(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter_val(false)
     }
 
-    fn bitmap_at_mut(&mut self, row: usize, col: usize) -> &mut u8 {
-        let idx = row * Self::BLOCK_RANGE_BYTES as usize + col;
-        &mut self.bitmap[idx]
+    fn iter_val(&self, val: bool) -> impl Iterator<Item = usize> + '_ {
+        self.0
+            .iter()
+            .enumerate()
+            .flat_map(move |(byte_idx, &byte)| {
+                (0..8).filter_map(move |bit_idx| {
+                    if (byte >> (7 - bit_idx)) & 1 == val as u8 {
+                        Some(byte_idx * 8 + bit_idx)
+                    } else {
+                        None
+                    }
+                })
+            })
     }
 }
 
-impl std::fmt::Debug for AggregateBloom {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        use std::hash::{DefaultHasher, Hash, Hasher};
+impl Default for BlockRange {
+    fn default() -> Self {
+        Self::EMPTY
+    }
+}
 
-        let mut hasher = DefaultHasher::new();
-        self.bitmap.hash(&mut hasher);
-        let bitmap_hash = hasher.finish();
+impl std::ops::BitAndAssign for BlockRange {
+    fn bitand_assign(&mut self, rhs: Self) {
+        for (a, b) in self.0.iter_mut().zip(rhs.0.iter()) {
+            *a &= b;
+        }
+    }
+}
 
-        f.debug_struct("AggregateBloom")
-            .field("from_block", &self.from_block)
-            .field("to_block", &self.to_block)
-            .field("bitmap_hash", &format!("{:#x}", bitmap_hash))
-            .finish()
+impl std::ops::BitAnd for BlockRange {
+    type Output = Self;
+
+    fn bitand(mut self, rhs: Self) -> Self::Output {
+        self &= rhs;
+        self
+    }
+}
+
+impl std::ops::BitOrAssign for BlockRange {
+    fn bitor_assign(&mut self, rhs: Self) {
+        for (a, b) in self.0.iter_mut().zip(rhs.0.iter()) {
+            *a |= b;
+        }
+    }
+}
+
+impl std::ops::BitOr for BlockRange {
+    type Output = Self;
+
+    fn bitor(mut self, rhs: Self) -> Self::Output {
+        self |= rhs;
+        self
     }
 }
 
@@ -403,6 +504,17 @@ mod tests {
     const KEY_NOT_IN_FILTER: Felt =
         felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec");
 
+    macro_rules! blockrange {
+        ($($block:expr),* $(,)?)
=> {{ + let mut bits = BlockRange::EMPTY; + $( + let idx = $block.get() - BlockNumber::GENESIS.get(); + bits.set(idx as usize, true); + )* + bits + }}; + } + mod filters { use super::*; @@ -415,10 +527,10 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block); let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block]); + let expected = blockrange![from_block]; assert_eq!(block_matches, expected); } @@ -430,11 +542,11 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block, from_block + 1]); + let expected = blockrange![from_block, from_block + 1]; assert_eq!(block_matches, expected); } @@ -447,11 +559,11 @@ mod tests { bloom.set(&KEY); bloom.set(&KEY1); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches_empty, BTreeSet::new()); + assert_eq!(block_matches_empty, BlockRange::EMPTY); } #[test] @@ -462,8 +574,8 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); - aggregate_bloom_filter.add_bloom(&bloom, from_block + 1); + aggregate_bloom_filter.insert(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block + 1); let compressed_bitmap = aggregate_bloom_filter.compress_bitmap(); let mut decompressed = AggregateBloom::from_existing_compressed( @@ -471,14 +583,14 @@ mod tests { aggregate_bloom_filter.to_block, compressed_bitmap, ); - decompressed.add_bloom(&bloom, from_block + 2); + decompressed.insert(&bloom, from_block + 2); let block_matches = decompressed.blocks_for_keys(&[KEY]); - let expected = BTreeSet::from_iter(vec![from_block, from_block + 1, from_block + 2]); - assert_eq!(block_matches, expected,); + let expected = blockrange![from_block, from_block + 1, from_block + 2]; + assert_eq!(block_matches, expected); let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]); - assert_eq!(block_matches_empty, BTreeSet::new()); + assert_eq!(block_matches_empty, BlockRange::EMPTY); } #[test] @@ -490,10 +602,10 @@ mod tests { let mut bloom = BloomFilter::new(); bloom.set(&KEY); - aggregate_bloom_filter.add_bloom(&bloom, from_block); + aggregate_bloom_filter.insert(&bloom, from_block); - let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN; - aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos); + let invalid_insert_pos = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + aggregate_bloom_filter.insert(&bloom, invalid_insert_pos); } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index e0ec7a08d6..1d9a0a7909 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeSet; use std::num::NonZeroUsize; use std::rc::Rc; use std::sync::Arc; @@ -16,7 
+15,7 @@ use pathfinder_common::{
 };
 use rusqlite::types::Value;
 
-use crate::bloom::{AggregateBloom, BloomFilter};
+use crate::bloom::{AggregateBloom, BlockRange, BloomFilter};
 use crate::prelude::*;
 
 // We're using the upper 4 bits of the 32 byte representation of a felt
@@ -109,7 +108,7 @@ impl Transaction<'_> {
                 bloom.set_address(&event.from_address);
             }
 
-            running_event_filter.filter.add_bloom(&bloom, block_number);
+            running_event_filter.filter.insert(&bloom, block_number);
             running_event_filter.next_block = block_number + 1;
 
             // This check is the reason that blocks cannot be skipped, if they were we would
@@ -490,43 +489,53 @@ impl Transaction<'_> {
 
 impl AggregateBloom {
     /// Returns the block numbers that match the given constraints.
-    pub fn check(&self, constraints: &EventConstraints) -> BTreeSet<BlockNumber> {
+    pub fn check(&self, constraints: &EventConstraints) -> Vec<BlockNumber> {
         let addr_blocks = self.check_address(constraints.contract_address);
         let keys_blocks = self.check_keys(&constraints.keys);
 
-        addr_blocks.intersection(&keys_blocks).cloned().collect()
+        let block_matches = addr_blocks & keys_blocks;
+
+        block_matches
+            .iter_ones()
+            .map(|offset| self.from_block + offset as u64)
+            .collect()
     }
 
-    fn check_address(&self, address: Option<ContractAddress>) -> BTreeSet<BlockNumber> {
+    fn check_address(&self, address: Option<ContractAddress>) -> BlockRange {
         match address {
             Some(addr) => self.blocks_for_keys(&[addr.0]),
-            None => self.all_blocks(),
+            None => BlockRange::FULL,
         }
     }
 
-    fn check_keys(&self, keys: &[Vec<EventKey>]) -> BTreeSet<BlockNumber> {
-        if keys.is_empty() {
-            return self.all_blocks();
+    fn check_keys(&self, keys: &[Vec<EventKey>]) -> BlockRange {
+        if keys.is_empty() || keys.iter().any(Vec::is_empty) {
+            return BlockRange::FULL;
        }
 
-        keys.iter()
-            .enumerate()
-            .map(|(idx, key_group)| {
-                let indexed_keys: Vec<_> = key_group
-                    .iter()
-                    .map(|key| {
-                        let mut key_with_idx = key.0;
-                        key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4;
-                        key_with_idx
-                    })
-                    .collect();
+        let mut result = BlockRange::FULL;
 
-                self.blocks_for_keys(&indexed_keys)
-            })
-            .reduce(|blocks, blocks_for_key| {
-                blocks.intersection(&blocks_for_key).cloned().collect()
-            })
-            .unwrap_or_default()
+        for (idx, key_group) in keys.iter().enumerate() {
+            let indexed_keys: Vec<_> = key_group
+                .iter()
+                .map(|key| {
+                    let mut key_with_idx = key.0;
+                    key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4;
+                    key_with_idx
+                })
+                .collect();
+
+            let blocks_for_key = self.blocks_for_keys(&indexed_keys);
+
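The tagging in `check_keys` mirrors the comment at the top of this file: a key's position among the event keys (0 to 15) is folded into the top nibble of its big-endian bytes before hashing, so the same felt in different key slots maps to distinct Bloom entries. A minimal standalone sketch with plain bytes standing in for `Felt`:

    /// Fold a key's position among the event keys (0-15) into the top nibble
    /// of its 32-byte big-endian form (illustrative helper, not part of the
    /// patch).
    fn tag_with_position(mut key: [u8; 32], idx: u8) -> [u8; 32] {
        assert!(idx < 16, "only the upper 4 bits are reserved");
        key[0] |= idx << 4;
        key
    }

    fn main() {
        let key = [0u8; 32];
        // The same key in slot 0 and slot 2 hashes as two distinct values.
        assert_eq!(tag_with_position(key, 0)[0], 0x00);
        assert_eq!(tag_with_position(key, 2)[0], 0x20);
    }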
+            // No point in continuing AND operations with an empty range.
+            if blocks_for_key == BlockRange::EMPTY {
+                return BlockRange::EMPTY;
+            }
+
+            result &= blocks_for_key;
+        }
+
+        result
     }
 }
 
@@ -688,7 +697,7 @@ pub(crate) fn rebuild_running_event_filter(
         };
         let block_number = first_running_event_filter_block + block as u64;
 
-        filter.add_bloom(bloom, block_number);
+        filter.insert(bloom, block_number);
     }
 
     Ok(RunningEventFilter {
@@ -763,8 +772,8 @@ mod tests {
         filter.set_keys(&[event_key!("0xdeadbeef")]);
         filter.set_address(&contract_address!("0x1234"));
 
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+        aggregate.insert(&filter, BlockNumber::GENESIS);
+        aggregate.insert(&filter, BlockNumber::GENESIS + 1);
 
         let constraints = EventConstraints {
             from_block: None,
             to_block: None,
@@ -776,7 +785,7 @@
 
         assert_eq!(
             aggregate.check(&constraints),
-            BTreeSet::from_iter(vec![BlockNumber::GENESIS, BlockNumber::GENESIS + 1])
+            vec![BlockNumber::GENESIS, BlockNumber::GENESIS + 1]
         );
     }
 
@@ -788,8 +797,8 @@
         filter.set_keys(&[event_key!("0xdeadbeef")]);
         filter.set_address(&contract_address!("0x1234"));
 
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+        aggregate.insert(&filter, BlockNumber::GENESIS);
+        aggregate.insert(&filter, BlockNumber::GENESIS + 1);
 
         let constraints = EventConstraints {
             from_block: None,
             to_block: None,
@@ -799,7 +808,7 @@
             offset: 0,
         };
 
-        assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+        assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
     }
 
     #[test]
@@ -810,8 +819,8 @@
         filter.set_keys(&[event_key!("0xdeadbeef")]);
         filter.set_address(&contract_address!("0x1234"));
 
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+        aggregate.insert(&filter, BlockNumber::GENESIS);
+        aggregate.insert(&filter, BlockNumber::GENESIS + 1);
 
         let constraints = EventConstraints {
             from_block: None,
             to_block: None,
@@ -821,7 +830,7 @@
             offset: 0,
        };
 
-        assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+        assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
     }
 
     #[test]
@@ -832,8 +841,8 @@
         filter.set_address(&contract_address!("0x1234"));
         filter.set_keys(&[event_key!("0xdeadbeef")]);
 
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+        aggregate.insert(&filter, BlockNumber::GENESIS);
+        aggregate.insert(&filter, BlockNumber::GENESIS + 1);
 
         let constraints = EventConstraints {
             from_block: None,
             to_block: None,
@@ -848,7 +857,7 @@
             offset: 0,
        };
 
-        assert_eq!(aggregate.check(&constraints), BTreeSet::new());
+        assert_eq!(aggregate.check(&constraints), Vec::<BlockNumber>::new());
     }
 
     #[test]
@@ -859,8 +868,8 @@
         filter.set_keys(&[event_key!("0xdeadbeef")]);
         filter.set_address(&contract_address!("0x1234"));
 
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS);
-        aggregate.add_bloom(&filter, BlockNumber::GENESIS + 1);
+        aggregate.insert(&filter, BlockNumber::GENESIS);
+        aggregate.insert(&filter, BlockNumber::GENESIS + 1);
 
         let constraints = EventConstraints {
             from_block: None,
             to_block: None,
diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs
index 6cfa73ea36..9eaddbb2db 100644
--- a/crates/storage/src/schema/revision_0066.rs
+++ b/crates/storage/src/schema/revision_0066.rs
@@ -74,7 +74,7 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
     while let Some(bloom_filter)
= bloom_filters.next().transpose()? { let current_block = BlockNumber::new_or_panic(migrated_count); - aggregate.add_bloom(&bloom_filter, current_block); + aggregate.insert(&bloom_filter, current_block); if current_block == aggregate.to_block { insert_aggregate_stmt From 8cc3025d198ec677d9cc1d48806119fc023db095 Mon Sep 17 00:00:00 2001 From: sistemd Date: Wed, 25 Dec 2024 12:49:32 +0100 Subject: [PATCH 2/2] better event filter unit tests --- .../rpc/src/pathfinder/methods/get_proof.rs | 2 - crates/storage/src/bloom.rs | 158 ++++++---- crates/storage/src/connection/event.rs | 277 +++++++++++------- crates/storage/src/lib.rs | 132 ++++++++- crates/storage/src/schema/revision_0066.rs | 5 +- crates/storage/src/test_utils.rs | 133 +++++---- 6 files changed, 469 insertions(+), 238 deletions(-) diff --git a/crates/rpc/src/pathfinder/methods/get_proof.rs b/crates/rpc/src/pathfinder/methods/get_proof.rs index b336ca11b4..8fbb8a7152 100644 --- a/crates/rpc/src/pathfinder/methods/get_proof.rs +++ b/crates/rpc/src/pathfinder/methods/get_proof.rs @@ -198,7 +198,6 @@ impl crate::dto::serialize::SerializeForVersion for GetProofOutput { ) -> Result { let mut serializer = serializer.serialize_struct()?; serializer.serialize_optional("state_commitment", self.state_commitment)?; - serializer.serialize_optional("class_commitment", self.class_commitment)?; serializer.serialize_field("contract_proof", &self.contract_proof)?; serializer.serialize_optional("contract_data", self.contract_data.clone())?; serializer.end() @@ -217,7 +216,6 @@ impl crate::dto::serialize::SerializeForVersion for GetClassProofOutput { serializer: crate::dto::serialize::Serializer, ) -> Result { let mut serializer = serializer.serialize_struct()?; - serializer.serialize_optional("class_commitment", self.class_commitment)?; serializer.serialize_field("class_proof", &self.class_proof)?; serializer.end() } diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index 6d25844940..8bb141bd38 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -67,37 +67,42 @@ use cached::{Cached, SizedCache}; use pathfinder_common::BlockNumber; use pathfinder_crypto::Felt; -pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN; +/// Maximum number of blocks to aggregate in a single `AggregateBloom`. +#[cfg(not(test))] +pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = 8192; + +/// Make testing faster and easier by using a smaller range. +#[cfg(test)] +pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = 16; /// An aggregate of all Bloom filters for a given range of blocks. /// Before being added to `AggregateBloom`, each [`BloomFilter`] is /// rotated by 90 degrees (transposed). #[derive(Clone)] pub struct AggregateBloom { - /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in - /// a single array. + /// A [AGGREGATE_BLOOM_BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix + /// stored in a single array. bitmap: Vec, + /// Starting (inclusive) block number for the range of blocks that this /// aggregate covers. pub from_block: BlockNumber, + /// Ending (inclusive) block number for the range of blocks that this /// aggregate covers. pub to_block: BlockNumber, } impl AggregateBloom { - /// Maximum number of blocks to aggregate in a single `AggregateBloom`. 
- #[cfg(not(test))] - pub const BLOCK_RANGE_LEN: u64 = AGGREGATE_BLOOM_BLOCK_RANGE_LEN; - #[cfg(test)] - pub const BLOCK_RANGE_LEN: u64 = AGGREGATE_BLOOM_BLOCK_RANGE_LEN_TEST; - const BLOCK_RANGE_BYTES: usize = Self::BLOCK_RANGE_LEN as usize / 8; - - /// Create a new `AggregateBloom` for the (`from_block`, `from_block` + - /// [`block_range_length`](Self::BLOCK_RANGE_LEN) - 1) range. + /// Number of bytes that an `AggregateBloom` block range is represented by. + const BLOCK_RANGE_BYTES: usize = AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize / 8; + + /// Create a new `AggregateBloom` for the following range: + /// + /// \[`from_block`, `from_block + (AGGREGATE_BLOOM_BLOCK_RANGE_LEN) - 1`\] pub fn new(from_block: BlockNumber) -> Self { - let to_block = from_block + Self::BLOCK_RANGE_LEN - 1; - let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN as usize]; + let to_block = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; + let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN]; Self::from_parts(from_block, to_block, bitmap) } @@ -109,7 +114,7 @@ impl AggregateBloom { ) -> Self { let bitmap = zstd::bulk::decompress( &compressed_bitmap, - AggregateBloom::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN as usize, + Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN, ) .expect("Decompressing aggregate Bloom filter"); @@ -117,10 +122,10 @@ impl AggregateBloom { } fn from_parts(from_block: BlockNumber, to_block: BlockNumber, bitmap: Vec) -> Self { - assert_eq!(from_block + Self::BLOCK_RANGE_LEN - 1, to_block); + assert_eq!(from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, to_block); assert_eq!( - bitmap.len() as u64, - Self::BLOCK_RANGE_BYTES as u64 * BloomFilter::BITVEC_LEN + bitmap.len(), + Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN ); Self { @@ -154,7 +159,7 @@ impl AggregateBloom { assert_eq!(bloom.0.number_of_hash_functions(), BloomFilter::K_NUM); let bloom_bytes = bloom.0.bit_vec().to_bytes(); - assert_eq!(bloom_bytes.len(), BloomFilter::BITVEC_BYTES as usize); + assert_eq!(bloom_bytes.len(), BloomFilter::BITVEC_BYTES); let relative_block_number = usize::try_from(block_number.get() - self.from_block.get()) .expect("usize can fit a u64"); @@ -173,7 +178,7 @@ impl AggregateBloom { // Each bit (possible key index) in the Bloom filter has its own row. for offset in 0..8 { - let row_idx = (row_idx_base + offset) * AggregateBloom::BLOCK_RANGE_BYTES; + let row_idx = (row_idx_base + offset) * Self::BLOCK_RANGE_BYTES; let bitmap_idx = row_idx + byte_idx; // Reverse the offsets so that the most significant bit is considered as the // first. @@ -351,14 +356,18 @@ struct CacheKey { pub(crate) struct AggregateBloomCache(Mutex>>); impl AggregateBloomCache { + /// Create a new cache with the given size. pub fn with_size(size: usize) -> Self { Self(Mutex::new(SizedCache::with_size(size))) } + /// Reset the cache. Removes all entries and frees the memory. pub fn reset(&self) { self.0.lock().unwrap().cache_reset(); } + /// Retrieve all [AggregateBloom] filters whose range of blocks overlaps + /// with the given range. pub fn get_many( &self, from_block: BlockNumber, @@ -370,17 +379,17 @@ impl AggregateBloomCache { let to_block = to_block.get(); // Align to the nearest lower multiple of BLOCK_RANGE_LEN. - let from_block_aligned = from_block - from_block % AggregateBloom::BLOCK_RANGE_LEN; + let from_block_aligned = from_block - from_block % AGGREGATE_BLOOM_BLOCK_RANGE_LEN; // Align to the nearest higher multiple of BLOCK_RANGE_LEN, then subtract 1 // (zero based indexing). 
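A worked example of this alignment, assuming the production range length of 8192 (standalone sketch; variable names are illustrative):

    fn main() {
        const LEN: u64 = 8192; // Production AGGREGATE_BLOOM_BLOCK_RANGE_LEN.
        let (from_block, to_block) = (10_000u64, 20_000u64);

        // Align down to the start of the range containing `from_block`.
        let from_aligned = from_block - from_block % LEN;
        // Align up to the last block of the range containing `to_block`.
        let to_aligned = to_block + LEN - (to_block % LEN) - 1;

        assert_eq!(from_aligned, 8192); // Second range starts at 8192.
        assert_eq!(to_aligned, 3 * LEN - 1); // Query now ends at 24575.
    }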
- let to_block_aligned = to_block + AggregateBloom::BLOCK_RANGE_LEN - - (to_block % AggregateBloom::BLOCK_RANGE_LEN) + let to_block_aligned = to_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN + - (to_block % AGGREGATE_BLOOM_BLOCK_RANGE_LEN) - 1; (from_block_aligned..=to_block_aligned) - .step_by(AggregateBloom::BLOCK_RANGE_LEN as usize) + .step_by(AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize) .map(|from| { - let to = from + AggregateBloom::BLOCK_RANGE_LEN - 1; + let to = from + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; ( BlockNumber::new_or_panic(from), BlockNumber::new_or_panic(to), @@ -396,6 +405,7 @@ impl AggregateBloomCache { .collect() } + /// Store the given filters in the cache. pub fn set_many(&self, filters: &[Arc]) { let mut cache = self.0.lock().unwrap(); filters.iter().for_each(|filter| { @@ -413,15 +423,15 @@ pub(crate) struct BloomFilter(Bloom); impl BloomFilter { // The size of the bitmap used by the Bloom filter. - const BITVEC_LEN: u64 = 16_384; + const BITVEC_LEN: usize = 16_384; // The size of the bitmap used by the Bloom filter (in bytes). - const BITVEC_BYTES: u64 = Self::BITVEC_LEN / 8; + const BITVEC_BYTES: usize = Self::BITVEC_LEN / 8; // The number of hash functions used by the Bloom filter. // We need this value to be able to re-create the filter with the deserialized // bitmap. const K_NUM: u32 = 12; // The maximal number of items anticipated to be inserted into the Bloom filter. - const ITEMS_COUNT: u32 = 1024; + const ITEMS_COUNT: usize = 1024; // The seed used by the hash functions of the filter. // This is a randomly generated vector of 32 bytes. const SEED: [u8; 32] = [ @@ -431,18 +441,14 @@ impl BloomFilter { ]; pub fn new() -> Self { - let bloom = Bloom::new_with_seed( - Self::BITVEC_BYTES as usize, - Self::ITEMS_COUNT as usize, - &Self::SEED, - ); + let bloom = Bloom::new_with_seed(Self::BITVEC_BYTES, Self::ITEMS_COUNT, &Self::SEED); assert_eq!(bloom.number_of_hash_functions(), Self::K_NUM); Self(bloom) } pub fn from_compressed_bytes(bytes: &[u8]) -> Self { - let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES as usize * 2) + let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES * 2) .expect("Decompressing Bloom filter"); Self::from_bytes(&bytes) } @@ -454,7 +460,7 @@ impl BloomFilter { let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap()); let bloom = Bloom::from_existing( bytes, - Self::BITVEC_BYTES * 8, + Self::BITVEC_BYTES as u64 * 8, Self::K_NUM, [(k1, k2), (k3, k4)], ); @@ -621,44 +627,49 @@ mod tests { #[test] fn set_then_get_many_aligned() { - let cache = AggregateBloomCache::with_size(2); + let cache = AggregateBloomCache::with_size(3); - let first_range_start = BlockNumber::GENESIS; - let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN; - let second_range_end = second_range_start + AggregateBloom::BLOCK_RANGE_LEN - 1; + let range_start1 = BlockNumber::GENESIS; + let range_start2 = BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + let range_start3 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + + let range_end2 = range_start2 + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; let filters = vec![ - Arc::new(AggregateBloom::new(first_range_start)), - Arc::new(AggregateBloom::new(second_range_start)), + Arc::new(AggregateBloom::new(range_start1)), + Arc::new(AggregateBloom::new(range_start2)), + Arc::new(AggregateBloom::new(range_start3)), ]; cache.set_many(&filters); - let retrieved = cache.get_many(first_range_start, second_range_end); + let retrieved = cache.get_many(range_start1, range_end2); 
- assert_eq!(retrieved, filters); + assert_eq!(retrieved, filters[0..2]); } #[test] fn set_then_get_many_unaligned() { - let cache = AggregateBloomCache::with_size(2); + let cache = AggregateBloomCache::with_size(3); - let first_range_start = BlockNumber::GENESIS; - let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN; + let range_start1 = BlockNumber::GENESIS; + let range_start2 = BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + let range_start3 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN; let filters = vec![ - Arc::new(AggregateBloom::new(first_range_start)), - Arc::new(AggregateBloom::new(second_range_start)), + Arc::new(AggregateBloom::new(range_start1)), + Arc::new(AggregateBloom::new(range_start2)), + Arc::new(AggregateBloom::new(range_start3)), ]; - let start = first_range_start + 15; - let end = second_range_start + 15; + let start = range_start2 + 15; + let end = range_start3 + 15; cache.set_many(&filters); let retrieved = cache.get_many(start, end); - assert_eq!(retrieved, filters); + assert_eq!(retrieved, &filters[1..3]); } #[test] @@ -668,24 +679,57 @@ mod tests { let filters = vec![ Arc::new(AggregateBloom::new(BlockNumber::GENESIS)), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), Arc::new(AggregateBloom::new( - BlockNumber::GENESIS + 3 * AggregateBloom::BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, )), ]; cache.set_many(&filters); - let first_range_start = BlockNumber::GENESIS; - let second_range_end = BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1; + let range_start1 = BlockNumber::GENESIS; + let range_end2 = BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; - let retrieved = cache.get_many(first_range_start, second_range_end); + let retrieved = cache.get_many(range_start1, range_end2); assert_eq!(retrieved, filters[0..2].to_vec()); } + + #[test] + fn cache_edge_cases() { + let cache = AggregateBloomCache::with_size(2); + + let first_range_start = BlockNumber::GENESIS; + let first_range_end = first_range_start + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; + let second_range_start = first_range_end + 1; + let second_range_end = second_range_start + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1; + + let filters = vec![ + Arc::new(AggregateBloom::new(first_range_start)), + Arc::new(AggregateBloom::new(second_range_start)), + ]; + + cache.set_many(&filters); + + // Edge cases around the lower bound. + let retrieved = cache.get_many(first_range_end - 1, second_range_end); + assert_eq!(retrieved, filters[..]); + let retrieved = cache.get_many(first_range_end, second_range_end); + assert_eq!(retrieved, filters[..]); + let retrieved = cache.get_many(first_range_end + 1, second_range_end); + assert_eq!(retrieved, filters[1..]); + + // Edge cases around the upper bound. 
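The expectations in this test all reduce to interval intersection: a cached filter covering blocks `[start, end]` is relevant to a query over `[from, to]` exactly when the two closed intervals overlap. A tiny standalone sketch of that rule (the helper is illustrative, not part of the patch):

    /// A cached filter covering [start, end] serves a query over [from, to]
    /// iff the closed intervals intersect.
    fn overlaps(start: u64, end: u64, from: u64, to: u64) -> bool {
        start <= to && from <= end
    }

    fn main() {
        // A query that begins exactly on a range's last block still needs it...
        assert!(overlaps(0, 8191, 8191, 16_383));
        // ...but one that begins a block later does not.
        assert!(!overlaps(0, 8191, 8192, 16_383));
    }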
+ let retrieved = cache.get_many(first_range_start, first_range_end - 1); + assert_eq!(retrieved, filters[0..1]); + let retrieved = cache.get_many(first_range_start, first_range_end); + assert_eq!(retrieved, filters[0..1]); + let retrieved = cache.get_many(first_range_start, first_range_end + 1); + assert_eq!(retrieved, filters[0..=1]); + } } } diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index 1d9a0a7909..1993db105e 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -561,7 +561,7 @@ pub(crate) struct RunningEventFilter { /// Rebuild the [event filter](RunningEventFilter) for the range of blocks /// between the last stored `to_block` in the event filter table and the last /// overall block in the database. This is needed because the aggregate event -/// filter for each [block range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) +/// filter for each [block range](crate::bloom::AGGREGATE_BLOOM_BLOCK_RANGE_LEN) /// is stored once the range is complete, before that it is kept in memory and /// can be lost upon shutdown. pub(crate) fn rebuild_running_event_filter( @@ -750,14 +750,15 @@ mod tests { use pathfinder_common::{transaction as common, BlockHeader, BlockTimestamp, EntryPoint, Fee}; use pathfinder_crypto::Felt; use pretty_assertions_sorted::assert_eq; + use rstest::rstest; use super::*; - use crate::test_utils; + use crate::{test_utils, AGGREGATE_BLOOM_BLOCK_RANGE_LEN}; static MAX_BLOCKS_TO_SCAN: LazyLock = LazyLock::new(|| NonZeroUsize::new(100).unwrap()); - static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock = - LazyLock::new(|| NonZeroUsize::new(3).unwrap()); + static MAX_EVENT_FILTERS_TO_LOAD: LazyLock = + LazyLock::new(|| NonZeroUsize::new(10).unwrap()); mod event_bloom { use pretty_assertions_sorted::assert_eq; @@ -862,6 +863,12 @@ mod tests { #[test] fn no_constraints() { + fn all_blocks(bloom: &AggregateBloom) -> Vec { + (bloom.from_block.get()..=bloom.to_block.get()) + .map(BlockNumber::new_or_panic) + .collect() + } + let mut aggregate = AggregateBloom::new(BlockNumber::GENESIS); let mut filter = BloomFilter::new(); @@ -879,7 +886,7 @@ mod tests { offset: 0, }; - assert_eq!(aggregate.check(&constraints), aggregate.all_blocks()); + assert_eq!(aggregate.check(&constraints), all_blocks(&aggregate)); } } @@ -895,7 +902,7 @@ mod tests { from_block: Some(expected_event.block_number), to_block: Some(expected_event.block_number), contract_address: Some(expected_event.from_address), - // we're using a key which is present in _all_ events as the 2nd key + // We're using a key which is present in _all_ events as the 2nd key. 
keys: vec![vec![], vec![event_key!("0xdeadbeef")]], page_size: test_utils::NUM_EVENTS, offset: 0, @@ -905,7 +912,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1010,7 +1017,7 @@ mod tests { offset: 0, }, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap() .events @@ -1049,7 +1056,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1084,7 +1091,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1117,7 +1124,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); pretty_assertions_sorted::assert_eq!( @@ -1147,7 +1154,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); pretty_assertions_sorted::assert_eq!( @@ -1181,7 +1188,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1215,7 +1222,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1248,7 +1255,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1269,7 +1276,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1301,7 +1308,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1333,7 +1340,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1360,7 +1367,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1387,7 +1394,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1420,7 +1427,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1458,7 +1465,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1486,7 +1493,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1514,7 +1521,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1542,7 +1549,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1567,7 +1574,7 @@ mod tests { .events( &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1599,7 +1606,7 @@ mod tests { .events( &constraints, 1.try_into().unwrap(), - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); 
assert_eq!( @@ -1626,7 +1633,7 @@ mod tests { .events( &constraints, 1.try_into().unwrap(), - *MAX_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap(); assert_eq!( @@ -1643,30 +1650,12 @@ mod tests { #[test] fn crossing_event_filter_range_stores_and_updates_running() { - let blocks: Vec = [ - // First event filter start. - BlockNumber::GENESIS, - BlockNumber::GENESIS + 1, - BlockNumber::GENESIS + 2, - BlockNumber::GENESIS + 3, - // End. - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1, - // Second event filter start. - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3, - // End. - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1, - // Third event filter start. - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1, - ] - .iter() - .map(|&n| n.get() as usize) - .collect(); + // Two and a half ranges. + let n_blocks = 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN + AGGREGATE_BLOOM_BLOCK_RANGE_LEN / 2; + let n_blocks = usize::try_from(n_blocks).unwrap(); - let (storage, _) = test_utils::setup_custom_test_storage(&blocks, 2); + let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1); + let emitted_events = test_data.events; let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); @@ -1682,37 +1671,46 @@ mod tests { // Running event filter starts from next block range. assert_eq!( running_event_filter.filter.from_block, - 2 * AggregateBloom::BLOCK_RANGE_LEN + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN + ); + // Lock needed in `events()`. + drop(running_event_filter); + + let constraints = EventConstraints { + from_block: None, + to_block: None, + contract_address: None, + // We're using a key which is present in _all_ events as the 2nd key. + keys: vec![vec![], vec![event_key!("0xdeadbeef")]], + page_size: emitted_events.len(), + offset: 0, + }; + + let events = tx + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_EVENT_FILTERS_TO_LOAD, + ) + .unwrap(); + assert_eq!( + events, + PageOfEvents { + events: emitted_events, + continuation_token: None, + } ); } #[test] fn event_filter_filter_load_limit() { - let blocks: Vec = [ - // First event filter start. - BlockNumber::GENESIS, - BlockNumber::GENESIS + 1, - BlockNumber::GENESIS + 2, - BlockNumber::GENESIS + 3, - // End. - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1, - // Second event filter start. - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2, - BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3, - // End. - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1, - // Third event filter start. 
- BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, - BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1, - ] - .iter() - .map(|&n| n.get() as usize) - .collect(); + let n_blocks = 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN + AGGREGATE_BLOOM_BLOCK_RANGE_LEN / 2; + let n_blocks = usize::try_from(n_blocks).unwrap(); - let (storage, test_data) = test_utils::setup_custom_test_storage(&blocks, 2); + let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1); let emitted_events = test_data.events; + let events_per_block = emitted_events.len() / n_blocks; + let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); @@ -1730,20 +1728,22 @@ mod tests { .events(&constraints, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); - let first_event_filter_range = BlockNumber::GENESIS.get()..AggregateBloom::BLOCK_RANGE_LEN; - for event in events.events { - // ...but only events from the first bloom filter range are returned. - assert!( - first_event_filter_range.contains(&event.block_number.get()), - "Event block number: {} should have been in the range: {:?}", - event.block_number.get(), - first_event_filter_range - ); - } - let continue_from_block = events.continuation_token.unwrap().block_number; - assert_eq!(continue_from_block, first_event_filter_range.end); + let block_range_len = usize::try_from(AGGREGATE_BLOOM_BLOCK_RANGE_LEN).unwrap(); + assert_eq!( + events, + PageOfEvents { + // ...but only events from the first bloom filter range are returned... + events: emitted_events[..events_per_block * block_range_len].to_vec(), + // ...with a continuation token pointing to the next block range. + continuation_token: Some(ContinuationToken { + block_number: BlockNumber::new_or_panic(AGGREGATE_BLOOM_BLOCK_RANGE_LEN), + offset: 0, + }) + } + ); let constraints_with_offset = EventConstraints { + // Use the provided continuation token. from_block: Some(events.continuation_token.unwrap().block_number), to_block: None, contract_address: None, @@ -1760,19 +1760,96 @@ mod tests { 1.try_into().unwrap(), ) .unwrap(); - assert!(events.continuation_token.is_none()); - - let second_event_filter_range = - AggregateBloom::BLOCK_RANGE_LEN..(2 * AggregateBloom::BLOCK_RANGE_LEN); - let third_event_filter_range = - 2 * AggregateBloom::BLOCK_RANGE_LEN..(3 * AggregateBloom::BLOCK_RANGE_LEN); - for event in events.events { - // ...but only events from the second (loaded) and third (running) event filter - // range are returned. - assert!( - (second_event_filter_range.start..third_event_filter_range.end) - .contains(&event.block_number.get()) - ); + assert_eq!( + events, + PageOfEvents { + // ...but only events from the second (loaded) and third (running) event filter + // range are returned... + events: emitted_events[events_per_block * block_range_len..].to_vec(), + // ...without a continuation token. 
+ continuation_token: None, + } + ); + } + + #[rustfmt::skip] + #[rstest] + #[case(0, 0, 0, 0, 0)] // 0 ..=(N ) + #[case(0, 0, -1, 0, 0)] // 0 ..=(N - 1) + #[case(0, 0, 1, 0, 2)] // 0 ..=(N + 1) + #[case(1, 0, 0, 1, 1)] // (N )..=(2 * N) + #[case(1, -2, 0, 0, 1)] // (N - 1)..=(2 * N) + #[case(1, 1, 0, 1, 1)] // (N + 1)..=(2 * N) + fn event_filter_edge_cases( + #[case] range_idx: usize, + #[case] offset_from: i32, + #[case] offset_to: i32, + #[case] range_start_idx: usize, + #[case] range_end_idx: usize, + ) { + use std::collections::BTreeSet; + + fn contained_blocks(page: &PageOfEvents) -> BTreeSet { + page.events + .iter() + .map(|event| event.block_number) + .collect::>() } + + let n_blocks = 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN; + let n_blocks = usize::try_from(n_blocks).unwrap(); + + let (storage, test_data) = test_utils::setup_custom_test_storage(n_blocks, 1); + let emitted_events = test_data.events; + + let mut connection = storage.connection().unwrap(); + let tx = connection.transaction().unwrap(); + + let ranges = [ + ( + BlockNumber::GENESIS, + BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, + ), + ( + BlockNumber::GENESIS + AGGREGATE_BLOOM_BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, + ), + ( + BlockNumber::GENESIS + 2 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, + ), + ( + BlockNumber::GENESIS + 3 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN, + BlockNumber::GENESIS + 4 * AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, + ), + ]; + + let from_block = ranges[range_idx].0.get() as i32 + offset_from; + let to_block = ranges[range_idx].1.get() as i32 + offset_to; + + let constraints = EventConstraints { + from_block: Some(BlockNumber::new_or_panic(u64::try_from(from_block).unwrap())), + to_block: Some(BlockNumber::new_or_panic(u64::try_from(to_block).unwrap())), + contract_address: None, + keys: vec![], + page_size: emitted_events.len(), + offset: 0, + }; + + let page = tx + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_EVENT_FILTERS_TO_LOAD, + ) + .unwrap(); + let blocks = contained_blocks(&page); + + let expected = (ranges[range_start_idx].0.get()..=ranges[range_end_idx].1.get()) + .filter(|&block| (from_block..=to_block).contains(&(block as i32))) + .map(BlockNumber::new_or_panic) + .collect::>(); + + assert_eq!(blocks, expected); } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index a845654307..e76f4f520c 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -561,7 +561,17 @@ fn schema_version(connection: &rusqlite::Connection) -> anyhow::Result { #[cfg(test)] mod tests { + use std::num::NonZeroUsize; + use std::sync::LazyLock; + + use rstest::rstest; + use test_utils::*; + use super::*; + static MAX_BLOCKS_TO_SCAN: LazyLock = + LazyLock::new(|| NonZeroUsize::new(100).unwrap()); + static MAX_EVENT_FILTERS_TO_LOAD: LazyLock = + LazyLock::new(|| NonZeroUsize::new(5).unwrap()); #[test] fn schema_version_defaults_to_zero() { @@ -678,22 +688,13 @@ mod tests { #[test] fn running_event_filter_rebuilt_after_shutdown() { - use std::num::NonZeroUsize; - use std::sync::LazyLock; - - use test_utils::*; - - static MAX_BLOCKS_TO_SCAN: LazyLock = - LazyLock::new(|| NonZeroUsize::new(10).unwrap()); - static MAX_EVENT_FILTERS_TO_LOAD: LazyLock = - LazyLock::new(|| NonZeroUsize::new(3).unwrap()); - - let blocks = [0, 1, 2, 3, 4, 5]; + let n_blocks = 6; let transactions_per_block = 2; - let headers = create_blocks(&blocks); + let headers = create_blocks(n_blocks); 
let transactions_and_receipts = - create_transactions_and_receipts(blocks.len(), transactions_per_block); - let emitted_events = extract_events(&headers, &transactions_and_receipts); + create_transactions_and_receipts(n_blocks, transactions_per_block); + let emitted_events = + extract_events(&headers, &transactions_and_receipts, transactions_per_block); let insert_block_data = |tx: &Transaction<'_>, idx: usize| { let header = &headers[idx]; @@ -798,4 +799,107 @@ mod tests { assert!(events_after.contains(&e)); } } + + #[rstest] + #[case::block_before_full_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, 0)] + #[case::full_block_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN, 1)] + #[case::block_after_full_range(AGGREGATE_BLOOM_BLOCK_RANGE_LEN + 1, 1)] + fn rebuild_running_event_filter_edge_cases( + #[case] n_blocks: u64, + #[case] expected_insert_count: u64, + ) { + let n_blocks = usize::try_from(n_blocks).unwrap(); + let transactions_per_block = 1; + let headers = create_blocks(n_blocks); + let transactions_and_receipts = + create_transactions_and_receipts(n_blocks, transactions_per_block); + let emitted_events = + extract_events(&headers, &transactions_and_receipts, transactions_per_block); + let events_per_block = emitted_events.len() / n_blocks; + + let insert_block_data = |tx: &Transaction<'_>, idx: usize| { + let header = &headers[idx]; + + tx.insert_block_header(header).unwrap(); + tx.insert_transaction_data( + header.number, + &transactions_and_receipts + [idx * transactions_per_block..(idx + 1) * transactions_per_block] + .iter() + .cloned() + .map(|(tx, receipt, ..)| (tx, receipt)) + .collect::>(), + Some( + &transactions_and_receipts + [idx * transactions_per_block..(idx + 1) * transactions_per_block] + .iter() + .cloned() + .map(|(_, _, events)| events) + .collect::>(), + ), + ) + .unwrap(); + }; + + let db = crate::StorageBuilder::in_memory().unwrap(); + let db_path = Arc::clone(&db.0.database_path).to_path_buf(); + + // Keep this around so that the in-memory database doesn't get dropped. + let mut rsqlite_conn = rusqlite::Connection::open(&db_path).unwrap(); + + let mut conn = db.connection().unwrap(); + let tx = conn.transaction().unwrap(); + + for i in 0..n_blocks { + insert_block_data(&tx, i); + } + + // Pretend like we shut down by dropping these. 
+ tx.commit().unwrap(); + drop(conn); + drop(db); + + let db = crate::StorageBuilder::file(db_path) + .journal_mode(JournalMode::Rollback) + .migrate() + .unwrap() + .create_pool(NonZeroU32::new(5).unwrap()) + .unwrap(); + + let mut conn = db.connection().unwrap(); + let tx = conn.transaction().unwrap(); + + let to_block = BlockNumber::GENESIS + n_blocks as u64; + + let constraints = EventConstraints { + from_block: None, + to_block: Some(to_block), + contract_address: None, + keys: vec![], + page_size: 1024, + offset: 0, + }; + + let events = tx + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_EVENT_FILTERS_TO_LOAD, + ) + .unwrap() + .events; + + let inserted_event_filter_count = rsqlite_conn + .transaction() + .unwrap() + .prepare("SELECT COUNT(*) FROM event_filters") + .unwrap() + .query_row([], |row| row.get::<_, u64>(0)) + .unwrap(); + + assert_eq!(inserted_event_filter_count, expected_insert_count); + + let expected = &emitted_events[..events_per_block * n_blocks]; + assert_eq!(events, expected); + } } diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index 9eaddbb2db..0da4d72282 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -31,8 +31,9 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { } /// Migrate individual event bloom filters to the new aggregate table. We only -/// need to migrate all of the [AggregateBloom::BLOCK_RANGE_LEN] sized chunks. -/// The remainder will be reconstructed by the [crate::StorageManager] as the +/// need to migrate all of the [crate::bloom::AGGREGATE_BLOOM_BLOCK_RANGE_LEN] +/// sized chunks. The remainder will be reconstructed by the +/// [crate::StorageManager] as the /// [RunningEventFilter](crate::connection::event::RunningEventFilter). fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { let bloom_filter_count = tx diff --git a/crates/storage/src/test_utils.rs b/crates/storage/src/test_utils.rs index ab0a0d0a13..694ff845b9 100644 --- a/crates/storage/src/test_utils.rs +++ b/crates/storage/src/test_utils.rs @@ -25,36 +25,43 @@ pub const NUM_TRANSACTIONS: usize = NUM_BLOCKS * TRANSACTIONS_PER_BLOCK; pub const NUM_EVENTS: usize = NUM_BLOCKS * EVENTS_PER_BLOCK; /// Creates a custom set of [BlockHeader]s with arbitrary values. 
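These helpers derive unique field elements by repeating a hex digit `index + 3` times, so every block, transaction, and event gets a distinct, deterministic value; the assertion added in `create_transactions_and_receipts` below caps the count because a felt holds only 251 bits. A standalone sketch of the scheme with plain strings standing in for `Felt::from_hex_str`:

    fn main() {
        // "b" repeated (index + 3) times: "bbb", "bbbb", and so on; distinct
        // per index and short enough for a felt while the index stays small.
        let hex_for = |i: usize| "b".repeat(i + 3);
        assert_eq!(hex_for(0), "bbb");
        assert_ne!(hex_for(0), hex_for(1)); // Uniqueness comes from the length.
    }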
-pub(crate) fn create_blocks(block_numbers: &[usize]) -> Vec { - block_numbers - .iter() - .enumerate() - .map(|(i, &block_number)| { +pub(crate) fn create_blocks(n_blocks: usize) -> Vec { + (0..n_blocks) + .map(|block_number| { let storage_commitment = - StorageCommitment(Felt::from_hex_str(&"b".repeat(i + 3)).unwrap()); - let class_commitment = ClassCommitment(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap()); - let index_as_felt = Felt::from_be_slice(&[i as u8]).unwrap(); + StorageCommitment(Felt::from_hex_str(&"b".repeat(block_number + 3)).unwrap()); + let class_commitment = + ClassCommitment(Felt::from_hex_str(&"c".repeat(block_number + 3)).unwrap()); + let index_as_felt = Felt::from_be_slice(&[block_number as u8]).unwrap(); BlockHeader::builder() .number(BlockNumber::GENESIS + block_number as u64) - .timestamp(BlockTimestamp::new_or_panic(i as u64 + 500)) + .timestamp(BlockTimestamp::new_or_panic(block_number as u64 + 500)) .calculated_state_commitment(storage_commitment, class_commitment) - .eth_l1_gas_price(GasPrice::from(i as u64)) + .eth_l1_gas_price(GasPrice::from(block_number as u64)) .sequencer_address(SequencerAddress(index_as_felt)) .transaction_commitment(TransactionCommitment(index_as_felt)) .event_commitment(EventCommitment(index_as_felt)) - .finalize_with_hash(BlockHash(Felt::from_hex_str(&"a".repeat(i + 3)).unwrap())) + .finalize_with_hash(BlockHash( + Felt::from_hex_str(&"a".repeat(block_number + 3)).unwrap(), + )) }) .collect::>() } /// Creates a custom test set of transactions and receipts. pub(crate) fn create_transactions_and_receipts( - block_count: usize, + n_blocks: usize, transactions_per_block: usize, ) -> Vec<(Transaction, Receipt, Vec)> { - let n = block_count * transactions_per_block; - let transactions = (0..n).map(|i| match i % transactions_per_block { + let n_transactions = n_blocks * transactions_per_block; + assert!( + n_transactions < 64, + "Too many transactions ({} > {}), `Felt::from_hex_str() will overflow.", + n_transactions, + 64 + ); + let transactions = (0..n_transactions).map(|i| match i % transactions_per_block { x if x < INVOKE_TRANSACTIONS_PER_BLOCK => Transaction { hash: TransactionHash(Felt::from_hex_str(&"4".repeat(i + 3)).unwrap()), variant: TransactionVariant::InvokeV0(InvokeTransactionV0 { @@ -110,60 +117,62 @@ pub(crate) fn create_transactions_and_receipts( }, }); - let tx_receipt = transactions.enumerate().map(|(i, tx)| { - let receipt = Receipt { - actual_fee: Fee::ZERO, - execution_resources: ExecutionResources { - builtins: Default::default(), - n_steps: i as u64 + 987, - n_memory_holes: i as u64 + 1177, - data_availability: L1Gas { - l1_gas: i as u128 + 124, - l1_data_gas: i as u128 + 457, - }, - total_gas_consumed: L1Gas { - l1_gas: i as u128 + 333, - l1_data_gas: i as u128 + 666, + transactions + .enumerate() + .map(|(i, tx)| { + let receipt = Receipt { + actual_fee: Fee::ZERO, + execution_resources: ExecutionResources { + builtins: Default::default(), + n_steps: i as u64 + 987, + n_memory_holes: i as u64 + 1177, + data_availability: L1Gas { + l1_gas: i as u128 + 124, + l1_data_gas: i as u128 + 457, + }, + total_gas_consumed: L1Gas { + l1_gas: i as u128 + 333, + l1_data_gas: i as u128 + 666, + }, + l2_gas: Default::default(), }, - l2_gas: Default::default(), - }, - transaction_hash: tx.hash, - transaction_index: TransactionIndex::new_or_panic(i as u64 + 2311), - ..Default::default() - }; - let events = if i % transactions_per_block < EVENTS_PER_BLOCK { - vec![pathfinder_common::event::Event { - from_address: 
ContractAddress::new_or_panic( - Felt::from_hex_str(&"2".repeat(i + 3)).unwrap(), - ), - data: vec![EventData(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap())], - keys: vec![ - EventKey(Felt::from_hex_str(&"d".repeat(i + 3)).unwrap()), - event_key!("0xdeadbeef"), - ], - }] - } else { - vec![] - }; - - (tx, receipt, events) - }); + transaction_hash: tx.hash, + transaction_index: TransactionIndex::new_or_panic(i as u64 + 2311), + ..Default::default() + }; + let events = if i % transactions_per_block < EVENTS_PER_BLOCK { + vec![pathfinder_common::event::Event { + from_address: ContractAddress::new_or_panic( + Felt::from_hex_str(&"2".repeat(i + 3)).unwrap(), + ), + data: vec![EventData(Felt::from_hex_str(&"c".repeat(i + 3)).unwrap())], + keys: vec![ + EventKey(Felt::from_hex_str(&"d".repeat(i + 3)).unwrap()), + event_key!("0xdeadbeef"), + ], + }] + } else { + vec![] + }; - tx_receipt.collect::>() + (tx, receipt, events) + }) + .collect::>() } /// Creates a set of emitted events from given blocks and transactions. pub(crate) fn extract_events( blocks: &[BlockHeader], transactions: &[(Transaction, Receipt, Vec)], + transactions_per_block: usize, ) -> Vec { transactions .iter() .enumerate() .filter_map(|(i, (txn, _, events))| { - if i % TRANSACTIONS_PER_BLOCK < EVENTS_PER_BLOCK { + if i % transactions_per_block < EVENTS_PER_BLOCK { let event = &events[0]; - let block = &blocks[i / TRANSACTIONS_PER_BLOCK]; + let block = &blocks[i / transactions_per_block]; Some(EmittedEvent { data: event.data.clone(), @@ -191,24 +200,22 @@ pub struct TestData { /// [BlockHeader]s starting from L2 genesis, with arbitrary other values and a /// set of expected emitted events. pub fn setup_test_storage() -> (crate::Storage, TestData) { - let block_numbers: Vec = (0..NUM_BLOCKS).collect(); - setup_custom_test_storage(&block_numbers, TRANSACTIONS_PER_BLOCK) + setup_custom_test_storage(NUM_BLOCKS, TRANSACTIONS_PER_BLOCK) } -// Creates an in-memory storage instance with custom block numbers (not -// necessarily consecutive) and a custom number of transactions per block, with -// a set of expected emitted events. +// Creates an in-memory storage instance with N blocks and a custom number of +// transactions per block, with a set of expected emitted events. pub fn setup_custom_test_storage( - block_numbers: &[usize], + n_blocks: usize, transactions_per_block: usize, ) -> (crate::Storage, TestData) { let storage = crate::StorageBuilder::in_memory().unwrap(); let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let headers = create_blocks(block_numbers); + let headers = create_blocks(n_blocks); let transactions_and_receipts = - create_transactions_and_receipts(block_numbers.len(), transactions_per_block); + create_transactions_and_receipts(n_blocks, transactions_per_block); for (i, header) in headers.iter().enumerate() { tx.insert_block_header(header).unwrap(); @@ -234,7 +241,7 @@ pub fn setup_custom_test_storage( tx.commit().unwrap(); - let events = extract_events(&headers, &transactions_and_receipts); + let events = extract_events(&headers, &transactions_and_receipts, transactions_per_block); let (transactions, receipts): (Vec<_>, Vec<_>) = transactions_and_receipts .into_iter() .map(|(transaction, receipts, _)| (transaction, receipts))
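Taken together, the pieces above compose as follows. A minimal round trip, written as it might appear in a unit test inside `crates/storage` (this is a sketch against the crate-private API introduced by this patch, reusing the `KEY` constant from the existing tests):

    #[test]
    fn aggregate_round_trip() {
        let from_block = BlockNumber::GENESIS;
        let mut aggregate = AggregateBloom::new(from_block);

        let mut bloom = BloomFilter::new();
        bloom.set(&KEY);
        aggregate.insert(&bloom, from_block + 2);

        // Decode the matching offsets back into block numbers, as `check` does.
        let matches = aggregate.blocks_for_keys(&[KEY]);
        let blocks: Vec<BlockNumber> = matches
            .iter_ones()
            .map(|offset| from_block + offset as u64)
            .collect();
        assert_eq!(blocks, vec![from_block + 2]);
    }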