Skip to content

Commit

Permalink
Slight reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Plus-Time committed Sep 1, 2023
1 parent df5c5dd commit c262267
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
22 changes: 12 additions & 10 deletions src/arrow2/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,8 @@ pub fn write_parquet_stream(
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use futures::StreamExt;
let (schema, chunks) = table.into_inner();
let batches = futures::stream::iter(chunks.into_iter()).map(move |chunk| {
arrow_wasm::arrow2::RecordBatch::new(schema.clone(), chunk)
});
let batches = futures::stream::iter(chunks.into_iter())
.map(move |chunk| arrow_wasm::arrow2::RecordBatch::new(schema.clone(), chunk));
let output_stream = super::writer_async::transform_parquet_stream(
batches,
writer_properties.unwrap_or_default(),
Expand All @@ -252,16 +251,19 @@ pub fn write_parquet_stream(
#[cfg(all(feature = "writer", feature = "async"))]
pub fn transform_parquet_stream(
stream: wasm_streams::readable::sys::ReadableStream,
writer_properties: Option<crate::arrow2::writer_properties::WriterProperties>
writer_properties: Option<crate::arrow2::writer_properties::WriterProperties>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use futures::StreamExt;
let batches = wasm_streams::ReadableStream::from_raw(stream).into_stream().map(|maybe_chunk| {
let chunk = maybe_chunk.unwrap();
let transformed: arrow_wasm::arrow2::RecordBatch = chunk.try_into().unwrap();
transformed
});
let batches = wasm_streams::ReadableStream::from_raw(stream)
.into_stream()
.map(|maybe_chunk| {
let chunk = maybe_chunk.unwrap();
let transformed: arrow_wasm::arrow2::RecordBatch = chunk.try_into().unwrap();
transformed
});
let output_stream = super::writer_async::transform_parquet_stream(
batches, writer_properties.unwrap_or_default()
batches,
writer_properties.unwrap_or_default(),
);
Ok(output_stream.unwrap())
}
6 changes: 3 additions & 3 deletions src/arrow2/writer_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn transform_parquet_stream(
) -> Result<wasm_streams::readable::sys::ReadableStream> {
let options = writer_properties.get_write_options();
let encoding = writer_properties.get_encoding();

let (writable_stream, output_stream) = {
let raw_stream = wasm_streams::transform::sys::TransformStream::new();
let raw_writable = raw_stream.writable();
Expand All @@ -49,7 +49,7 @@ pub fn transform_parquet_stream(
};
(writable_stream, raw_stream.readable())
};

spawn_local::<_>(async move {
let mut adapted_stream = batches.peekable();
let mut pinned_stream = std::pin::pin!(adapted_stream);
Expand All @@ -69,4 +69,4 @@ pub fn transform_parquet_stream(
let _ = writer.close().await;
});
Ok(output_stream)
}
}

0 comments on commit c262267

Please sign in to comment.