From a64c6413a53587c35a7d6949b425d8bdf868dac7 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 30 Dec 2024 08:54:58 -0500
Subject: [PATCH] Parallelize pruning utf8 fuzz test

---
 datafusion/core/tests/fuzz_cases/pruning.rs | 387 +++++++++++++-------
 1 file changed, 263 insertions(+), 124 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs
index ad35850a5f18..3725e6d908e6 100644
--- a/datafusion/core/tests/fuzz_cases/pruning.rs
+++ b/datafusion/core/tests/fuzz_cases/pruning.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 use arrow_array::{Array, RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema};
@@ -38,151 +38,266 @@ use parquet::{
     file::properties::{EnabledStatistics, WriterProperties},
 };
 use rand::seq::SliceRandom;
+use tokio::sync::Mutex;
 use url::Url;
 
 #[tokio::test]
-async fn test_fuzz_utf8() {
-    // Fuzz testing for UTF8 predicate pruning
-    // The basic idea is that query results should always be the same with or without stats/pruning
-    // If we get this right we at least guarantee that there are no incorrect results
-    // There may still be suboptimal pruning or stats but that's something we can try to catch
-    // with more targeted tests.
-
-    // Since we know where the edge cases might be we don't do random black box fuzzing.
-    // Instead we fuzz on specific pre-defined axis:
-    //
-    // - Which characters are in each value. We want to make sure to include characters that when
-    //   incremented, truncated or otherwise manipulated might cause issues.
-    // - The values in each row group. This impacts which min/max stats are generated for each rg.
-    //   We'll generate combinations of the characters with lengths ranging from 1 to 4.
-    // - Truncation of statistics to 1, 2 or 3 characters as well as no truncation.
-
-    let mut rng = rand::thread_rng();
-
-    let characters = [
-        "z",
-        "0",
-        "~",
-        "ß",
-        "℣",
-        "%", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
-        "_", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
-        "\u{7F}",
-        "\u{7FF}",
-        "\u{FF}",
-        "\u{10FFFF}",
-        "\u{D7FF}",
-        "\u{FDCF}",
-        // null character
-        "\u{0}",
-    ];
-
-    let value_lengths = [1, 2, 3];
-
-    // generate all combinations of characters with lengths ranging from 1 to 4
-    let mut values = vec![];
-    for length in &value_lengths {
-        values.extend(
-            characters
-                .iter()
-                .cloned()
-                .combinations(*length)
-                // now get all permutations of each combination
-                .flat_map(|c| c.into_iter().permutations(*length))
-                // and join them into strings
-                .map(|c| c.join("")),
-        );
-    }
+async fn test_utf8_eq() {
+    Utf8Test::new(|value| col("a").eq(lit(value))).run().await;
+}
+
+#[tokio::test]
+async fn test_utf8_not_eq() {
+    Utf8Test::new(|value| col("a").not_eq(lit(value)))
+        .run()
+        .await;
+}
+
+#[tokio::test]
+async fn test_utf8_lt() {
+    Utf8Test::new(|value| col("a").lt(lit(value))).run().await;
+}
+
+#[tokio::test]
+async fn test_utf8_lt_eq() {
+    Utf8Test::new(|value| col("a").lt_eq(lit(value)))
+        .run()
+        .await;
+}
+
+#[tokio::test]
+async fn test_utf8_gt() {
+    Utf8Test::new(|value| col("a").gt(lit(value))).run().await;
+}
+
+#[tokio::test]
+async fn test_utf8_gt_eq() {
+    Utf8Test::new(|value| col("a").gt_eq(lit(value)))
+        .run()
+        .await;
+}
+
+#[tokio::test]
+async fn test_utf8_like() {
+    Utf8Test::new(|value| col("a").like(lit(value))).run().await;
+}
 
-    println!("Generated {} values", values.len());
+#[tokio::test]
+async fn test_utf8_not_like() {
+    Utf8Test::new(|value| col("a").not_like(lit(value)))
+        .run()
+        .await;
+}
 
-    // randomly pick 100 values
-    values.shuffle(&mut rng);
-    values.truncate(100);
+#[tokio::test]
+async fn test_utf8_like_prefix() {
+    Utf8Test::new(|value| col("a").like(lit(format!("%{}", value))))
+        .run()
+        .await;
+}
+
+#[tokio::test]
+async fn test_utf8_like_suffix() {
+    Utf8Test::new(|value| col("a").like(lit(format!("{}%", value))))
+        .run()
+        .await;
+}
+
+#[tokio::test]
+async fn test_utf8_not_like_prefix() {
+    Utf8Test::new(|value| col("a").not_like(lit(format!("%{}", value))))
+        .run()
+        .await;
+}
 
-    let mut row_groups = vec![];
-    // generate all combinations of values for row groups (1 or 2 values per rg, more is unnecessary since we only get min/max stats out)
-    for rg_length in [1, 2] {
-        row_groups.extend(values.iter().cloned().combinations(rg_length));
+#[tokio::test]
+async fn test_utf8_not_like_suffix() {
+    Utf8Test::new(|value| col("a").not_like(lit(format!("{}%", value))))
+        .run()
+        .await;
+}
+
+/// Fuzz testing for UTF8 predicate pruning
+/// The basic idea is that query results should always be the same with or without stats/pruning
+/// If we get this right we at least guarantee that there are no incorrect results
+/// There may still be suboptimal pruning or stats but that's something we can try to catch
+/// with more targeted tests.
+///
+/// Since we know where the edge cases might be we don't do random black box fuzzing.
+/// Instead we fuzz on specific pre-defined axes:
+///
+/// - Which characters are in each value. We want to make sure to include characters that when
+///   incremented, truncated or otherwise manipulated might cause issues.
+/// - The values in each row group. This impacts which min/max stats are generated for each rg.
+///   We'll generate combinations of the characters with lengths ranging from 1 to 3.
+/// - Truncation of statistics to 1, 2 or 3 characters as well as no truncation.
+struct Utf8Test {
+    /// Test queries the parquet files with this predicate both with and without
+    /// pruning enabled
+    predicate_generator: Box<dyn Fn(String) -> Expr + 'static>,
+}
+
+impl Utf8Test {
+    /// Create a new test with the given predicate generator
+    fn new<F: Fn(String) -> Expr + 'static>(f: F) -> Self {
+        Self {
+            predicate_generator: Box::new(f),
+        }
     }
 
-    println!("Generated {} row groups", row_groups.len());
+    /// Run the test by evaluating the predicate on the test files with and
+    /// without pruning enabled
+    async fn run(&self) {
+        let ctx = SessionContext::new();
+
+        let mut predicates = vec![];
+        for value in Self::values() {
+            predicates.push((self.predicate_generator)(value));
+        }
 
-    // Randomly pick 100 row groups (combinations of said values)
-    row_groups.shuffle(&mut rng);
-    row_groups.truncate(100);
+        let store = Self::memory_store();
+        ctx.register_object_store(&Url::parse("memory://").unwrap(), Arc::clone(store));
 
-    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
-    let df_schema = DFSchema::try_from(schema.clone()).unwrap();
+        let files = Self::test_files().await;
+        let schema = Self::schema();
+        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
 
-    let store = InMemory::new();
-    let mut files = vec![];
-    for (idx, truncation_length) in [Some(1), Some(2), None].iter().enumerate() {
-        // parquet files only support 32767 row groups per file, so chunk up into multiple files so we don't error if running on a large number of row groups
-        for (rg_idx, row_groups) in row_groups.chunks(32766).enumerate() {
-            let buf = write_parquet_file(
-                *truncation_length,
+        println!("Testing {} predicates", predicates.len());
+        for predicate in predicates {
+            // println!("Testing predicate {:?}", predicate);
+            let phys_expr_predicate = ctx
+                .create_physical_expr(predicate.clone(), &df_schema)
+                .unwrap();
+            let expected = execute_with_predicate(
+                &files,
+                Arc::clone(&phys_expr_predicate),
+                false,
                 schema.clone(),
-                row_groups.to_vec(),
+                &ctx,
             )
             .await;
-            let filename = format!("test_fuzz_utf8_{idx}_{rg_idx}.parquet");
-            files.push((filename.clone(), buf.len()));
-            let payload = PutPayload::from(buf);
-            let path = Path::from(filename);
-            store.put(&path, payload).await.unwrap();
+            let with_pruning = execute_with_predicate(
+                &files,
+                phys_expr_predicate,
+                true,
+                schema.clone(),
+                &ctx,
+            )
+            .await;
+            assert_eq!(expected, with_pruning);
         }
     }
 
-    println!("Generated {} parquet files", files.len());
-
-    let ctx = SessionContext::new();
-
-    ctx.register_object_store(&Url::parse("memory://").unwrap(), Arc::new(store));
-
-    let mut predicates = vec![];
-    for value in values {
-        predicates.push(col("a").eq(lit(value.clone())));
-        predicates.push(col("a").not_eq(lit(value.clone())));
-        predicates.push(col("a").lt(lit(value.clone())));
-        predicates.push(col("a").lt_eq(lit(value.clone())));
-        predicates.push(col("a").gt(lit(value.clone())));
-        predicates.push(col("a").gt_eq(lit(value.clone())));
-        predicates.push(col("a").like(lit(value.clone())));
-        predicates.push(col("a").not_like(lit(value.clone())));
-        predicates.push(col("a").like(lit(format!("%{}", value.clone()))));
-        predicates.push(col("a").like(lit(format!("{}%", value.clone()))));
-        predicates.push(col("a").not_like(lit(format!("%{}", value.clone()))));
-        predicates.push(col("a").not_like(lit(format!("{}%", value.clone()))));
+    /// all combinations of interesting characters with lengths ranging from 1 to 3
+    fn values() -> &'static [String] {
+        VALUES.get_or_init(|| {
+            let mut rng = rand::thread_rng();
+
+            let characters = [
+                "z",
+                "0",
+                "~",
+                "ß",
+                "℣",
+                "%", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
+                "_", // this one is useful for like/not like tests since it will result in randomly inserted wildcards
+                "\u{7F}",
+                "\u{7FF}",
+                "\u{FF}",
+                "\u{10FFFF}",
+                "\u{D7FF}",
+                "\u{FDCF}",
+                // null character
+                "\u{0}",
+            ];
+            let value_lengths = [1, 2, 3];
+            let mut values = vec![];
+            for length in &value_lengths {
+                values.extend(
+                    characters
+                        .iter()
+                        .cloned()
+                        .combinations(*length)
+                        // now get all permutations of each combination
+                        .flat_map(|c| c.into_iter().permutations(*length))
+                        // and join them into strings
+                        .map(|c| c.join("")),
+                );
+            }
+            println!("Generated {} values", values.len());
+            // randomly pick 100 values
+            values.shuffle(&mut rng);
+            values.truncate(100);
+            values
+        })
     }
 
-    for predicate in predicates {
-        println!("Testing predicate {:?}", predicate);
-        let phys_expr_predicate = ctx
-            .create_physical_expr(predicate.clone(), &df_schema)
-            .unwrap();
-        let expected = execute_with_predicate(
-            &files,
-            phys_expr_predicate.clone(),
-            false,
-            schema.clone(),
-            &ctx,
-        )
-        .await;
-        let with_pruning = execute_with_predicate(
-            &files,
-            phys_expr_predicate,
-            true,
-            schema.clone(),
-            &ctx,
-        )
-        .await;
-        assert_eq!(expected, with_pruning);
+    /// return the in memory object store
+    fn memory_store() -> &'static Arc<dyn ObjectStore> {
+        MEMORY_STORE.get_or_init(|| Arc::new(InMemory::new()))
+    }
+
+    /// return the schema of the created test files
+    fn schema() -> Arc<Schema> {
+        let schema = SCHEMA.get_or_init(|| {
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]))
+        });
+        Arc::clone(schema)
+    }
+
+    /// Return a list of test files with UTF8 data and combinations of
+    /// [`Self::values`]
+    async fn test_files() -> Vec<TestFile> {
+        let files_mutex = TESTFILES.get_or_init(|| Mutex::new(vec![]));
+        let mut files = files_mutex.lock().await;
+        if !files.is_empty() {
+            return (*files).clone();
+        }
+
+        let mut rng = rand::thread_rng();
+        let values = Self::values();
+
+        let mut row_groups = vec![];
+        // generate all combinations of values for row groups (1 or 2 values per rg, more is unnecessary since we only get min/max stats out)
+        for rg_length in [1, 2] {
+            row_groups.extend(values.iter().cloned().combinations(rg_length));
+        }
+
+        println!("Generated {} row groups", row_groups.len());
+
+        // Randomly pick 100 row groups (combinations of said values)
+        row_groups.shuffle(&mut rng);
+        row_groups.truncate(100);
+
+        let schema = Self::schema();
+
+        let store = Self::memory_store();
+        for (idx, truncation_length) in [Some(1), Some(2), None].iter().enumerate() {
+            // parquet files only support 32767 row groups per file, so chunk up into multiple files so we don't error if running on a large number of row groups
+            for (rg_idx, row_groups) in row_groups.chunks(32766).enumerate() {
+                let buf = write_parquet_file(
+                    *truncation_length,
+                    Arc::clone(&schema),
+                    row_groups.to_vec(),
+                )
+                .await;
+                let filename = format!("test_fuzz_utf8_{idx}_{rg_idx}.parquet");
+                let size = buf.len();
+                let path = Path::from(filename);
+                let payload = PutPayload::from(buf);
+                store.put(&path, payload).await.unwrap();
+
+                files.push(TestFile { path, size });
+            }
+        }
+
+        println!("Generated {} parquet files", files.len());
+        files.clone()
     }
 }
 
 async fn execute_with_predicate(
-    files: &[(String, usize)],
+    files: &[TestFile],
     predicate: Arc<dyn PhysicalExpr>,
     prune_stats: bool,
     schema: Arc<Schema>,
@@ -193,7 +308,12 @@ async fn execute_with_predicate(
         .with_file_group(
             files
                 .iter()
-                .map(|(path, size)| PartitionedFile::new(path.clone(), *size as u64))
+                .map(|test_file| {
+                    PartitionedFile::new(
+                        test_file.path.clone(),
+                        test_file.size as u64,
+                    )
+                })
                 .collect(),
         );
     let mut builder = ParquetExecBuilder::new(scan);
@@ -245,3 +365,22 @@ async fn write_parquet_file(
     }
     buf.into_inner().freeze()
 }
+
+/// The string values for [Utf8Test::values]
+static VALUES: OnceLock<Vec<String>> = OnceLock::new();
+/// The schema for the [Utf8Test::schema]
+static SCHEMA: OnceLock<Arc<Schema>> = OnceLock::new();
+
+/// The InMemory object store
+static MEMORY_STORE: OnceLock<Arc<dyn ObjectStore>> = OnceLock::new();
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than OnceLock to allow for async initialization
+static TESTFILES: OnceLock<Mutex<Vec<TestFile>>> = OnceLock::new();
+
+/// Holds a temporary parquet file path and its size
+#[derive(Debug, Clone)]
+struct TestFile {
+    path: Path,
+    size: usize,
+}
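
Note on the sharing pattern above: the parquet files are generated once and then reused by every parallel test, and the comment on TESTFILES explains why a plain OnceLock is not enough: its initializer is a synchronous closure, so it cannot await the async file generation. The patch instead pairs a OnceLock with a tokio::sync::Mutex and treats an empty Vec as "not yet initialized". Below is a minimal, self-contained sketch of that same pattern under assumed names (CACHE and shared_data are illustrative only, not part of this patch):

    use std::sync::OnceLock;
    use tokio::sync::Mutex;

    // Stands in for TESTFILES: a process-wide, lazily filled cache.
    static CACHE: OnceLock<Mutex<Vec<String>>> = OnceLock::new();

    // Returns the shared data, generating it on the first call. Concurrent
    // callers serialize on the tokio Mutex, so the expensive async
    // initialization runs at most once per process; later callers see a
    // non-empty Vec and return a clone immediately.
    async fn shared_data() -> Vec<String> {
        let cache = CACHE.get_or_init(|| Mutex::new(Vec::new()));
        let mut guard = cache.lock().await;
        if guard.is_empty() {
            // expensive async work goes here (the real test writes
            // parquet files into the in-memory object store)
            guard.push("initialized".to_string());
        }
        guard.clone()
    }

Each #[tokio::test] runs on its own runtime in its own test thread, but statics are process-wide, which is what lets the generated files be shared. An async-aware mutex is used because the guard is held across .await points during initialization, which a std::sync::Mutex guard should not be.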