Put unit tests with code
alamb committed Feb 5, 2025
1 parent 2080d17 commit 53dbc2f
Showing 5 changed files with 281 additions and 247 deletions.
1 change: 1 addition & 0 deletions datafusion/catalog-listing/Cargo.toml
@@ -53,6 +53,7 @@ object_store = { workspace = true }
url = { workspace = true }

[dev-dependencies]
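# async-trait is needed by the MockSession stub in the new helpers.rs tests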
async-trait = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }

283 changes: 280 additions & 3 deletions datafusion/catalog-listing/src/helpers.rs
@@ -532,13 +532,21 @@ pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {

#[cfg(test)]
mod tests {
use async_trait::async_trait;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::FutureExt;
use object_store::memory::InMemory;
use std::any::Any;
use std::ops::Not;

use super::*;
use datafusion_expr::{
case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;

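/// `split_files` distributes a list of files across a requested number of
/// chunks; an empty input yields no chunks at all.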
#[test]
fn test_split_files() {
@@ -580,6 +588,205 @@ mod tests {
assert_eq!(0, chunks.len());
}

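/// Pruning keeps nothing when the only candidates are a non-parquet file,
/// an empty parquet file, and a file outside the matching partition.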
#[tokio::test]
async fn test_pruned_partition_list_empty() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
.collect::<Vec<_>>()
.await;

assert_eq!(pruned.len(), 0);
}

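/// A single-column filter keeps only files under `mypartition=val1`,
/// including those nested in deeper directories, and skips empty files.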
#[tokio::test]
async fn test_pruned_partition_list() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
.try_collect::<Vec<_>>()
.await
.unwrap();

assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
f1.object_meta.location.as_ref(),
"tablepath/mypartition=val1/file.parquet"
);
assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
let f2 = &pruned[1];
assert_eq!(
f2.object_meta.location.as_ref(),
"tablepath/mypartition=val1/other=val3/file.parquet"
);
assert_eq!(&f2.partition_values, &[ScalarValue::from("val1")]);
}

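/// Filters on both partition columns narrow the listing to the two files
/// under `part1=p1v2/part2=p2v1`.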
#[tokio::test]
async fn test_pruned_partition_list_multi() {
let (store, state) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100),
]);
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
let pruned = pruned_partition_list(
state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2],
".parquet",
&[
(String::from("part1"), DataType::Utf8),
(String::from("part2"), DataType::Utf8),
],
)
.await
.expect("partition pruning failed")
.try_collect::<Vec<_>>()
.await
.unwrap();

assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
f1.object_meta.location.as_ref(),
"tablepath/part1=p1v2/part2=p2v1/file1.parquet"
);
assert_eq!(
&f1.partition_values,
&[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),]
);
let f2 = &pruned[1];
assert_eq!(
f2.object_meta.location.as_ref(),
"tablepath/part1=p1v2/part2=p2v1/file2.parquet"
);
assert_eq!(
&f2.partition_values,
&[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")]
);
}

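/// Listing at depths 0, 1, and 2 exposes progressively deeper partition
/// directories; files show up once the requested depth reaches their level.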
#[tokio::test]
async fn test_list_partition() {
let (store, _) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
]);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
0,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec![]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
1,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
2,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
(
"tablepath/part1=p1v2/part2=p2v1",
2,
vec!["file1.parquet", "file2.parquet"]
),
("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
]
);
}

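/// Hive-style path segments parse into the expected partition values.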
#[test]
fn test_parse_partitions_for_path() {
assert_eq!(
@@ -793,4 +1000,74 @@
Some(Path::from("a=1970-01-05")),
);
}

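/// Builds an in-memory object store populated with zero-filled files of the
/// given sizes, plus a minimal `Session` stub for the listing helpers.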
pub fn make_test_store_and_state(
files: &[(&str, u64)],
) -> (Arc<InMemory>, Arc<dyn Session>) {
let memory = InMemory::new();

for (name, size) in files {
memory
.put(&Path::from(*name), vec![0; *size as usize].into())
.now_or_never()
.unwrap()
.unwrap();
}

(Arc::new(memory), Arc::new(MockSession {}))
}

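/// The listing helpers under test never call any of these trait methods,
/// so every implementation can remain `unimplemented!()`.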
struct MockSession {}

#[async_trait]
impl Session for MockSession {
fn session_id(&self) -> &str {
unimplemented!()
}

fn config(&self) -> &SessionConfig {
unimplemented!()
}

async fn create_physical_plan(
&self,
_logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

fn create_physical_expr(
&self,
_expr: Expr,
_df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
unimplemented!()
}

fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
unimplemented!()
}

fn aggregate_functions(
&self,
) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
unimplemented!()
}

fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
unimplemented!()
}

fn runtime_env(&self) -> &Arc<RuntimeEnv> {
unimplemented!()
}

fn execution_props(&self) -> &ExecutionProps {
unimplemented!()
}

fn as_any(&self) -> &dyn Any {
unimplemented!()
}
}
}