Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use StringView by default when reading from parquet #11723

Closed
wants to merge 13 commits into from
7 changes: 4 additions & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,10 @@ config_namespace! {
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
/// (reading) If true, parquet reader will read columns of
/// `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with
/// `BinaryView`.
pub schema_force_string_view: bool, default = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use arrow::array::{
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
StringViewBuilder,
};
use arrow::datatypes::i256;
use arrow::{array::ArrayRef, datatypes::DataType};
Expand Down Expand Up @@ -438,6 +439,25 @@ macro_rules! get_statistics {
}
Ok(Arc::new(builder.finish()))
},
DataType::Utf8View => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringViewBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::FixedSizeBinary(size) => {
let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
let mut builder = FixedSizeBinaryBuilder::new(*size);
Expand Down Expand Up @@ -482,8 +502,8 @@ macro_rules! get_statistics {
DataType::Duration(_) |
DataType::Interval(_) |
DataType::Null |
// TODO binary view
DataType::BinaryView |
DataType::Utf8View |
DataType::List(_) |
DataType::ListView(_) |
DataType::FixedSizeList(_, _) |
Expand Down Expand Up @@ -901,6 +921,29 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
// TODO file upstream in Arrowrs --
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed upstream as apache/arrow-rs#6164

Suggested change
// TODO file upstream in Arrowrs --
// https://github.com/apache/arrow-rs/issues/6164

// support Utf8View and BinaryView in statistics
Some(DataType::Utf8View) => {
let mut builder = StringViewBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Dictionary(_, value_type)) => {
[<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
},
Expand Down Expand Up @@ -983,6 +1026,7 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
// TODO file upstream in arrow-rs -- return not implemented for unsupported types rather than panic
_ => unimplemented!()
}
}
Expand Down Expand Up @@ -1104,6 +1148,7 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
// TODO file upstream in Arrow-rs -- return not implemented
_ => unimplemented!(),
});

Expand Down
39 changes: 36 additions & 3 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
//! physical format into how they should be used by DataFusion. For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.
use arrow::compute::{can_cast_types, cast};
use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow_schema::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::cast::AsArray;
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -165,6 +166,38 @@ impl SchemaAdapter for DefaultSchemaAdapter {
}
}

// Workaround arrow-rs bug in can_cast_types
// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported
fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
arrow::compute::can_cast_types(from_type, to_type)
|| matches!(
(from_type, to_type),
(DataType::BinaryView, DataType::Utf8 | DataType::LargeUtf8)
| (DataType::Utf8 | DataType::LargeUtf8, DataType::BinaryView)
)
}

// Work around arrow-rs casting bug
// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported
fn cast(array: &dyn Array, to_type: &DataType) -> Result<ArrayRef, ArrowError> {
match (array.data_type(), to_type) {
(DataType::BinaryView, DataType::Utf8) => {
let array = array.as_binary_view();
let mut builder = StringBuilder::with_capacity(array.len(), 8 * 1024);
for value in array.iter() {
// check if the value is valid utf8 (should do this once, not each value)
let value = value.map(|value| std::str::from_utf8(value)).transpose()?;

builder.append_option(value);
}

Ok(Arc::new(builder.finish()))
}
// fallback to arrow kernel
(_, _) => arrow::compute::cast(array, to_type),
}
}

/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec<DFColumnType> {
| DataType::Float64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _) => DFColumnType::Float,
DataType::Utf8 | DataType::LargeUtf8 => DFColumnType::Text,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
DFColumnType::Text
}
DataType::Date32
| DataType::Date64
| DataType::Time32(_)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/arrow_typeof.slt
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)');
[1, 2, 3]

# Tests for Utf8View
query ?T
query TT
select arrow_cast('MyAwesomeString', 'Utf8View'), arrow_typeof(arrow_cast('MyAwesomeString', 'Utf8View'))
----
MyAwesomeString Utf8View
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/describe.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ int_col Int32 YES
bigint_col Int64 YES
float_col Float32 YES
double_col Float64 YES
date_string_col Utf8 YES
string_col Utf8 YES
date_string_col Utf8View YES
string_col Utf8View YES
timestamp_col Timestamp(Nanosecond, None) YES
year Int32 YES
month Int32 YES
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ initial_physical_plan
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
initial_physical_plan_with_schema
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
physical_plan after OutputRequirements
01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
Expand All @@ -333,7 +333,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]


statement ok
Expand All @@ -350,8 +350,8 @@ initial_physical_plan_with_stats
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
initial_physical_plan_with_schema
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
physical_plan after OutputRequirements
01)OutputRequirementExec
02)--GlobalLimitExec: skip=0, fetch=10
Expand All @@ -374,7 +374,7 @@ physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]


statement ok
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.pruning true
datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
datafusion.execution.parquet.schema_force_string_view false
datafusion.execution.parquet.schema_force_string_view true
datafusion.execution.parquet.skip_metadata true
datafusion.execution.parquet.statistics_enabled page
datafusion.execution.parquet.write_batch_size 1024
Expand Down Expand Up @@ -288,7 +288,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the
datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
datafusion.execution.parquet.schema_force_string_view true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/map.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe data;
----
ints Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO
strings Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO
timestamp Utf8 NO
timestamp Utf8View NO

query ??T
SELECT * FROM data ORDER by ints['bytes'] DESC LIMIT 10;
Expand Down
Loading
Loading