From 377a4c553b04fbcf7609384a501af9a30fe02dbe Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 14 Oct 2024 22:24:48 -0400 Subject: [PATCH] Improve AggregationFuzzer error reporting (#12832) * Improve AggregationFuzzer error reporting * simplify * Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs * fmt --- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +- .../fuzz_cases/aggregation_fuzzer/fuzzer.rs | 90 ++++++++++++------- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 64a7514ebd5e..34061a64d783 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -83,7 +83,7 @@ async fn test_basic_prim_aggr_no_group() { .table_name("fuzz_table") .build(); - fuzzer.run().await; + fuzzer.run().await } /// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single int64` diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index abb34048284d..6daebc894272 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use arrow::util::pretty::pretty_format_batches; use arrow_array::RecordBatch; +use datafusion_common::{DataFusionError, Result}; use rand::{thread_rng, Rng}; use tokio::task::JoinSet; @@ -132,7 +133,20 @@ struct QueryGroup { } impl AggregationFuzzer { + /// Run the fuzzer, printing an error and panicking if any of the tasks fail pub async fn run(&self) { + let res = self.run_inner().await; + + if let Err(e) = res { + // Print the error via `Display` so that it displays nicely (the default `unwrap()` + // prints using `Debug` which escapes newlines, and makes multi-line messages + // hard to read + println!("{e}"); + panic!("Error!"); + } + } + + async fn run_inner(&self) -> Result<()> { let mut join_set = JoinSet::new(); let mut rng = thread_rng(); @@ -157,16 +171,20 @@ impl AggregationFuzzer { let tasks = self.generate_fuzz_tasks(query_groups).await; for task in tasks { - join_set.spawn(async move { - task.run().await; - }); + join_set.spawn(async move { task.run().await }); } } while let Some(join_handle) = join_set.join_next().await { // propagate errors - join_handle.unwrap(); + join_handle.map_err(|e| { + DataFusionError::Internal(format!( + "AggregationFuzzer task error: {:?}", + e + )) + })??; } + Ok(()) } async fn generate_fuzz_tasks( @@ -237,45 +255,53 @@ struct AggregationFuzzTestTask { } impl AggregationFuzzTestTask { - async fn run(&self) { + async fn run(&self) -> Result<()> { let task_result = run_sql(&self.sql, &self.ctx_with_params.ctx) .await - .expect("should success to run sql"); - self.check_result(&task_result, &self.expected_result); + .map_err(|e| e.context(self.context_error_report()))?; + self.check_result(&task_result, &self.expected_result) } - // TODO: maybe we should persist the `expected_result` and `task_result`, - // because the readability is not so good if we just print it. - fn check_result(&self, task_result: &[RecordBatch], expected_result: &[RecordBatch]) { - let result = check_equality_of_batches(task_result, expected_result); - if let Err(e) = result { + fn check_result( + &self, + task_result: &[RecordBatch], + expected_result: &[RecordBatch], + ) -> Result<()> { + check_equality_of_batches(task_result, expected_result).map_err(|e| { // If we found inconsistent result, we print the test details for reproducing at first - println!( - "##### AggregationFuzzer error report ##### - ### Sql:\n{}\n\ - ### Schema:\n{}\n\ - ### Session context params:\n{:?}\n\ - ### Inconsistent row:\n\ - - row_idx:{}\n\ - - task_row:{}\n\ - - expected_row:{}\n\ - ### Task total result:\n{}\n\ - ### Expected total result:\n{}\n\ - ### Input:\n{}\n\ - ", - self.sql, - self.dataset_ref.batches[0].schema_ref(), - self.ctx_with_params.params, + let message = format!( + "{}\n\ + ### Inconsistent row:\n\ + - row_idx:{}\n\ + - task_row:{}\n\ + - expected_row:{}\n\ + ### Task total result:\n{}\n\ + ### Expected total result:\n{}\n\ + ", + self.context_error_report(), e.row_idx, e.lhs_row, e.rhs_row, pretty_format_batches(task_result).unwrap(), pretty_format_batches(expected_result).unwrap(), - pretty_format_batches(&self.dataset_ref.batches).unwrap(), ); + DataFusionError::Internal(message) + }) + } - // Then we just panic - panic!(); - } + /// Returns a formatted error message + fn context_error_report(&self) -> String { + format!( + "##### AggregationFuzzer error report #####\n\ + ### Sql:\n{}\n\ + ### Schema:\n{}\n\ + ### Session context params:\n{:?}\n\ + ### Input:\n{}\n\ + ", + self.sql, + self.dataset_ref.batches[0].schema_ref(), + self.ctx_with_params.params, + pretty_format_batches(&self.dataset_ref.batches).unwrap(), + ) } }