From 53dbc2fb1b175cfaed4fe94f2331f8f2fcd71d1c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Feb 2025 08:24:27 -0500 Subject: [PATCH] Put unit tests with code --- datafusion/catalog-listing/Cargo.toml | 1 + datafusion/catalog-listing/src/helpers.rs | 283 +++++++++++++++++- .../core/tests/catalog_listing/helpers.rs | 224 -------------- datafusion/core/tests/catalog_listing/mod.rs | 18 -- datafusion/core/tests/core_integration.rs | 2 - 5 files changed, 281 insertions(+), 247 deletions(-) delete mode 100644 datafusion/core/tests/catalog_listing/helpers.rs delete mode 100644 datafusion/core/tests/catalog_listing/mod.rs diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 2752e530264c..03132e7b7bb5 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -53,6 +53,7 @@ object_store = { workspace = true } url = { workspace = true } [dev-dependencies] +async-trait = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 94542446d952..6cb3f661e652 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -532,13 +532,21 @@ pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { #[cfg(test)] mod tests { + use async_trait::async_trait; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::runtime_env::RuntimeEnv; + use futures::FutureExt; + use object_store::memory::InMemory; + use std::any::Any; use std::ops::Not; - // use futures::StreamExt; - use datafusion_expr::{case, col, lit, Expr}; - use super::*; + use datafusion_expr::{ + case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF, + }; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_plan::ExecutionPlan; #[test] fn test_split_files() { @@ -580,6 +588,205 @@ mod tests { assert_eq!(0, chunks.len()); } + #[tokio::test] + async fn test_pruned_partition_list_empty() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/notparquetfile", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/file.parquet", 100), + ]); + let filter = Expr::eq(col("mypartition"), lit("val1")); + let pruned = pruned_partition_list( + state.as_ref(), + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter], + ".parquet", + &[(String::from("mypartition"), DataType::Utf8)], + ) + .await + .expect("partition pruning failed") + .collect::>() + .await; + + assert_eq!(pruned.len(), 0); + } + + #[tokio::test] + async fn test_pruned_partition_list() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/mypartition=val1/file.parquet", 100), + ("tablepath/mypartition=val2/file.parquet", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), + ("tablepath/mypartition=val1/other=val3/file.parquet", 100), + ]); + let filter = Expr::eq(col("mypartition"), lit("val1")); + let pruned = pruned_partition_list( + state.as_ref(), + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter], + ".parquet", + &[(String::from("mypartition"), DataType::Utf8)], + ) + .await + .expect("partition pruning failed") + .try_collect::>() + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + let f1 = &pruned[0]; + assert_eq!( + f1.object_meta.location.as_ref(), + "tablepath/mypartition=val1/file.parquet" + ); + assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]); + let f2 = &pruned[1]; + assert_eq!( + f2.object_meta.location.as_ref(), + "tablepath/mypartition=val1/other=val3/file.parquet" + ); + assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]); + } + + #[tokio::test] + async fn test_pruned_partition_list_multi() { + let (store, state) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100), + ]); + let filter1 = Expr::eq(col("part1"), lit("p1v2")); + let filter2 = Expr::eq(col("part2"), lit("p2v1")); + let pruned = pruned_partition_list( + state.as_ref(), + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + &[filter1, filter2], + ".parquet", + &[ + (String::from("part1"), DataType::Utf8), + (String::from("part2"), DataType::Utf8), + ], + ) + .await + .expect("partition pruning failed") + .try_collect::>() + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + let f1 = &pruned[0]; + assert_eq!( + f1.object_meta.location.as_ref(), + "tablepath/part1=p1v2/part2=p2v1/file1.parquet" + ); + assert_eq!( + &f1.partition_values, + &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),] + ); + let f2 = &pruned[1]; + assert_eq!( + f2.object_meta.location.as_ref(), + "tablepath/part1=p1v2/part2=p2v1/file2.parquet" + ); + assert_eq!( + &f2.partition_values, + &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")] + ); + } + + #[tokio::test] + async fn test_list_partition() { + let (store, _) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), + ]); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 0, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec![]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 1, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 2, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ( + "tablepath/part1=p1v2/part2=p2v1", + 2, + vec!["file1.parquet", "file2.parquet"] + ), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), + ] + ); + } + #[test] fn test_parse_partitions_for_path() { assert_eq!( @@ -793,4 +1000,74 @@ mod tests { Some(Path::from("a=1970-01-05")), ); } + + pub fn make_test_store_and_state( + files: &[(&str, u64)], + ) -> (Arc, Arc) { + let memory = InMemory::new(); + + for (name, size) in files { + memory + .put(&Path::from(*name), vec![0; *size as usize].into()) + .now_or_never() + .unwrap() + .unwrap(); + } + + (Arc::new(memory), Arc::new(MockSession {})) + } + + struct MockSession {} + + #[async_trait] + impl Session for MockSession { + fn session_id(&self) -> &str { + unimplemented!() + } + + fn config(&self) -> &SessionConfig { + unimplemented!() + } + + async fn create_physical_plan( + &self, + _logical_plan: &LogicalPlan, + ) -> Result> { + unimplemented!() + } + + fn create_physical_expr( + &self, + _expr: Expr, + _df_schema: &DFSchema, + ) -> Result> { + unimplemented!() + } + + fn scalar_functions(&self) -> &std::collections::HashMap> { + unimplemented!() + } + + fn aggregate_functions( + &self, + ) -> &std::collections::HashMap> { + unimplemented!() + } + + fn window_functions(&self) -> &std::collections::HashMap> { + unimplemented!() + } + + fn runtime_env(&self) -> &Arc { + unimplemented!() + } + + fn execution_props(&self) -> &ExecutionProps { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + } } diff --git a/datafusion/core/tests/catalog_listing/helpers.rs b/datafusion/core/tests/catalog_listing/helpers.rs deleted file mode 100644 index 6bd0feae70ed..000000000000 --- a/datafusion/core/tests/catalog_listing/helpers.rs +++ /dev/null @@ -1,224 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::DataType; -use datafusion::test::object_store::make_test_store_and_state; -use datafusion_catalog_listing::helpers::*; -use datafusion_catalog_listing::*; -use datafusion_common::ScalarValue; -use datafusion_expr::{col, lit, Expr}; -use futures::StreamExt; -use futures::TryStreamExt; - -#[tokio::test] -async fn test_pruned_partition_list_empty() { - let (store, state) = make_test_store_and_state(&[ - ("tablepath/mypartition=val1/notparquetfile", 100), - ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), - ("tablepath/file.parquet", 100), - ]); - let filter = Expr::eq(col("mypartition"), lit("val1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter], - ".parquet", - &[(String::from("mypartition"), DataType::Utf8)], - ) - .await - .expect("partition pruning failed") - .collect::>() - .await; - - assert_eq!(pruned.len(), 0); -} - -#[tokio::test] -async fn test_pruned_partition_list() { - let (store, state) = make_test_store_and_state(&[ - ("tablepath/mypartition=val1/file.parquet", 100), - ("tablepath/mypartition=val2/file.parquet", 100), - ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), - ("tablepath/mypartition=val1/other=val3/file.parquet", 100), - ]); - let filter = Expr::eq(col("mypartition"), lit("val1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter], - ".parquet", - &[(String::from("mypartition"), DataType::Utf8)], - ) - .await - .expect("partition pruning failed") - .try_collect::>() - .await - .unwrap(); - - assert_eq!(pruned.len(), 2); - let f1 = &pruned[0]; - assert_eq!( - f1.object_meta.location.as_ref(), - "tablepath/mypartition=val1/file.parquet" - ); - assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]); - let f2 = &pruned[1]; - assert_eq!( - f2.object_meta.location.as_ref(), - "tablepath/mypartition=val1/other=val3/file.parquet" - ); - assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]); -} - -#[tokio::test] -async fn test_pruned_partition_list_multi() { - let (store, state) = make_test_store_and_state(&[ - ("tablepath/part1=p1v1/file.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100), - ]); - let filter1 = Expr::eq(col("part1"), lit("p1v2")); - let filter2 = Expr::eq(col("part2"), lit("p2v1")); - let pruned = pruned_partition_list( - &state, - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - &[filter1, filter2], - ".parquet", - &[ - (String::from("part1"), DataType::Utf8), - (String::from("part2"), DataType::Utf8), - ], - ) - .await - .expect("partition pruning failed") - .try_collect::>() - .await - .unwrap(); - - assert_eq!(pruned.len(), 2); - let f1 = &pruned[0]; - assert_eq!( - f1.object_meta.location.as_ref(), - "tablepath/part1=p1v2/part2=p2v1/file1.parquet" - ); - assert_eq!( - &f1.partition_values, - &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),] - ); - let f2 = &pruned[1]; - assert_eq!( - f2.object_meta.location.as_ref(), - "tablepath/part1=p1v2/part2=p2v1/file2.parquet" - ); - assert_eq!( - &f2.partition_values, - &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")] - ); -} - -#[tokio::test] -async fn test_list_partition() { - let (store, _) = make_test_store_and_state(&[ - ("tablepath/part1=p1v1/file.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), - ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), - ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), - ]); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 0, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec![]), - ("tablepath/part1=p1v2", 1, vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ] - ); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 1, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), - ("tablepath/part1=p1v2", 1, vec![]), - ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), - ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), - ] - ); - - let partitions = list_partitions( - store.as_ref(), - &ListingTableUrl::parse("file:///tablepath/").unwrap(), - 2, - None, - ) - .await - .expect("listing partitions failed"); - - assert_eq!( - &partitions - .iter() - .map(describe_partition) - .collect::>(), - &vec![ - ("tablepath", 0, vec![]), - ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), - ("tablepath/part1=p1v2", 1, vec![]), - ("tablepath/part1=p1v3", 1, vec![]), - ( - "tablepath/part1=p1v2/part2=p2v1", - 2, - vec!["file1.parquet", "file2.parquet"] - ), - ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), - ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), - ] - ); -} diff --git a/datafusion/core/tests/catalog_listing/mod.rs b/datafusion/core/tests/catalog_listing/mod.rs deleted file mode 100644 index 9df4cc1fa3ac..000000000000 --- a/datafusion/core/tests/catalog_listing/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod helpers; diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index fb182baf1f87..66b4103160e7 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -46,8 +46,6 @@ mod physical_optimizer; mod catalog; -mod catalog_listing; - #[cfg(test)] #[ctor::ctor] fn init() {