From 3e353cf4d6ac2a8f5b25670c7d2125329a0fb036 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:32:41 -0400
Subject: [PATCH 01/13] Enable `datafusion.execution.parquet.schema_force_string_view` by default

---
 datafusion/common/src/config.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 9f8aa1cbdcaa..7c90f1715a3e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -470,9 +470,10 @@ config_namespace! {
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2

-        /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
-        /// and `Binary/BinaryLarge` with `BinaryView`.
-        pub schema_force_string_view: bool, default = false
+        /// (reading) If true, parquet reader will read columns of
+        /// `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with
+        /// `BinaryView`.
+        pub schema_force_string_view: bool, default = true
     }
 }

From b4037169278ede9f83e07a2ec4f0d9b7b19c8c1c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:34:32 -0400
Subject: [PATCH 02/13] update information_schema

---
 datafusion/sqllogictest/test_files/information_schema.slt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index fef7bfe82174..78db0f832486 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -201,7 +201,7 @@ datafusion.execution.parquet.metadata_size_hint NULL
 datafusion.execution.parquet.pruning true
 datafusion.execution.parquet.pushdown_filters false
 datafusion.execution.parquet.reorder_filters false
-datafusion.execution.parquet.schema_force_string_view false
+datafusion.execution.parquet.schema_force_string_view true
 datafusion.execution.parquet.skip_metadata true
 datafusion.execution.parquet.statistics_enabled page
 datafusion.execution.parquet.write_batch_size 1024
@@ -288,7 +288,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the
 datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
 datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
 datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
-datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
+datafusion.execution.parquet.schema_force_string_view true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
 datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
 datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes

From 0215d21ab318e35e05e84bb5d62cef17bbc76faa Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:35:33 -0400
Subject: [PATCH 03/13] Update config docs

---
 docs/source/user-guide/configs.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 78d0d7b0239f..5fc71b085b4f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -76,7 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
 | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
-| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
+| datafusion.execution.parquet.schema_force_string_view | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |

From 081a3cadd9c9bc3e3e5bfc39e137c7d9fe3fff01 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:37:28 -0400
Subject: [PATCH 04/13] Update sqllogictest runner for utf8view

---
 .../sqllogictest/src/engines/datafusion_engine/normalize.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
index 66ffeadf8cec..b6b583b9fbdb 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
@@ -267,7 +267,9 @@ pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec<DFColumnType> {
             | DataType::Float64
             | DataType::Decimal128(_, _)
             | DataType::Decimal256(_, _) => DFColumnType::Float,
-            DataType::Utf8 | DataType::LargeUtf8 => DFColumnType::Text,
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
+                DFColumnType::Text
+            }
             DataType::Date32
             | DataType::Date64
             | DataType::Time32(_)

From d7b7cb00f3b29e7ef3b580673c492a6d9b2e4b8d Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:38:08 -0400
Subject: [PATCH 05/13] Update typeof

---
 datafusion/sqllogictest/test_files/arrow_typeof.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt
index 448706744305..930bd488d7eb 100644
--- a/datafusion/sqllogictest/test_files/arrow_typeof.slt
+++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt
@@ -424,7 +424,7 @@ select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)');
 [1, 2, 3]

 # Tests for Utf8View
-query ?T
+query TT
 select arrow_cast('MyAwesomeString', 'Utf8View'), arrow_typeof(arrow_cast('MyAwesomeString', 'Utf8View'))
 ----
 MyAwesomeString Utf8View

From d98a0803fac21247a909cce7058d6775950a79f1 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:38:34 -0400
Subject: [PATCH 06/13] update map

---
 datafusion/sqllogictest/test_files/map.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt
index e530e14df66e..421ecf7a3797 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -26,7 +26,7 @@ describe data;
 ----
 ints Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO
 strings Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO
-timestamp Utf8 NO
+timestamp Utf8View NO

 query ??T
 SELECT * FROM data ORDER by ints['bytes'] DESC LIMIT 10;

From de8f597717c31305482cbdbc8a68a2ebc94e34e8 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:39:16 -0400
Subject: [PATCH 07/13] Update explain

---
 datafusion/sqllogictest/test_files/explain.slt | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 5a1733460120..876c2d967693 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -310,8 +310,8 @@ initial_physical_plan
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
-01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
-02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
+01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
+02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 physical_plan after OutputRequirements
 01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -333,7 +333,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
+physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]


 statement ok
@@ -350,8 +350,8 @@ initial_physical_plan_with_stats
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
-01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
-02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
+01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
+02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 physical_plan after OutputRequirements
 01)OutputRequirementExec
 02)--GlobalLimitExec: skip=0, fetch=10
@@ -374,7 +374,7 @@ physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
 physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
+physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]


 statement ok

From 4e43a8e29e1ab359ca2ff56251eb7817f2e7546a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 Jul 2024 08:46:30 -0400
Subject: [PATCH 08/13] update describe

---
 datafusion/sqllogictest/test_files/describe.slt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/describe.slt b/datafusion/sqllogictest/test_files/describe.slt
index a15c3a109cab..57cb8a29fcc7 100644
--- a/datafusion/sqllogictest/test_files/describe.slt
+++ b/datafusion/sqllogictest/test_files/describe.slt
@@ -81,8 +81,8 @@ int_col Int32 YES
 bigint_col Int64 YES
 float_col Float32 YES
 double_col Float64 YES
-date_string_col Utf8 YES
-string_col Utf8 YES
+date_string_col Utf8View YES
+string_col Utf8View YES
 timestamp_col Timestamp(Nanosecond, None) YES
 year Int32 YES
 month Int32 YES

From dca498a2f9b2e2983ae01c812c5e121687d23c1a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 31 Jul 2024 14:53:46 -0400
Subject: [PATCH 09/13] Work around missing Utf8View statistics support in arrow

---
 .../physical_plan/parquet/statistics.rs | 47 ++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 11b8f5fc6c79..db95e0fb0351 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -21,6 +21,7 @@
 use arrow::array::{
     BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
+    StringViewBuilder,
 };
 use arrow::datatypes::i256;
 use arrow::{array::ArrayRef, datatypes::DataType};
@@ -438,6 +439,25 @@ macro_rules! get_statistics {
             }
             Ok(Arc::new(builder.finish()))
         },
+        DataType::Utf8View => {
+            let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
+            let mut builder = StringViewBuilder::new();
+            for x in iterator {
+                let Some(x) = x else {
+                    builder.append_null(); // no statistics value
+                    continue;
+                };
+
+                let Ok(x) = std::str::from_utf8(x) else {
+                    log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
+                    builder.append_null();
+                    continue;
+                };
+
+                builder.append_value(x);
+            }
+            Ok(Arc::new(builder.finish()))
+        },
         DataType::FixedSizeBinary(size) => {
             let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
             let mut builder = FixedSizeBinaryBuilder::new(*size);
@@ -482,8 +502,8 @@ macro_rules! get_statistics {
         DataType::Duration(_) |
         DataType::Interval(_) |
         DataType::Null |
+        // TODO binary view
         DataType::BinaryView |
-        DataType::Utf8View |
         DataType::List(_) |
         DataType::ListView(_) |
         DataType::FixedSizeList(_, _) |
@@ -901,6 +921,29 @@ macro_rules! get_data_page_statistics {
             }
             Ok(Arc::new(builder.finish()))
         },
+        // TODO file upstream in arrow-rs --
+        // support Utf8View and BinaryView in statistics
+        Some(DataType::Utf8View) => {
+            let mut builder = StringViewBuilder::new();
+            let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
+            for x in iterator {
+                for x in x.into_iter() {
+                    let Some(x) = x else {
+                        builder.append_null(); // no statistics value
+                        continue;
+                    };
+
+                    let Ok(x) = std::str::from_utf8(x.data()) else {
+                        log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
+                        builder.append_null();
+                        continue;
+                    };
+
+                    builder.append_value(x);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        },
         Some(DataType::Dictionary(_, value_type)) => {
             [<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
         },
@@ -983,6 +1026,7 @@ macro_rules! get_data_page_statistics {
             }
             Ok(Arc::new(builder.finish()))
         },
+        // TODO file upstream in arrow-rs -- return not implemented for unsupported types rather than panic
         _ => unimplemented!()
     }
 }
@@ -1104,6 +1148,7 @@ where
             .iter()
             .map(|x| x.null_count.map(|x| x as u64))
             .collect::<Vec<_>>(),
+        // TODO file upstream in arrow-rs -- return not implemented
        _ => unimplemented!(),
     });

From e7f8cd7dfae2bbf7cd9eab5f57fbcfe46e36829c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 31 Jul 2024 15:09:00 -0400
Subject: [PATCH 10/13] work around arrow cast feature gap

---
 .../core/src/datasource/schema_adapter.rs | 39 +++++++++++++++++--
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 40cb40a83af2..ea77ae9514b9 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -21,9 +21,10 @@
 //! physical format into how they should be used by DataFusion. For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.

-use arrow::compute::{can_cast_types, cast};
-use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
-use arrow_schema::{Schema, SchemaRef};
+use arrow_array::builder::StringBuilder;
+use arrow_array::cast::AsArray;
+use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, RecordBatchOptions};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
 use datafusion_common::plan_err;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -165,6 +166,38 @@ impl SchemaAdapter for DefaultSchemaAdapter {
     }
 }

+// Workaround arrow-rs bug in can_cast_types
+// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported
+fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
+    arrow::compute::can_cast_types(from_type, to_type)
+        || matches!(
+            (from_type, to_type),
+            (DataType::BinaryView, DataType::Utf8 | DataType::LargeUtf8)
+                | (DataType::Utf8 | DataType::LargeUtf8, DataType::BinaryView)
+        )
+}
+
+// Work around arrow-rs casting bug
+// External error: query failed: DataFusion error: Arrow error: Cast error: Casting from BinaryView to Utf8 not supported
+fn cast(array: &dyn Array, to_type: &DataType) -> Result<ArrayRef, ArrowError> {
+    match (array.data_type(), to_type) {
+        (DataType::BinaryView, DataType::Utf8) => {
+            let array = array.as_binary_view();
+            let mut builder = StringBuilder::with_capacity(array.len(), 8 * 1024);
+            for value in array.iter() {
+                // check if the value is valid utf8 (should do this once, not each value)
+                let value = value.map(|value| std::str::from_utf8(value)).transpose()?;
+
+                builder.append_option(value);
+            }
+
+            Ok(Arc::new(builder.finish()))
+        }
+        // fallback to arrow kernel
+        (_, _) => arrow::compute::cast(array, to_type),
+    }
+}
+
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
 #[derive(Debug)]

From 2e8aa6f808943e7092126e193a947611535e288c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 1 Aug 2024 07:35:36 -0400
Subject: [PATCH 11/13] Remove dfbench stringview override

---
 benchmarks/src/clickbench.rs   | 9 +--------
 benchmarks/src/tpch/run.rs     | 5 -----
 benchmarks/src/util/options.rs | 5 -----
 3 files changed, 1 insertion(+), 18 deletions(-)

diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index a0f051d17623..2594b98fd7a8 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -116,14 +116,7 @@ impl RunOpt {
             None => queries.min_query_id()..=queries.max_query_id(),
         };

-        let mut config = self.common.config();
-        config
-            .options_mut()
-            .execution
-            .parquet
-            .schema_force_string_view = self.common.string_view;
-
-        let ctx = SessionContext::new_with_config(config);
+        let ctx = SessionContext::new();
         self.register_hits(&ctx).await?;

         let iterations = self.common.iterations;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index a72dfaa0f58c..dbb7f69cc230 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -120,11 +120,6 @@ impl RunOpt {
             .config()
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
-        config
-            .options_mut()
-            .execution
-            .parquet
-            .schema_force_string_view = self.common.string_view;
         let ctx = SessionContext::new_with_config(config);

         // register tables
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index 02591e293272..b9398e5b522f 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -37,11 +37,6 @@ pub struct CommonOpt {
     /// Activate debug mode to see more details
     #[structopt(short, long)]
     pub debug: bool,
-
-    /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
-    /// when reading ParquetFiles
-    #[structopt(long)]
-    pub string_view: bool,
 }

 impl CommonOpt {

From 2a8c6593297f04a029a3782e4df02262b6a5f146 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 1 Aug 2024 09:29:12 -0400
Subject: [PATCH 12/13] Minor: add "clickbench extended" queries to unit tests

---
 benchmarks/queries/clickbench/README.md    |  2 +-
 .../sqllogictest/test_files/clickbench.slt | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/benchmarks/queries/clickbench/README.md b/benchmarks/queries/clickbench/README.md
index 29b1a7588f17..560b54181d5f 100644
--- a/benchmarks/queries/clickbench/README.md
+++ b/benchmarks/queries/clickbench/README.md
@@ -14,7 +14,7 @@ ClickBench is focused on aggregation and filtering performance (though it has no

 The "extended" queries are not part of the official ClickBench benchmark.
 Instead they are used to test other DataFusion features that are not covered by
-the standard benchmark Each description below is for the corresponding line in
+the standard benchmark. Each description below is for the corresponding line in
 `extended.sql` (line 1 is `Q0`, line 2 is `Q1`, etc.)
 ### Q0: Data Exploration

diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt
index c2dba435263d..733c0a3cd972 100644
--- a/datafusion/sqllogictest/test_files/clickbench.slt
+++ b/datafusion/sqllogictest/test_files/clickbench.slt
@@ -274,5 +274,23 @@ query PI
 SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
 ----

+# Clickbench "Extended" queries that test count distinct
+
+query III
+SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
+----
+1 1 1
+
+query III
+SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
+----
+1 1 1
+
+query TIIII
+SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
+----
+� 1 1 1 1
+
+
 statement ok
 drop table hits;

From f70328a6a2c2f3712c12115742613ce55e2034ee Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Thu, 1 Aug 2024 13:05:27 -0400
Subject: [PATCH 13/13] fix bug

---
 datafusion/functions-aggregate/src/count.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index e2d59003fca1..64eb7253f5c9 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -237,7 +237,7 @@ impl AggregateUDFImpl for Count {
                 Box::new(BytesDistinctCountAccumulator::<i32>::new(OutputType::Utf8))
             }
             DataType::Utf8View => {
-                Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8))
+                Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8View))
             }
             DataType::LargeUtf8 => {
                 Box::new(BytesDistinctCountAccumulator::<i64>::new(OutputType::Utf8))
             }
@@ -245,6 +245,9 @@ impl AggregateUDFImpl for Count {
             DataType::Binary => Box::new(BytesDistinctCountAccumulator::<i32>::new(
                 OutputType::Binary,
             )),
+            DataType::BinaryView => Box::new(BytesViewDistinctCountAccumulator::new(
+                OutputType::BinaryView,
+            )),
             DataType::LargeBinary => Box::new(BytesDistinctCountAccumulator::<i64>::new(
                 OutputType::Binary,
             )),
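
For downstream users of this change, opting back out of the new default is a one-line config override. A minimal sketch, assuming only the `SessionConfig`/`SessionContext` API already visible in the benchmark code removed by patch 11; the SQL equivalent, `SET datafusion.execution.parquet.schema_force_string_view = false;`, should also work wherever per-session settings are accepted:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Revert this session to reading parquet string/binary columns as
    // Utf8/Binary instead of the new Utf8View/BinaryView default.
    let mut config = SessionConfig::new();
    config
        .options_mut()
        .execution
        .parquet
        .schema_force_string_view = false;
    let _ctx = SessionContext::new_with_config(config);
}
```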