Skip to content

Commit

Permalink
feat: upgrade dataframe write_parquet and write_json
Browse files Browse the repository at this point in the history
The options to write_parquet changed.

write_json has a new argument that I defaulted to None. We can expose that config later.

Ref: apache/datafusion#9382
  • Loading branch information
Michael-J-Ward committed May 8, 2024
1 parent 5ec79ac commit 7786a71
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
use datafusion::arrow::util::pretty;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
use datafusion_common::UnnestOptions;
use pyo3::exceptions::{PyTypeError, PyValueError};
Expand Down Expand Up @@ -350,7 +350,7 @@ impl PyDataFrame {
cl.ok_or(PyValueError::new_err("compression_level is not defined"))
}

let compression_type = match compression.to_lowercase().as_str() {
let _validated = match compression.to_lowercase().as_str() {
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP(
GzipLevel::try_new(compression_level.unwrap_or(6))
Expand All @@ -375,16 +375,20 @@ impl PyDataFrame {
}
};

let writer_properties = WriterProperties::builder()
.set_compression(compression_type)
.build();
let mut compression_string = compression.to_string();
if let Some(level) = compression_level {
compression_string.push_str(&format!("({level})"));
}

let mut options = TableParquetOptions::default();
options.global.compression = Some(compression_string);

wait_for_future(
py,
self.df.as_ref().clone().write_parquet(
path,
DataFrameWriteOptions::new(),
Option::from(writer_properties),
Option::from(options),
),
)?;
Ok(())
Expand All @@ -397,7 +401,7 @@ impl PyDataFrame {
self.df
.as_ref()
.clone()
.write_json(path, DataFrameWriteOptions::new()),
.write_json(path, DataFrameWriteOptions::new(), None),
)?;
Ok(())
}
Expand Down

0 comments on commit 7786a71

Please sign in to comment.