diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index c607809194896..3fe51288e79a8 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -19,9 +19,9 @@ use arrow::array::Decimal128Array; use arrow::{ array::{ - Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray, + Float64Array, Int32Array, StringArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -70,6 +70,7 @@ enum Scenario { DecimalBloomFilterInt64, DecimalLargePrecision, DecimalLargePrecisionBloomFilter, + ByteArray, PeriodsInColumnNames, } @@ -506,6 +507,51 @@ fn make_date_batch(offset: Duration) -> RecordBatch { .unwrap() } +/// returns a batch with two columns (note "service.name" is the name +/// of the column. It is *not* a table named service.name +/// +/// name | service.name +fn make_bytearray_batch( + name: &str, + string_values: Vec<&str>, + binary_values: Vec<&[u8]>, + fixedsize_values: Vec<&[u8; 3]>, +) -> RecordBatch { + let num_rows = string_values.len(); + let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect(); + let service_string: StringArray = string_values.iter().map(Some).collect(); + let service_binary: BinaryArray = binary_values.iter().map(Some).collect(); + let service_fixedsize: FixedSizeBinaryArray = fixedsize_values + .iter() + .map(|value| Some(value.as_slice())) + .collect::>() + .into(); + + let schema = Schema::new(vec![ + Field::new("name", name.data_type().clone(), true), + // note the column name has a period in it! + Field::new("service_string", service_string.data_type().clone(), true), + Field::new("service_binary", service_binary.data_type().clone(), true), + Field::new( + "service_fixedsize", + service_fixedsize.data_type().clone(), + true, + ), + ]); + let schema = Arc::new(schema); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(name), + Arc::new(service_string), + Arc::new(service_binary), + Arc::new(service_fixedsize), + ], + ) + .unwrap() +} + /// returns a batch with two columns (note "service.name" is the name /// of the column. It is *not* a table named service.name /// @@ -604,6 +650,66 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_decimal_batch(vec![100000, 200000, 300000, 400000, 600000], 38, 5), ] } + Scenario::ByteArray => { + // frontends first, then backends. All in order, except frontends 4 and 7 + // are swapped to cause a statistics false positive on the 'fixed size' column. + vec![ + make_bytearray_batch( + "all frontends", + vec![ + "frontend one", + "frontend two", + "frontend three", + "frontend seven", + "frontend five", + ], + vec![ + b"frontend one", + b"frontend two", + b"frontend three", + b"frontend seven", + b"frontend five", + ], + vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"], + ), + make_bytearray_batch( + "mixed", + vec![ + "frontend six", + "frontend four", + "backend one", + "backend two", + "backend three", + ], + vec![ + b"frontend six", + b"frontend four", + b"backend one", + b"backend two", + b"backend three", + ], + vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"], + ), + make_bytearray_batch( + "all backends", + vec![ + "backend four", + "backend five", + "backend six", + "backend seven", + "backend eight", + ], + vec![ + b"backend four", + b"backend five", + b"backend six", + b"backend seven", + b"backend eight", + ], + vec![b"be4", b"be5", b"be6", b"be7", b"be8"], + ), + ] + } Scenario::PeriodsInColumnNames => { vec![ // all frontend diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b7038ef1a73f4..406eb721bf945 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -744,6 +744,108 @@ async fn prune_decimal_in_list() { .await; } +#[tokio::test] +async fn prune_string_eq_match() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string = 'backend one'", + ) + .with_expected_errors(Some(0)) + // false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three' + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(1)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(1) + .test_row_group_prune() + .await; +} + +#[tokio::test] +async fn prune_string_eq_no_match() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string = 'backend nine'", + ) + .with_expected_errors(Some(0)) + // false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three' + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) + .test_row_group_prune() + .await; + + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string = 'frontend nine'", + ) + .with_expected_errors(Some(0)) + // false positive on 'all frontends' batch: 'frontend five' < 'frontend nine' < 'frontend two' + // false positive on 'mixed' batch: 'backend one' < 'frontend nine' < 'frontend six' + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(2)) + .with_expected_rows(0) + .test_row_group_prune() + .await; +} + +#[tokio::test] +async fn prune_string_neq() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string != 'backend one'", + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(3)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(14) + .test_row_group_prune() + .await; +} + +#[tokio::test] +async fn prune_string_lt() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string < 'backend one'", + ) + .with_expected_errors(Some(0)) + // matches 'all backends' only + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(3) + .test_row_group_prune() + .await; + + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_string FROM t WHERE service_string < 'backend zero'", + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + // all backends from 'mixed' and 'all backends' + .with_expected_rows(8) + .test_row_group_prune() + .await; +} + #[tokio::test] async fn prune_periods_in_column_names() { // There are three row groups for "service.name", each with 5 rows = 15 rows total