Skip to content

Commit

Permalink
ensure statistics are type compliant with schema (parseablehq#571)
Browse files Browse the repository at this point in the history
This PR adds ensure all the types in table statistics are 
compatible with table schema types.
  • Loading branch information
trueleo committed Dec 15, 2023
1 parent 7cd311a commit 31f6472
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 51 deletions.
36 changes: 36 additions & 0 deletions server/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

use std::cmp::{max, min};

use arrow_schema::DataType;
use datafusion::scalar::ScalarValue;
use parquet::file::statistics::Statistics;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -85,6 +87,40 @@ impl TypedStatistics {
_ => panic!("Cannot update wrong types"),
}
}

pub fn min_max_as_scalar(self, datatype: &DataType) -> Option<(ScalarValue, ScalarValue)> {
let (min, max) = match (self, datatype) {
(TypedStatistics::Bool(stats), DataType::Boolean) => (
ScalarValue::Boolean(Some(stats.min)),
ScalarValue::Boolean(Some(stats.max)),
),
(TypedStatistics::Int(stats), DataType::Int32) => (
ScalarValue::Int32(Some(stats.min as i32)),
ScalarValue::Int32(Some(stats.max as i32)),
),
(TypedStatistics::Int(stats), DataType::Int64) => (
ScalarValue::Int64(Some(stats.min)),
ScalarValue::Int64(Some(stats.max)),
),
(TypedStatistics::Float(stats), DataType::Float32) => (
ScalarValue::Float32(Some(stats.min as f32)),
ScalarValue::Float32(Some(stats.max as f32)),
),
(TypedStatistics::Float(stats), DataType::Float64) => (
ScalarValue::Float64(Some(stats.min)),
ScalarValue::Float64(Some(stats.max)),
),
(TypedStatistics::String(stats), DataType::Utf8) => (
ScalarValue::Utf8(Some(stats.min)),
ScalarValue::Utf8(Some(stats.max)),
),
_ => {
return None;
}
};

Some((min, max))
}
}

/// Column statistics are used to track statistics for a column in a given file.
Expand Down
68 changes: 17 additions & 51 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};

use arrow_schema::{DataType, Schema, SchemaRef, SortOptions};
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
use chrono::{NaiveDateTime, Timelike, Utc};
use datafusion::{
Expand Down Expand Up @@ -236,57 +236,23 @@ fn partitioned_files(
count += num_rows;
}

let mut statistics = vec![];

for field in table_schema.fields() {
let Some(stats) = column_statistics
.get(field.name())
.and_then(|stats| stats.as_ref())
else {
statistics.push(datafusion::common::ColumnStatistics::default());
break;
};

let datatype = field.data_type();

let (min, max) = match (stats, datatype) {
(TypedStatistics::Bool(stats), DataType::Boolean) => (
ScalarValue::Boolean(Some(stats.min)),
ScalarValue::Boolean(Some(stats.max)),
),
(TypedStatistics::Int(stats), DataType::Int32) => (
ScalarValue::Int32(Some(stats.min as i32)),
ScalarValue::Int32(Some(stats.max as i32)),
),
(TypedStatistics::Int(stats), DataType::Int64) => (
ScalarValue::Int64(Some(stats.min)),
ScalarValue::Int64(Some(stats.max)),
),
(TypedStatistics::Float(stats), DataType::Float32) => (
ScalarValue::Float32(Some(stats.min as f32)),
ScalarValue::Float32(Some(stats.max as f32)),
),
(TypedStatistics::Float(stats), DataType::Float64) => (
ScalarValue::Float64(Some(stats.min)),
ScalarValue::Float64(Some(stats.max)),
),
(TypedStatistics::String(stats), DataType::Utf8) => (
ScalarValue::Utf8(Some(stats.min.clone())),
ScalarValue::Utf8(Some(stats.max.clone())),
),
_ => {
statistics.push(datafusion::common::ColumnStatistics::default());
break;
}
};

statistics.push(datafusion::common::ColumnStatistics {
null_count: None,
max_value: Some(max),
min_value: Some(min),
distinct_count: None,
let statistics = table_schema
.fields()
.iter()
.map(|field| {
column_statistics
.get(field.name())
.and_then(|stats| stats.as_ref())
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
.map(|(min, max)| datafusion::common::ColumnStatistics {
null_count: None,
max_value: Some(max),
min_value: Some(min),
distinct_count: None,
})
.unwrap_or_default()
})
}
.collect();

let statistics = datafusion::common::Statistics {
num_rows: Some(count as usize),
Expand Down

0 comments on commit 31f6472

Please sign in to comment.