From 91442a3fb27f032ca13d536f01dcc5e7c9d6289e Mon Sep 17 00:00:00 2001
From: wiedld
Date: Fri, 20 Dec 2024 16:18:49 -0800
Subject: [PATCH] refactor(11770): replace TryFrom impls with explicit
 methods, since the kv_metadata is inherent to TableParquetOptions and the
 API should make apparent whether or not the arrow schema is included
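
Since the conversion is now an explicit method rather than a TryFrom impl,
the caller has to state whether the arrow schema gets encoded into the
kv_metadata. A rough sketch of the two call paths (the names `table_opts`
and `schema` are illustrative, not part of this patch):

    // leave the kv_metadata as configured on TableParquetOptions
    let props = table_opts.into_writer_properties_builder()?.build();

    // additionally embed the arrow schema into the kv_metadata, keyed
    // by parquet's ARROW_SCHEMA_META_KEY
    let props = table_opts
        .into_writer_properties_builder_with_arrow_schema(Some(&schema))?
        .build();
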
---
 datafusion-cli/Cargo.lock                     |   2 +
 datafusion/common/Cargo.toml                  |   2 +
 datafusion/common/src/file_options/mod.rs     |  13 +-
 .../common/src/file_options/parquet_writer.rs | 128 +++++++++++++------
 4 files changed, 100 insertions(+), 45 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index a435869dbece9..9f75a34a7607a 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1326,7 +1326,9 @@ dependencies = [
  "arrow",
  "arrow-array",
  "arrow-buffer",
+ "arrow-ipc",
  "arrow-schema",
+ "base64 0.22.1",
  "half",
  "hashbrown 0.14.5",
  "indexmap",
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index a81ec724dd66f..330faff1cbb25 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -52,7 +52,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
 arrow = { workspace = true }
 arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-ipc = { workspace = true }
 arrow-schema = { workspace = true }
+base64 = "0.22.1"
 half = { workspace = true }
 hashbrown = { workspace = true }
 indexmap = { workspace = true }
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 77781457d0d2d..91885fc5b42b4 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -30,7 +30,6 @@ pub mod parquet_writer;
 mod tests {
     use std::collections::HashMap;
 
-    use super::parquet_writer::ParquetWriterOptions;
     use crate::{
         config::{ConfigFileType, TableOptions},
         file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -79,8 +78,10 @@
         table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
-        let properties = parquet_options.writer_options();
+        let properties = table_config
+            .parquet
+            .into_writer_properties_builder()?
+            .build();
 
         // Verify the expected options propagated down to parquet crate WriterProperties struct
         assert_eq!(properties.max_row_group_size(), 123);
@@ -184,8 +185,10 @@
         table_config.set_config_format(ConfigFileType::PARQUET);
         table_config.alter_with_string_hash_map(&option_map)?;
 
-        let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
-        let properties = parquet_options.writer_options();
+        let properties = table_config
+            .parquet
+            .into_writer_properties_builder()?
+            .build();
 
         let col1 = ColumnPath::from(vec!["col1".to_owned()]);
         let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index a5b71b9215728..a99694d3e07fa 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -17,18 +17,25 @@
 
 //! Options related to how parquet files should be written
 
+use base64::Engine;
+use std::sync::Arc;
+
 use crate::{
     config::{ParquetOptions, TableParquetOptions},
     DataFusionError, Result,
 };
 
+use arrow_schema::Schema;
 use parquet::{
+    arrow::ARROW_SCHEMA_META_KEY,
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{
-        EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
-        DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
+    file::{
+        metadata::KeyValue,
+        properties::{
+            EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
+            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
+        },
     },
-    format::KeyValue,
     schema::types::ColumnPath,
 };
 
@@ -51,38 +58,52 @@ impl ParquetWriterOptions {
     }
 }
 
-impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
-    type Error = DataFusionError;
-
-    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
-        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
-        Ok(ParquetWriterOptions {
-            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
-                .build(),
-        })
+impl TableParquetOptions {
+    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
+    ///
+    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
+    pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
+        self.into_writer_properties_builder_with_arrow_schema(None)
     }
-}
-
-impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
-    type Error = DataFusionError;
 
     /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
     ///
-    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
-    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
+    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column,
+    /// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`].
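+    ///
+    /// A minimal usage sketch (illustrative only; assumes an in-scope
+    /// [`TableParquetOptions`] named `table_parquet_opts` and an
+    /// `Arc<Schema>` named `schema`):
+    ///
+    /// ```ignore
+    /// let props = table_parquet_opts
+    ///     .into_writer_properties_builder_with_arrow_schema(Some(&schema))?
+    ///     .build();
+    /// ```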
+    pub fn into_writer_properties_builder_with_arrow_schema(
+        &self,
+        to_encode: Option<&Arc<Schema>>,
+    ) -> Result<WriterPropertiesBuilder> {
         // Table options include kv_metadata and col-specific options
         let TableParquetOptions {
             global,
             column_specific_options,
             key_value_metadata,
-        } = table_parquet_options;
+        } = self;
 
         let mut builder = global.into_writer_properties_builder()?;
 
-        if !key_value_metadata.is_empty() {
+        // add kv_meta, if any
+        let mut kv_meta = key_value_metadata.to_owned();
+        if let Some(schema) = to_encode {
+            kv_meta.insert(
+                ARROW_SCHEMA_META_KEY.into(),
+                Some(encode_arrow_schema(schema)),
+            );
+        }
+        if !kv_meta.is_empty() {
             builder = builder.set_key_value_metadata(Some(
-                key_value_metadata
-                    .to_owned()
+                kv_meta
                     .drain()
                     .map(|(key, value)| KeyValue { key, value })
                     .collect(),
@@ -140,6 +161,32 @@
     }
 }
 
+/// Encodes the Arrow schema into the IPC format, and base64 encodes it
+///
+/// TODO: make arrow schema encoding available in a public API.
+/// Refer to the currently private `add_encoded_arrow_schema_to_metadata`
+/// and `encode_arrow_schema` in the parquet crate, which could be made public.
+fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
+    let options = arrow_ipc::writer::IpcWriteOptions::default();
+    let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
+    let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
+    let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
+        schema,
+        &mut dictionary_tracker,
+        &options,
+    );
+
+    // manually prepending the length to the schema as arrow uses the legacy IPC format
+    // TODO: change after addressing ARROW-9777
+    let schema_len = serialized_schema.ipc_message.len();
+    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+    len_prefix_schema.append(&mut serialized_schema.ipc_message);
+
+    base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
+}
+
 impl ParquetOptions {
     /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
     ///
@@ -573,7 +620,8 @@ mod tests {
             key_value_metadata: [(key, value)].into(),
         };
 
-        let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
+        let writer_props = table_parquet_opts
+            .into_writer_properties_builder()
             .unwrap()
             .build();
         assert_eq!(
@@ -600,10 +648,10 @@
 
         let default_writer_props = WriterProperties::new();
 
         // WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
-        let from_datafusion_defaults =
-            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
-                .unwrap()
-                .build();
+        let from_datafusion_defaults = default_table_writer_opts
+            .into_writer_properties_builder()
+            .unwrap()
+            .build();
 
         // Expected: how the defaults should not match
         assert_ne!(
@@ -656,10 +704,10 @@
         // the TableParquetOptions::default, with only the bloom filter turned on
         let mut default_table_writer_opts = TableParquetOptions::default();
         default_table_writer_opts.global.bloom_filter_on_write = true;
-        let from_datafusion_defaults =
-            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
-                .unwrap()
-                .build();
+        let from_datafusion_defaults = default_table_writer_opts
+            .into_writer_properties_builder()
+            .unwrap()
+            .build();
 
         // the WriterProperties::default, with only the bloom filter turned on
         let default_writer_props = WriterProperties::builder()
@@ -684,10 +732,10 @@
         let mut default_table_writer_opts = TableParquetOptions::default();
         default_table_writer_opts.global.bloom_filter_on_write = true;
         default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
-        let from_datafusion_defaults =
-            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
-                .unwrap()
-                .build();
+        let from_datafusion_defaults = default_table_writer_opts
+            .into_writer_properties_builder()
+            .unwrap()
+            .build();
 
         // the WriterProperties::default, with only fpp set
         let default_writer_props = WriterProperties::builder()
@@ -716,10 +764,10 @@
         let mut default_table_writer_opts = TableParquetOptions::default();
         default_table_writer_opts.global.bloom_filter_on_write = true;
         default_table_writer_opts.global.bloom_filter_ndv = Some(42);
-        let from_datafusion_defaults =
-            WriterPropertiesBuilder::try_from(&default_table_writer_opts)
-                .unwrap()
-                .build();
+        let from_datafusion_defaults = default_table_writer_opts
+            .into_writer_properties_builder()
+            .unwrap()
+            .build();
 
         // the WriterProperties::default, with only ndv set
         let default_writer_props = WriterProperties::builder()