From 7b82982f3684a641fe60b186e156d92022c0e263 Mon Sep 17 00:00:00 2001 From: Benjamin Levin Date: Wed, 25 Oct 2023 17:47:37 -0500 Subject: [PATCH] Add io_parquet_async feature flag --- Cargo.toml | 10 +++++----- src/io/parquet/read/mod.rs | 13 +++++++++---- src/io/parquet/read/row_group.rs | 6 ++++++ src/io/parquet/write/mod.rs | 3 +++ 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5deab656f5e..a8e5933d2fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,7 +100,7 @@ odbc-api = { version = "0.36", optional = true } # Faster hashing ahash = "0.8" -# For `LIKE` matching "contains" fast-path +# For `LIKE` matching "contains" fast-path memchr = { version = "2.6", optional = true } # Support conversion to/from arrow-rs @@ -117,7 +117,6 @@ getrandom = { version = "0.2", features = ["js"] } version = "0.17" optional = true default_features = false -features = ["async"] [dev-dependencies] criterion = "0.4" @@ -160,7 +159,7 @@ full = [ "io_ipc_compression", "io_json_integration", "io_print", - "io_parquet", + "io_parquet_async", "io_parquet_compression", "io_avro", "io_orc", @@ -189,7 +188,8 @@ io_ipc_compression = ["lz4", "zstd"] io_flight = ["io_ipc", "arrow-format/flight-data"] # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. -io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"] +io_parquet = ["parquet2", "io_ipc", "base64", "streaming-iterator", "fallible-streaming-iterator"] +io_parquet_async = ["futures", "io_parquet", "parquet2/async"] io_parquet_compression = [ "io_parquet_zstd", @@ -200,7 +200,7 @@ io_parquet_compression = [ ] # sample testing of generated arrow data -io_parquet_sample_test = ["io_parquet"] +io_parquet_sample_test = ["io_parquet_async"] # compression backends io_parquet_zstd = ["parquet2/zstd"] diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index ea2b2f46d43..e856f101af3 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -10,19 +10,22 @@ pub mod statistics; use std::io::{Read, Seek}; +#[cfg(feature = "io_parquet_async")] use futures::{AsyncRead, AsyncSeek}; // re-exports of parquet2's relevant APIs +#[cfg(feature = "io_parquet_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))] +pub use parquet2::read::{get_page_stream, read_metadata_async as _read_metadata_async}; pub use parquet2::{ error::Error as ParquetError, fallible_streaming_iterator, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, page::{CompressedDataPage, DataPageHeader, Page}, read::{ - decompress, get_column_iterator, get_page_stream, - read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata, - read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor, - Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State, + decompress, get_column_iterator, read_columns_indexes as _read_columns_indexes, + read_metadata as _read_metadata, read_pages_locations, BasicDecompressor, Decompressor, + MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State, }, schema::types::{ GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType, @@ -60,6 +63,8 @@ pub fn read_metadata(reader: &mut R) -> Result { } /// Reads parquets' metadata asynchronously. +#[cfg(feature = "io_parquet_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))] pub async fn read_metadata_async( reader: &mut R, ) -> Result { diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 176c6e83182..7062df31e45 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -1,5 +1,6 @@ use std::io::{Read, Seek}; +#[cfg(feature = "io_parquet_async")] use futures::{ future::{try_join_all, BoxFuture}, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, @@ -138,6 +139,7 @@ where Ok((meta, chunk)) } +#[cfg(feature = "io_parquet_async")] async fn _read_single_column_async<'b, R, F>( reader_factory: F, meta: &ColumnChunkMetaData, @@ -163,6 +165,8 @@ where /// /// It does so asynchronously via a single `join_all` over all the necessary columns for /// `field_name`. +#[cfg(feature = "io_parquet_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))] pub async fn read_columns_async< 'a, 'b, @@ -303,6 +307,8 @@ pub fn read_columns_many<'a, R: Read + Seek>( /// This operation is IO-bounded `O(C)` where C is the number of columns in the row group - /// it reads all the columns to memory from the row group associated to the requested fields. /// It does so asynchronously via `join_all` +#[cfg(feature = "io_parquet_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))] pub async fn read_columns_many_async< 'a, 'b, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index d4134f27df5..6ef1864c6f3 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -22,6 +22,7 @@ mod pages; mod primitive; mod row_group; mod schema; +#[cfg(feature = "io_parquet_async")] mod sink; mod utf8; mod utils; @@ -68,6 +69,8 @@ use crate::compute::aggregate::estimated_bytes_size; pub use file::FileWriter; pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; +#[cfg(feature = "io_parquet_async")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))] pub use sink::FileSink; pub use pages::array_to_columns;