Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting writing schema metadata when writing Parquet in parallel #13866

Merged
merged 16 commits into from
Jan 1, 2025

Conversation

wiedld
Copy link
Contributor

@wiedld wiedld commented Dec 21, 2024

Which issue does this PR close?

Closes #11770

Rationale for this change

The ArrowWriter with it's default ArrowWriterOptions will encode the arrow schema in the parquet kv_metadata, unless explicitly skipped. Skipping is done via ArrowWriterOptions::with_skip_arrow_metadata.

In datafusion's ParquetSink, we can write in either single threaded or parallelized format. When in single-threaded mode, we use the default ArrowWriterOptions and the arrow schema is inserted into file kv_meta. However, when performing parallelized writes we do not use the ArrowWriter and instead rely upon the SerializedFileWriter. As a result, we are missing the arrow schema metadata in the parquet files (see the issue ticket).

ArrowWriterOptions vs WriterProperties

The SerializedFileWriter, along with other associated writers, rely upon the WriterProperties. The WriterProperties differ from the ArrowWriterOptions only in terms of the skip_arrow_metadata and the schema_root:

pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

The skip_arrow_metadata config is only used to decide if the schema should be added to the WriterProperties.kv_metadata.

Proposed Solution

Since we have WriterProperties, not ArrowWriterOptions, I focused on solutions which construct the proper WriterProperties.kv_metadata (with or without the arrow schema).

Our established pattern is to take TableParquetOptions configurations, and derive WriterProperties from those. Therefore, I updates those conversion methods to consider arrow schema insertion.

What changes are included in this PR?

  • add a new configuration ParquetOptions.skip_arrow_metadata
  • have ParquetSink single-threaded writes, which use the ArrowWriter, respect this configuration
  • have ParquetSink multiple-threaded writes, which use WriterProperties, respect this configuration
  • TableParquetOptions => into WriterProperties conversion methods are updated

Are these changes tested?

yes.

Are there any user-facing changes?

We have new APIs:

  • ParquetOptions.skip_arrow_metadata configuration
  • replace/deprecate ParquetWriterOptions::try_from(ParquetWriterOptions) and replace with methods which explicitly handle arrow schema

@github-actions github-actions bot added core Core DataFusion crate common Related to common crate proto Related to proto crate labels Dec 21, 2024
@wiedld wiedld force-pushed the 11770/parquet-sink-metadata branch from 93278ee to 1a9da6f Compare December 21, 2024 01:10
Comment on lines 165 to 170
/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: make arrow schema encoding available in a public API.
/// Refer to currently private `add_encoded_arrow_schema_to_metadata` and `encode_arrow_schema` public.
/// <https://github.com/apache/arrow-rs/blob/2908a80d9ca3e3fb0414e35b67856f1fb761304c/parquet/src/arrow/schema/mod.rs#L172-L221>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are in agreement on need, I'll go make the arrow-rs PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 2347 to +2353
async fn parquet_sink_write() -> Result<()> {
let parquet_sink = create_written_parquet_sink("file:///").await?;

// assert written
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
written.len(),
1,
"expected a single parquet files to be written, instead found {}",
written.len()
);
// assert written to proper path
let (path, file_metadata) = get_written(parquet_sink)?;
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 1, "should not have path prefix");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the tests first, into a series of helpers. May be easier to review in this commit: 09004c5

}

#[tokio::test]
async fn parquet_sink_parallel_write() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New test added.

}

#[tokio::test]
async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New test added.

@wiedld wiedld force-pushed the 11770/parquet-sink-metadata branch from 1a9da6f to 0b960d9 Compare December 21, 2024 01:53
@wiedld wiedld changed the title ParquetSink should be aware of arrow schema encoding (configurable) in the file metadata. WIP: ParquetSink should be aware of arrow schema encoding (configurable) in the file metadata. Dec 21, 2024
@wiedld wiedld changed the title WIP: ParquetSink should be aware of arrow schema encoding (configurable) in the file metadata. WIP: ParquetSink should be aware of arrow schema encoding for the file metadata. Dec 21, 2024
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Dec 26, 2024
@@ -61,7 +61,7 @@ logical_plan
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column1@0 != 42
03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]
03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]
Copy link
Contributor Author

@wiedld wiedld Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this file, the partition offsets are shifted based upon the slightly larger kv metadata (from the default addition of encoded arrow schema).

@wiedld wiedld changed the title WIP: ParquetSink should be aware of arrow schema encoding for the file metadata. ParquetSink should be aware of arrow schema encoding for the file metadata. Dec 26, 2024
@wiedld wiedld marked this pull request as ready for review December 26, 2024 18:31
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank ou @wiedld -- I think this is very close.

  1. Can you please try and avoid into_writer_properties_builder_with_arrow_schema -- it would be nicer to avoid having that and into_writer_properties_builder if possible
  2. Could you please file a ticket / PR upstream in Arrow to support the metadata encoding?

Otherwise I think this PR is ready to go

FYI @kylebarron

///
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column,
/// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`].
pub fn into_writer_properties_builder_with_arrow_schema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think making setting the arrow schema a different function might be more consistent with the rest of this API https://docs.rs/datafusion/latest/datafusion/config/struct.ParquetOptions.html

So for example I would expect that ParquetOptions::into_writer_properties_builder would always set the arrow metadata to be consistent with how WriterProperties works 🤔

If someone doesn't want the arrow metadata, I would expect an option to disable it, like https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata

So that might mean that TableParquetOptions has a field like skip_arrow_metadata

And then depending on the value of that field, into_writer_properties_builder would set/not set the metadata appropriately

That would also avoid having to change the conversion API

Copy link
Contributor Author

@wiedld wiedld Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for example I would expect that ParquetOptions::into_writer_properties_builder would always set the arrow metadata to be consistent with how WriterProperties works 🤔

The ParquetOptions does not have kv_metadata. That is held within the parent structure TableParquetOptions. That is why the existing TryFrom code only constructs the writer properties from the TableParquetOptions. Not the ParquetOptions.

I like the idea of making the metadata APIs on the TableParquetOptions, altho I'll keep the config on the ParquetOptions. I'll push up the change shortly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the change: f2f9b00

I also made another change to prevent the user errors I was worried about: c5ad794

datafusion/core/src/datasource/file_format/parquet.rs Outdated Show resolved Hide resolved
Ok((path, file_metadata))
}

fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: Vec<KeyValue>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you passed in a ref here it might make the code above simpler (less clone):

Suggested change
fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: Vec<KeyValue>) {
fn assert_file_metadata(file_metadata: &FileMetaData, expected_kv: &[KeyValue]) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second arg (expected_kv) as a ref does indeed avoid the cloning. Thank you.

The first arg does not have any cloning currently (see tests), and if we add the ref -- then I would need to clone the file_metadata.key_value_metadata in order to sort & compare to the expected_kv.

@alamb alamb changed the title ParquetSink should be aware of arrow schema encoding for the file metadata. Supporting writing schema metadata when writing Parquet in parallel Dec 26, 2024
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Dec 27, 2024
…Options>, and separately add the arrow_schema to the kv_metadata on the TableParquetOptions
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for this contribution @wiedld -- this looks good to me.

self.config.output_schema()
};

// TODO: avoid this clone in follow up PR, where the writer properties & schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a PR you plan to do ? Should I file a ticket to track?

@alamb
Copy link
Contributor

alamb commented Dec 30, 2024

I'll plan to merge this tomorrow unless others would like time to review

Thanks again @wiedld

@alamb alamb merged commit b54e648 into apache:main Jan 1, 2025
29 checks passed
@alamb
Copy link
Contributor

alamb commented Jan 1, 2025

Thanks again @wiedld

@alamb alamb deleted the 11770/parquet-sink-metadata branch January 1, 2025 13:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate documentation Improvements or additions to documentation proto Related to proto crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Arrow schema is missing from the parquet metadata, for files written by ParquetSink.
2 participants