Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into streaming-writes
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Plus-Time committed Sep 5, 2023
2 parents 07ef837 + c14a39a commit 7a0f1df
Show file tree
Hide file tree
Showing 11 changed files with 2,231 additions and 2,693 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "parquet-wasm"
version = "0.4.0"
version = "0.5.0-alpha.1"
authors = ["Kyle Barron <[email protected]>"]
edition = "2021"
description = "WebAssembly Parquet reader and writer."
Expand All @@ -16,12 +16,12 @@ crate-type = ["cdylib", "rlib"]

[features]
default = ["arrow1", "all_compressions", "reader", "writer"]
arrow1 = ["dep:arrow", "dep:parquet", "dep:bytes"]
arrow1 = ["dep:arrow", "dep:parquet", "dep:bytes", "arrow-wasm/arrow1"]
arrow2 = [
"dep:arrow2",
"dep:parquet2",
"dep:serde-wasm-bindgen",
"dep:arrow-wasm",
"arrow-wasm/arrow2",
]
reader = []
writer = []
Expand Down Expand Up @@ -77,8 +77,8 @@ js-sys = "0.3.60"
getrandom = { version = "0.2.6", features = ["js"] }
thiserror = "1.0"

arrow-wasm = { git = "https://github.com/kylebarron/arrow-wasm", rev = "1c48ba24e98c756fd593418180c3833fc6e3a795", optional = true }
# arrow-wasm = { path = "/Users/kyle/github/rust/arrow-wasm", optional = true }
arrow-wasm = { git = "https://github.com/kylebarron/arrow-wasm", rev = "ab0455c7dd8df31f3d4b97988f08bc34e62f2dc9" }
# arrow-wasm = { path = "/Users/kyle/github/rust/arrow-wasm" }

arrow2 = { version = "0.17", optional = true, features = [
"io_ipc",
Expand Down
23 changes: 6 additions & 17 deletions src/arrow1/reader.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,26 @@
use crate::arrow1::error::Result;
use crate::arrow1::ffi::FFIArrowTable;
use arrow::ipc::writer::StreamWriter;
use arrow_wasm::arrow1::Table;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

/// Internal function to read a buffer with Parquet data into a buffer with Arrow IPC Stream data
/// using the arrow and parquet crates
pub fn read_parquet(parquet_file: Vec<u8>) -> Result<Vec<u8>> {
pub fn read_parquet(parquet_file: Vec<u8>) -> Result<Table> {
// Create Parquet reader
let cursor: Bytes = parquet_file.into();
let builder = ParquetRecordBatchReaderBuilder::try_new(cursor).unwrap();
let arrow_schema = builder.schema().clone();

// Create Arrow reader
let reader = builder.build().unwrap();

// Create IPC Writer
let mut output_file = Vec::new();
let mut batches = vec![];

{
let mut writer = StreamWriter::try_new(&mut output_file, &arrow_schema)?;

// Iterate over record batches, writing them to IPC stream
for maybe_record_batch in reader {
let record_batch = maybe_record_batch?;
writer.write(&record_batch)?;
}
writer.finish()?;
for maybe_chunk in reader {
batches.push(maybe_chunk?)
}

// Note that this returns output_file directly instead of using writer.into_inner().to_vec() as
// the latter seems likely to incur an extra copy of the vec
Ok(output_file)
Ok(Table::new(batches))
}

pub fn read_parquet_ffi(parquet_file: Vec<u8>) -> Result<FFIArrowTable> {
Expand Down
31 changes: 11 additions & 20 deletions src/arrow1/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::arrow1::error::WasmResult;
use crate::arrow1::ffi::FFIArrowTable;
use crate::utils::assert_parquet_file_not_empty;
use arrow_wasm::arrow1::{RecordBatch, Table};
use wasm_bindgen::prelude::*;

/// Read a Parquet file into Arrow data using the [`arrow`](https://crates.io/crates/arrow) and
Expand All @@ -23,18 +23,11 @@ use wasm_bindgen::prelude::*;
/// @returns Uint8Array containing Arrow data in [IPC Stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). To parse this into an Arrow table, pass to `tableFromIPC` in the Arrow JS bindings.
#[wasm_bindgen(js_name = readParquet)]
#[cfg(feature = "reader")]
pub fn read_parquet(parquet_file: Vec<u8>) -> WasmResult<Vec<u8>> {
pub fn read_parquet(parquet_file: Vec<u8>) -> WasmResult<Table> {
assert_parquet_file_not_empty(parquet_file.as_slice())?;
Ok(crate::arrow1::reader::read_parquet(parquet_file)?)
}

#[wasm_bindgen(js_name = readParquetFFI)]
#[cfg(feature = "reader")]
pub fn read_parquet_ffi(parquet_file: Vec<u8>) -> WasmResult<FFIArrowTable> {
assert_parquet_file_not_empty(&parquet_file)?;
Ok(crate::arrow1::reader::read_parquet_ffi(parquet_file)?)
}

/// Write Arrow data to a Parquet file using the [`arrow`](https://crates.io/crates/arrow) and
/// [`parquet`](https://crates.io/crates/parquet) Rust crates.
///
Expand Down Expand Up @@ -62,32 +55,30 @@ pub fn read_parquet_ffi(parquet_file: Vec<u8>) -> WasmResult<FFIArrowTable> {
#[wasm_bindgen(js_name = writeParquet)]
#[cfg(feature = "writer")]
pub fn write_parquet(
arrow_file: &[u8],
table: Table,
writer_properties: Option<crate::arrow1::writer_properties::WriterProperties>,
) -> WasmResult<Vec<u8>> {
let writer_props = writer_properties.unwrap_or_else(|| {
crate::arrow1::writer_properties::WriterPropertiesBuilder::default().build()
});

let schema = table.schema().into_inner();
let batches = table.into_inner();
Ok(crate::arrow1::writer::write_parquet(
arrow_file,
writer_props,
batches.into_iter(),
schema,
writer_properties.unwrap_or_default(),
)?)
}

#[wasm_bindgen(js_name = readFFIStream)]
#[wasm_bindgen(js_name = readParquetStream)]
#[cfg(all(feature = "reader", feature = "async"))]
pub async fn read_ffi_stream(
pub async fn read_parquet_stream(
url: String,
content_length: Option<usize>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use crate::arrow1::ffi::FFIArrowRecordBatch;
use futures::StreamExt;
let parquet_stream =
crate::arrow1::reader_async::read_record_batch_stream(url, content_length).await?;
let stream = parquet_stream.map(|maybe_record_batch| {
let record_batch = maybe_record_batch.unwrap();
Ok(FFIArrowRecordBatch::from(record_batch).into())
Ok(RecordBatch::new(record_batch).into())
});
Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
}
22 changes: 7 additions & 15 deletions src/arrow1/writer.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
use crate::arrow1::error::Result;
use arrow::ipc::reader::StreamReader;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use std::io::Cursor;

/// Internal function to write a buffer of data in Arrow IPC Stream format to a Parquet file using
/// the arrow and parquet crates
pub fn write_parquet(
arrow_file: &[u8],
batches: impl Iterator<Item = RecordBatch>,
schema: SchemaRef,
writer_properties: crate::arrow1::writer_properties::WriterProperties,
) -> Result<Vec<u8>> {
// Create IPC reader
let input_file = Cursor::new(arrow_file);
let arrow_ipc_reader = StreamReader::try_new(input_file, None)?;
let arrow_schema = arrow_ipc_reader.schema();

// Create Parquet writer
let mut output_file: Vec<u8> = vec![];
let mut writer = ArrowWriter::try_new(
&mut output_file,
arrow_schema,
Some(writer_properties.into()),
)?;
let mut writer =
ArrowWriter::try_new(&mut output_file, schema, Some(writer_properties.into()))?;

// Iterate over IPC chunks, writing each batch to Parquet
for maybe_record_batch in arrow_ipc_reader {
let record_batch = maybe_record_batch?;
for record_batch in batches {
writer.write(&record_batch)?;
}

Expand Down
6 changes: 6 additions & 0 deletions src/arrow1/writer_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ impl From<WriterProperties> for parquet::file::properties::WriterProperties {
}
}

impl Default for WriterProperties {
fn default() -> Self {
WriterPropertiesBuilder::default().build()
}
}

/// Builder to create a writing configuration for `writeParquet`
///
/// Call {@linkcode build} on the finished builder to create an immutable {@linkcode WriterProperties} to pass to `writeParquet`
Expand Down
30 changes: 15 additions & 15 deletions src/bin/read_parquet.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use clap::Parser;
use parquet_wasm::arrow1::reader::read_parquet;
use std::fs;
// use parquet_wasm::arrow1::reader::read_parquet;
// use std::fs;
use std::path::PathBuf;
use std::process;
// use std::process;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand All @@ -17,19 +17,19 @@ struct Args {
}

fn main() {
let args = Args::parse();
// let args = Args::parse();

// Read file to buffer
let data = fs::read(&args.input_file).expect("Unable to read file");
// // Read file to buffer
// let data = fs::read(&args.input_file).expect("Unable to read file");

// Call read_parquet
let arrow_ipc = read_parquet(data)
.map_err(|err| {
eprintln!("Could not read parquet file: {}", err);
process::exit(1);
})
.unwrap();
// // Call read_parquet
// let arrow_ipc = read_parquet(data)
// .map_err(|err| {
// eprintln!("Could not read parquet file: {}", err);
// process::exit(1);
// })
// .unwrap();

// Write result to file
fs::write(&args.output_file, arrow_ipc).expect("Unable to write file");
// // Write result to file
// fs::write(&args.output_file, arrow_ipc).expect("Unable to write file");
}
42 changes: 21 additions & 21 deletions src/bin/write_parquet.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use clap::Parser;
use parquet_wasm::arrow1::writer::write_parquet;
use parquet_wasm::arrow1::writer_properties::WriterPropertiesBuilder;
use parquet_wasm::common::writer_properties::Compression;
use std::fs;
// use parquet_wasm::arrow1::writer::write_parquet;
// use parquet_wasm::arrow1::writer_properties::WriterPropertiesBuilder;
// use parquet_wasm::common::writer_properties::Compression;
// use std::fs;
use std::path::PathBuf;
use std::process;
// use std::process;

/// Simple program to greet a person
#[derive(Parser, Debug)]
Expand All @@ -20,24 +20,24 @@ struct Args {
}

fn main() {
let args = Args::parse();
// let args = Args::parse();

// Read file to buffer
let data = fs::read(&args.input_file).expect("Unable to read file");
let slice = data.as_slice();
// // Read file to buffer
// let data = fs::read(&args.input_file).expect("Unable to read file");
// let slice = data.as_slice();

// Call read_parquet
let writer_properties = WriterPropertiesBuilder::new()
.set_compression(Compression::SNAPPY)
.build();
// // Call read_parquet
// let writer_properties = WriterPropertiesBuilder::new()
// .set_compression(Compression::SNAPPY)
// .build();

let arrow_ipc = write_parquet(slice, writer_properties)
.map_err(|err| {
eprintln!("Could not write parquet file: {}", err);
process::exit(1);
})
.unwrap();
// let arrow_ipc = write_parquet(slice, writer_properties)
// .map_err(|err| {
// eprintln!("Could not write parquet file: {}", err);
// process::exit(1);
// })
// .unwrap();

// Write result to file
fs::write(&args.output_file, arrow_ipc).expect("Unable to write file");
// // Write result to file
// fs::write(&args.output_file, arrow_ipc).expect("Unable to write file");
}
Loading

0 comments on commit 7a0f1df

Please sign in to comment.