
Commit

fixes and cmt
devinjdangelo committed Oct 23, 2023
1 parent 66007da commit a214937
Showing 1 changed file with 2 additions and 44 deletions.
46 changes: 2 additions & 44 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,11 +26,9 @@ use parquet::arrow::arrow_writer::{
     ArrowLeafColumn,
 };
 use parquet::file::writer::SerializedFileWriter;
-use rand::distributions::DistString;
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
-use std::fs::File;
 use std::io::Write;
 use std::sync::Arc;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
@@ -55,7 +53,6 @@ use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
-use rand::distributions::Alphanumeric;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, AbortableWrite, FileWriterMode};
@@ -678,45 +675,6 @@ impl ParquetSink {
             }
         }
     }
-
-    /// Creates an object store writer for each output partition
-    /// This is used when parallelizing individual parquet file writes.
-    async fn create_object_store_writers(
-        &self,
-        num_partitions: usize,
-        object_store: Arc<dyn ObjectStore>,
-        single_file_output: bool,
-    ) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
-        let mut writers = Vec::new();
-
-        let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
-        for part_idx in 0..num_partitions {
-            let file_path = if !single_file_output {
-                self.config.table_paths[0]
-                    .prefix()
-                    .child(format!("{}_{}.parquet", write_id, part_idx))
-            } else {
-                self.config.table_paths[0].prefix().clone()
-            };
-            let object_meta = ObjectMeta {
-                location: file_path,
-                last_modified: chrono::offset::Utc::now(),
-                size: 0,
-                e_tag: None,
-            };
-            writers.push(
-                create_writer(
-                    FileWriterMode::PutMultipart,
-                    FileCompressionType::UNCOMPRESSED,
-                    object_meta.into(),
-                    object_store.clone(),
-                )
-                .await?,
-            );
-        }
-
-        Ok(writers)
-    }
 }
 
 #[async_trait]
@@ -814,14 +772,14 @@ impl DataSink for ParquetSink {
                 let props = parquet_props.clone();
                 let parallel_options_clone = parallel_options.clone();
                 file_write_tasks.spawn(async move {
-                    Ok(output_single_parquet_file_parallelized(
+                    output_single_parquet_file_parallelized(
                         writer,
                         rx,
                         schema,
                         &props,
                         parallel_options_clone,
                     )
-                    .await?)
+                    .await
                 });
             }
         }
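
Note on the last hunk: the spawned task already evaluates to the Result type the closure is declared to return, so `Ok(... .await?)` only unwrapped and immediately re-wrapped the same value. A minimal standalone sketch of that equivalence follows (not DataFusion code; tokio and the function names here are assumptions for illustration):

use std::io;

// Stand-in for `output_single_parquet_file_parallelized`: an async fn that
// already returns the Result type the caller wants.
async fn write_file() -> Result<usize, io::Error> {
    Ok(42) // pretend 42 rows were written
}

// Before: `?` unwraps the Ok value, then `Ok(...)` re-wraps the same value.
async fn wrapped() -> Result<usize, io::Error> {
    Ok(write_file().await?)
}

// After: the Result is passed through untouched.
async fn direct() -> Result<usize, io::Error> {
    write_file().await
}

#[tokio::main]
async fn main() -> Result<(), io::Error> {
    assert_eq!(wrapped().await?, direct().await?);
    Ok(())
}

The two forms differ only when `?` has to convert the error type via `From`; since the direct form compiles in the commit, no conversion is presumably needed here.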

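The large deletion removes `create_object_store_writers`, which pre-created one multipart object-store writer per output partition and named each file with a shared random id plus the partition index. For reference, a tiny standalone sketch of just that naming scheme (assumes the rand crate; the loop bound and output names are illustrative):

use rand::distributions::{Alphanumeric, DistString};

fn main() {
    // The removed helper generated one random 16-character id per write and
    // appended the partition index, e.g. "Qx8fT2kLm1ZpR7vA_0.parquet".
    let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    for part_idx in 0..3 {
        println!("{}_{}.parquet", write_id, part_idx);
    }
}

This diff alone does not show the replacement path; the retained `start_demuxer_task` import suggests per-file writers are now obtained along the demuxed output paths instead.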