From c2622679de4c220801cc888c6e17ce636f8330c6 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Sat, 2 Sep 2023 00:05:05 +1000 Subject: [PATCH] Slight reformat --- src/arrow2/wasm.rs | 22 ++++++++++++---------- src/arrow2/writer_async.rs | 6 +++--- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/arrow2/wasm.rs b/src/arrow2/wasm.rs index 69943e44..7acbc917 100644 --- a/src/arrow2/wasm.rs +++ b/src/arrow2/wasm.rs @@ -238,9 +238,8 @@ pub fn write_parquet_stream( ) -> WasmResult { use futures::StreamExt; let (schema, chunks) = table.into_inner(); - let batches = futures::stream::iter(chunks.into_iter()).map(move |chunk| { - arrow_wasm::arrow2::RecordBatch::new(schema.clone(), chunk) - }); + let batches = futures::stream::iter(chunks.into_iter()) + .map(move |chunk| arrow_wasm::arrow2::RecordBatch::new(schema.clone(), chunk)); let output_stream = super::writer_async::transform_parquet_stream( batches, writer_properties.unwrap_or_default(), @@ -252,16 +251,19 @@ pub fn write_parquet_stream( #[cfg(all(feature = "writer", feature = "async"))] pub fn transform_parquet_stream( stream: wasm_streams::readable::sys::ReadableStream, - writer_properties: Option + writer_properties: Option, ) -> WasmResult { use futures::StreamExt; - let batches = wasm_streams::ReadableStream::from_raw(stream).into_stream().map(|maybe_chunk| { - let chunk = maybe_chunk.unwrap(); - let transformed: arrow_wasm::arrow2::RecordBatch = chunk.try_into().unwrap(); - transformed - }); + let batches = wasm_streams::ReadableStream::from_raw(stream) + .into_stream() + .map(|maybe_chunk| { + let chunk = maybe_chunk.unwrap(); + let transformed: arrow_wasm::arrow2::RecordBatch = chunk.try_into().unwrap(); + transformed + }); let output_stream = super::writer_async::transform_parquet_stream( - batches, writer_properties.unwrap_or_default() + batches, + writer_properties.unwrap_or_default(), ); Ok(output_stream.unwrap()) } diff --git a/src/arrow2/writer_async.rs b/src/arrow2/writer_async.rs index d00cebd3..acab151d 100644 --- a/src/arrow2/writer_async.rs +++ b/src/arrow2/writer_async.rs @@ -39,7 +39,7 @@ pub fn transform_parquet_stream( ) -> Result { let options = writer_properties.get_write_options(); let encoding = writer_properties.get_encoding(); - + let (writable_stream, output_stream) = { let raw_stream = wasm_streams::transform::sys::TransformStream::new(); let raw_writable = raw_stream.writable(); @@ -49,7 +49,7 @@ pub fn transform_parquet_stream( }; (writable_stream, raw_stream.readable()) }; - + spawn_local::<_>(async move { let mut adapted_stream = batches.peekable(); let mut pinned_stream = std::pin::pin!(adapted_stream); @@ -69,4 +69,4 @@ pub fn transform_parquet_stream( let _ = writer.close().await; }); Ok(output_stream) -} \ No newline at end of file +}