diff --git a/benchmarks/queries/clickbench/README.md b/benchmarks/queries/clickbench/README.md index 29b1a7588f17..560b54181d5f 100644 --- a/benchmarks/queries/clickbench/README.md +++ b/benchmarks/queries/clickbench/README.md @@ -14,7 +14,7 @@ ClickBench is focused on aggregation and filtering performance (though it has no The "extended" queries are not part of the official ClickBench benchmark. Instead they are used to test other DataFusion features that are not covered by -the standard benchmark Each description below is for the corresponding line in +the standard benchmark. Each description below is for the corresponding line in `extended.sql` (line 1 is `Q0`, line 2 is `Q1`, etc.) ### Q0: Data Exploration diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index a0f051d17623..2594b98fd7a8 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -116,14 +116,7 @@ impl RunOpt { None => queries.min_query_id()..=queries.max_query_id(), }; - let mut config = self.common.config(); - config - .options_mut() - .execution - .parquet - .schema_force_string_view = self.common.string_view; - - let ctx = SessionContext::new_with_config(config); + let ctx = SessionContext::new(); self.register_hits(&ctx).await?; let iterations = self.common.iterations; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index a72dfaa0f58c..dbb7f69cc230 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -120,11 +120,6 @@ impl RunOpt { .config() .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; - config - .options_mut() - .execution - .parquet - .schema_force_string_view = self.common.string_view; let ctx = SessionContext::new_with_config(config); // register tables diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 02591e293272..b9398e5b522f 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -37,11 +37,6 @@ pub struct CommonOpt { /// Activate debug mode to see more details #[structopt(short, long)] pub debug: bool, - - /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray - /// when reading ParquetFiles - #[structopt(long)] - pub string_view: bool, } impl CommonOpt { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9f8aa1cbdcaa..7c90f1715a3e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -470,9 +470,10 @@ config_namespace! { /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 - /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, - /// and `Binary/BinaryLarge` with `BinaryView`. - pub schema_force_string_view: bool, default = false + /// (reading) If true, parquet reader will read columns of + /// `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with + /// `BinaryView`. + pub schema_force_string_view: bool, default = true } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 11b8f5fc6c79..db95e0fb0351 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -21,6 +21,7 @@ use arrow::array::{ BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder, + StringViewBuilder, }; use arrow::datatypes::i256; use arrow::{array::ArrayRef, datatypes::DataType}; @@ -438,6 +439,25 @@ macro_rules! get_statistics { } Ok(Arc::new(builder.finish())) }, + DataType::Utf8View => { + let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); + let mut builder = StringViewBuilder::new(); + for x in iterator { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + Ok(Arc::new(builder.finish())) + }, DataType::FixedSizeBinary(size) => { let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator); let mut builder = FixedSizeBinaryBuilder::new(*size); @@ -482,8 +502,8 @@ macro_rules! get_statistics { DataType::Duration(_) | DataType::Interval(_) | DataType::Null | + // TODO binary view DataType::BinaryView | - DataType::Utf8View | DataType::List(_) | DataType::ListView(_) | DataType::FixedSizeList(_, _) | @@ -901,6 +921,29 @@ macro_rules! get_data_page_statistics { } Ok(Arc::new(builder.finish())) }, + // TODO file upstream in Arrowrs -- + // support Utf8View and BinaryView in statistics + Some(DataType::Utf8View) => { + let mut builder = StringViewBuilder::new(); + let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); + for x in iterator { + for x in x.into_iter() { + let Some(x) = x else { + builder.append_null(); // no statistics value + continue; + }; + + let Ok(x) = std::str::from_utf8(x.data()) else { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + builder.append_null(); + continue; + }; + + builder.append_value(x); + } + } + Ok(Arc::new(builder.finish())) + }, Some(DataType::Dictionary(_, value_type)) => { [<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator) }, @@ -983,6 +1026,7 @@ macro_rules! get_data_page_statistics { } Ok(Arc::new(builder.finish())) }, + // TODO file upstream in arrow-rs -- return not implemented for unsupported types rather than panic _ => unimplemented!() } } @@ -1104,6 +1148,7 @@ where .iter() .map(|x| x.null_count.map(|x| x as u64)) .collect::>(), + // TODO file upstream in Arrow-rs -- return not implemented _ => unimplemented!(), }); diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index 40cb40a83af2..ea77ae9514b9 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -21,9 +21,10 @@ //! physical format into how they should be used by DataFusion. For instance, a schema //! can be stored external to a parquet file that maps parquet logical types to arrow types. -use arrow::compute::{can_cast_types, cast}; -use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_array::builder::StringBuilder; +use arrow_array::cast::AsArray; +use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, RecordBatchOptions}; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use datafusion_common::plan_err; use std::fmt::Debug; use std::sync::Arc; @@ -165,6 +166,38 @@ impl SchemaAdapter for DefaultSchemaAdapter { } } +// Workaround arrow-rs bug in can_cast_types +// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported +fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { + arrow::compute::can_cast_types(from_type, to_type) + || matches!( + (from_type, to_type), + (DataType::BinaryView, DataType::Utf8 | DataType::LargeUtf8) + | (DataType::Utf8 | DataType::LargeUtf8, DataType::BinaryView) + ) +} + +// Work around arrow-rs casting bug +// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported +fn cast(array: &dyn Array, to_type: &DataType) -> Result { + match (array.data_type(), to_type) { + (DataType::BinaryView, DataType::Utf8) => { + let array = array.as_binary_view(); + let mut builder = StringBuilder::with_capacity(array.len(), 8 * 1024); + for value in array.iter() { + // check if the value is valid utf8 (should do this once, not each value) + let value = value.map(|value| std::str::from_utf8(value)).transpose()?; + + builder.append_option(value); + } + + Ok(Arc::new(builder.finish())) + } + // fallback to arrow kernel + (_, _) => arrow::compute::cast(array, to_type), + } +} + /// The SchemaMapping struct holds a mapping from the file schema to the table schema /// and any necessary type conversions that need to be applied. #[derive(Debug)] diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index e2d59003fca1..64eb7253f5c9 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -237,7 +237,7 @@ impl AggregateUDFImpl for Count { Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) } DataType::Utf8View => { - Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8)) + Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8View)) } DataType::LargeUtf8 => { Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) @@ -245,6 +245,9 @@ impl AggregateUDFImpl for Count { DataType::Binary => Box::new(BytesDistinctCountAccumulator::::new( OutputType::Binary, )), + DataType::BinaryView => Box::new(BytesViewDistinctCountAccumulator::new( + OutputType::BinaryView, + )), DataType::LargeBinary => Box::new(BytesDistinctCountAccumulator::::new( OutputType::Binary, )), diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 66ffeadf8cec..b6b583b9fbdb 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -267,7 +267,9 @@ pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { | DataType::Float64 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => DFColumnType::Float, - DataType::Utf8 | DataType::LargeUtf8 => DFColumnType::Text, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + DFColumnType::Text + } DataType::Date32 | DataType::Date64 | DataType::Time32(_) diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 448706744305..930bd488d7eb 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -424,7 +424,7 @@ select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)'); [1, 2, 3] # Tests for Utf8View -query ?T +query TT select arrow_cast('MyAwesomeString', 'Utf8View'), arrow_typeof(arrow_cast('MyAwesomeString', 'Utf8View')) ---- MyAwesomeString Utf8View diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index c2dba435263d..733c0a3cd972 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -274,5 +274,23 @@ query PI SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000; ---- +# Clickbench "Extended" queries that test count distinct + +query III +SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits; +---- +1 1 1 + +query III +SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits; +---- +1 1 1 + +query TIIII +SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10; +---- +� 1 1 1 1 + + statement ok drop table hits; diff --git a/datafusion/sqllogictest/test_files/describe.slt b/datafusion/sqllogictest/test_files/describe.slt index a15c3a109cab..57cb8a29fcc7 100644 --- a/datafusion/sqllogictest/test_files/describe.slt +++ b/datafusion/sqllogictest/test_files/describe.slt @@ -81,8 +81,8 @@ int_col Int32 YES bigint_col Int64 YES float_col Float32 YES double_col Float64 YES -date_string_col Utf8 YES -string_col Utf8 YES +date_string_col Utf8View YES +string_col Utf8View YES timestamp_col Timestamp(Nanosecond, None) YES year Int32 YES month Int32 YES diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 5a1733460120..876c2d967693 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -310,8 +310,8 @@ initial_physical_plan 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] @@ -333,7 +333,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -350,8 +350,8 @@ initial_physical_plan_with_stats 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec 02)--GlobalLimitExec: skip=0, fetch=10 @@ -374,7 +374,7 @@ physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fef7bfe82174..78db0f832486 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -201,7 +201,7 @@ datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false -datafusion.execution.parquet.schema_force_string_view false +datafusion.execution.parquet.schema_force_string_view true datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -288,7 +288,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query -datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.schema_force_string_view true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index e530e14df66e..421ecf7a3797 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -26,7 +26,7 @@ describe data; ---- ints Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO strings Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO -timestamp Utf8 NO +timestamp Utf8View NO query ??T SELECT * FROM data ORDER by ints['bytes'] DESC LIMIT 10; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 78d0d7b0239f..5fc71b085b4f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -76,7 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.schema_force_string_view | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |