refactor(11770): replace with new method, since the kv_metadata is inherent to TableParquetOptions and therefore we should make it apparent in the API whether or not you have to include the arrow schema
wiedld committed Dec 21, 2024
1 parent f153153 commit 91442a3
Showing 4 changed files with 90 additions and 45 deletions.
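
For orientation, here is a minimal, hedged sketch (not part of the diff) of the call-site migration this commit implies: the old WriterPropertiesBuilder::try_from(&TableParquetOptions) conversion becomes an inherent method, and embedding the arrow schema is an explicit choice. The crate paths, imports, and example schema below are assumptions for illustration only.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::Result;

fn build_writer_properties(opts: &TableParquetOptions) -> Result<()> {
    // Previously: WriterPropertiesBuilder::try_from(opts)?.build()
    // Now the conversion is a method on TableParquetOptions, so the
    // kv_metadata it carries is always taken into account.
    let _without_schema = opts.into_writer_properties_builder()?.build();

    // Embedding the arrow schema into the parquet kv metadata is an
    // explicit opt-in via the second method.
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, false)]));
    let _with_schema = opts
        .into_writer_properties_builder_with_arrow_schema(Some(&schema))?
        .build();

    Ok(())
}

Making the schema an explicit Option parameter forces callers to decide whether the arrow schema is written into the parquet key-value metadata, rather than having it happen implicitly.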
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions datafusion/common/Cargo.toml
@@ -52,7 +52,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
base64 = "0.22.1"
half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
13 changes: 8 additions & 5 deletions datafusion/common/src/file_options/mod.rs
@@ -30,7 +30,6 @@ pub mod parquet_writer;
mod tests {
use std::collections::HashMap;

use super::parquet_writer::ParquetWriterOptions;
use crate::{
config::{ConfigFileType, TableOptions},
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -79,8 +78,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = table_config
.parquet
.into_writer_properties_builder()?
.build();

// Verify the expected options propagated down to parquet crate WriterProperties struct
assert_eq!(properties.max_row_group_size(), 123);
@@ -184,8 +185,10 @@ mod tests {
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = table_config
.parquet
.into_writer_properties_builder()?
.build();

let col1 = ColumnPath::from(vec!["col1".to_owned()]);
let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
118 changes: 78 additions & 40 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,18 +17,25 @@

//! Options related to how parquet files should be written
use base64::Engine;
use std::sync::Arc;

use crate::{
config::{ParquetOptions, TableParquetOptions},
DataFusionError, Result,
};

use arrow_schema::Schema;
use parquet::{
arrow::ARROW_SCHEMA_META_KEY,
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
file::{
metadata::KeyValue,
properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
},
},
format::KeyValue,
schema::types::ColumnPath,
};

@@ -51,38 +58,42 @@ impl ParquetWriterOptions {
}
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
Ok(ParquetWriterOptions {
writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
.build(),
})
impl TableParquetOptions {
/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
self.into_writer_properties_builder_with_arrow_schema(None)
}
}

impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
type Error = DataFusionError;

/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column,
/// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`].
pub fn into_writer_properties_builder_with_arrow_schema(
&self,
to_encode: Option<&Arc<Schema>>,
) -> Result<WriterPropertiesBuilder> {
// Table options include kv_metadata and col-specific options
let TableParquetOptions {
global,
column_specific_options,
key_value_metadata,
} = table_parquet_options;
} = self;

let mut builder = global.into_writer_properties_builder()?;

if !key_value_metadata.is_empty() {
// add kv_meta, if any
let mut kv_meta = key_value_metadata.to_owned();
if let Some(schema) = to_encode {
kv_meta.insert(
ARROW_SCHEMA_META_KEY.into(),
Some(encode_arrow_schema(schema)),
);
}
if !kv_meta.is_empty() {
builder = builder.set_key_value_metadata(Some(
key_value_metadata
.to_owned()
kv_meta
.drain()
.map(|(key, value)| KeyValue { key, value })
.collect(),
@@ -140,6 +151,32 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
}
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: make arrow schema encoding available in a public API.
/// Refer to the currently private `add_encoded_arrow_schema_to_metadata` and `encode_arrow_schema`, which could be made public.
/// <https://github.com/apache/arrow-rs/blob/2908a80d9ca3e3fb0414e35b67856f1fb761304c/parquet/src/arrow/schema/mod.rs#L172-L221>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
let options = arrow_ipc::writer::IpcWriteOptions::default();
let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
schema,
&mut dictionary_tracker,
&options,
);

// manually prepending the length to the schema as arrow uses the legacy IPC format
// TODO: change after addressing ARROW-9777
let schema_len = serialized_schema.ipc_message.len();
let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
len_prefix_schema.append(&mut serialized_schema.ipc_message);

base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}

impl ParquetOptions {
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
///
@@ -573,7 +610,8 @@ mod tests {
key_value_metadata: [(key, value)].into(),
};

let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
let writer_props = table_parquet_opts
.into_writer_properties_builder()
.unwrap()
.build();
assert_eq!(
@@ -600,10 +638,10 @@
let default_writer_props = WriterProperties::new();

// WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
.build();
let from_datafusion_defaults = default_table_writer_opts
.into_writer_properties_builder()
.unwrap()
.build();

// Expected: how the defaults should not match
assert_ne!(
@@ -656,10 +694,10 @@
// the TableParquetOptions::default, with only the bloom filter turned on
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
.build();
let from_datafusion_defaults = default_table_writer_opts
.into_writer_properties_builder()
.unwrap()
.build();

// the WriterProperties::default, with only the bloom filter turned on
let default_writer_props = WriterProperties::builder()
@@ -684,10 +722,10 @@
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
.build();
let from_datafusion_defaults = default_table_writer_opts
.into_writer_properties_builder()
.unwrap()
.build();

// the WriterProperties::default, with only fpp set
let default_writer_props = WriterProperties::builder()
@@ -716,10 +754,10 @@
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
.build();
let from_datafusion_defaults = default_table_writer_opts
.into_writer_properties_builder()
.unwrap()
.build();

// the WriterProperties::default, with only ndv set
let default_writer_props = WriterProperties::builder()
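
A hedged, test-style sketch (assumed, not part of this commit) of what the opt-in path is expected to produce: the user-supplied key-value metadata plus an extra entry under ARROW_SCHEMA_META_KEY holding the base64-encoded IPC schema. The field and key names below are illustrative.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::TableParquetOptions;
use parquet::arrow::ARROW_SCHEMA_META_KEY;

fn main() -> datafusion_common::Result<()> {
    let mut opts = TableParquetOptions::default();
    opts.key_value_metadata
        .insert("my_key".to_string(), Some("my_value".to_string()));

    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
    let props = opts
        .into_writer_properties_builder_with_arrow_schema(Some(&schema))?
        .build();

    // Both the user metadata and the encoded schema should be present.
    let kv = props.key_value_metadata().expect("kv metadata was set");
    assert!(kv.iter().any(|entry| entry.key == "my_key"));
    assert!(kv.iter().any(|entry| entry.key == ARROW_SCHEMA_META_KEY));
    Ok(())
}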
