diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 7654ceb0fce7..91a9dae4dc74 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -17,9 +17,6 @@
 
 //! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
-use std::collections::VecDeque;
-use std::sync::Arc;
-use num::ToPrimitive;
 use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
@@ -27,6 +24,8 @@ use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
+use std::collections::VecDeque;
+use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{build_array_reader, ArrayReader};
@@ -695,14 +694,22 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();
 
-        let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
+        let crypto_context = if self.metadata.file_decryptor().is_some() {
+            let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
 
-        let crypto_context = CryptoContext::new(
-            rg_idx, self.column_idx, file_decryptor.clone(), file_decryptor);
-        let crypto_context = Arc::new(crypto_context);
+            let crypto_context = CryptoContext::new(
+                rg_idx,
+                self.column_idx,
+                file_decryptor.clone(),
+                file_decryptor,
+            );
+            Some(Arc::new(crypto_context))
+        } else {
+            None
+        };
 
-        let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
-        // let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
+        let ret =
+            SerializedPageReader::new(reader, meta, total_rows, page_locations, crypto_context);
         Some(ret.map(|x| Box::new(x) as _))
     }
 }
@@ -1721,12 +1728,17 @@
             .build(),
         );
 
-        let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()).unwrap();
+        let metadata =
+            ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref())
+                .unwrap();
 
         let file_metadata = metadata.metadata.file_metadata();
         assert_eq!(file_metadata.num_rows(), 50);
         assert_eq!(file_metadata.schema_descr().num_columns(), 8);
-        assert_eq!(file_metadata.created_by().unwrap(), "parquet-cpp-arrow version 14.0.0-SNAPSHOT");
+        assert_eq!(
+            file_metadata.created_by().unwrap(),
+            "parquet-cpp-arrow version 14.0.0-SNAPSHOT"
+        );
 
         metadata.metadata.row_groups().iter().for_each(|rg| {
             assert_eq!(rg.num_columns(), 8);
@@ -1740,9 +1752,12 @@
             .with_footer_key(key_code.to_vec())
             .build(),
         );
-        let record_reader =
-            ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
-                .unwrap();
+        let record_reader = ParquetRecordBatchReader::try_new_with_decryption(
+            file,
+            128,
+            decryption_properties.as_ref(),
+        )
+        .unwrap();
         // todo check contents
         let mut row_count = 0;
         for batch in record_reader {
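Note that `ReaderPageIterator::next` now builds a `CryptoContext` only when the loaded metadata actually carries a `FileDecryptor`, so unencrypted files keep flowing through the same path. For illustration, a minimal caller-side sketch based on the test code in this diff; the file path and key are placeholders, and the `parquet::encryption::ciphers` import path for `FileDecryptionProperties` is assumed from the imports elsewhere in this change:

    use std::fs::File;

    use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    use parquet::encryption::ciphers::FileDecryptionProperties; // assumed path
    use parquet::errors::Result;

    fn read_encrypted(path: &str, footer_key: &[u8]) -> Result<usize> {
        // The footer key must match the key the file was encrypted with.
        let decryption_properties = Some(
            FileDecryptionProperties::builder()
                .with_footer_key(footer_key.to_vec())
                .build(),
        );

        // 128-row batches, as in the test above.
        let reader = ParquetRecordBatchReader::try_new_with_decryption(
            File::open(path)?,
            128,
            decryption_properties.as_ref(),
        )?;

        let mut row_count = 0;
        for batch in reader {
            row_count += batch?.num_rows();
        }
        Ok(row_count)
    }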
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 95b18be9e7c3..0b0e8f5134d6 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -104,6 +104,7 @@ use crate::format::{
 };
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+use crate::encryption::ciphers::FileDecryptor;
 use crate::errors::{ParquetError, Result};
 pub(crate) use crate::file::metadata::memory::HeapSize;
 use crate::file::page_encoding_stats::{self, PageEncodingStats};
@@ -117,7 +118,6 @@ use crate::schema::types::{
 pub use reader::ParquetMetaDataReader;
 pub use writer::ParquetMetaDataWriter;
 pub(crate) use writer::ThriftMetadataWriter;
-use crate::encryption::ciphers::FileDecryptor;
 
 /// Page level statistics for each column chunk of each row group.
 ///
@@ -182,7 +182,11 @@ pub struct ParquetMetaData {
 impl ParquetMetaData {
     /// Creates Parquet metadata from file metadata and a list of row
     /// group metadata
-    pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, file_decryptor: Option<FileDecryptor>) -> Self {
+    pub fn new(
+        file_metadata: FileMetaData,
+        row_groups: Vec<RowGroupMetaData>,
+        file_decryptor: Option<FileDecryptor>,
+    ) -> Self {
         ParquetMetaData {
             file_metadata,
             row_groups,
@@ -223,8 +227,6 @@ impl ParquetMetaData {
         &self.file_decryptor
     }
 
-
-
     /// Returns number of row groups in this file.
     pub fn num_row_groups(&self) -> usize {
         self.row_groups.len()
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index f61b97c77769..36dc7077d2eb 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -664,14 +664,17 @@ impl ParquetMetaDataReader {
             // todo decr: set both aad_prefix and aad_file_unique in file_decryptor
             let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
             let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
-            let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
+            let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
 
-            file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
+            file_decryptor = Some(FileDecryptor::new(
+                file_decryption_properties,
+                aad_file_unique.clone(),
+                aad_prefix.clone(),
+            ));
             let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
             // file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));
 
-            decrypted_fmd_buf =
-                decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
+            decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
             prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
         }
 
@@ -700,7 +703,11 @@ impl ParquetMetaDataReader {
             schema_descr,
             column_orders,
         );
-        Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
+        Ok(ParquetMetaData::new(
+            file_metadata,
+            row_groups,
+            Some(file_decryptor.unwrap()),
+        ))
     }
 
     /// Parses column orders from Thrift definition.
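For context on `create_footer_aad` above: Parquet modular encryption binds every ciphertext to a module-specific AAD, built from the file's unique AAD suffix plus a module-type byte and, for page-level modules, row-group/column/page ordinals. A simplified sketch of that layout follows; it illustrates the spec's scheme rather than the `encryption::ciphers` implementation, and the module-type values and little-endian u16 ordinal encoding are assumptions taken from the spec, not from this diff:

    /// Module type bytes, as assumed from the Parquet modular encryption spec.
    const FOOTER_MODULE: u8 = 0;
    const DATA_PAGE_MODULE: u8 = 2;

    /// Footer AAD: the file's unique AAD suffix plus the footer module byte.
    fn footer_aad(aad_file_unique: &[u8]) -> Vec<u8> {
        let mut aad = aad_file_unique.to_vec();
        aad.push(FOOTER_MODULE);
        aad
    }

    /// Page AAD: additionally binds row group, column and page ordinals,
    /// so a page cannot be replayed at another position in the file.
    fn data_page_aad(aad_file_unique: &[u8], rg: u16, col: u16, page: u16) -> Vec<u8> {
        let mut aad = aad_file_unique.to_vec();
        aad.push(DATA_PAGE_MODULE);
        aad.extend_from_slice(&rg.to_le_bytes());
        aad.extend_from_slice(&col.to_le_bytes());
        aad.extend_from_slice(&page.to_le_bytes());
        aad
    }

This is also why a wrong footer key fails loudly: AES-GCM authenticates the footer bytes against the footer AAD before any Thrift parsing happens.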
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 9891e2c45ffb..9aef1d06637a 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,13 +18,11 @@
 //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
 
-use std::collections::VecDeque;
-use std::iter;
-use std::{fs::File, io::Read, path::Path, sync::Arc};
 use crate::basic::{Encoding, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
+use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::{
@@ -39,8 +37,10 @@ use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use bytes::Bytes;
+use std::collections::VecDeque;
+use std::iter;
+use std::{fs::File, io::Read, path::Path, sync::Arc};
 use thrift::protocol::TCompactInputProtocol;
-use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType};
 
 impl TryFrom<File> for SerializedFileReader<File> {
     type Error = ParquetError;
@@ -339,7 +339,10 @@ impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
 }
 
 /// Reads a [`PageHeader`] from the provided [`Read`]
-pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<PageHeader> {
+pub(crate) fn read_page_header<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<PageHeader> {
     if let Some(crypto_context) = crypto_context {
         let decryptor = &crypto_context.data_decryptor();
         let file_decryptor = decryptor.footer_decryptor();
@@ -367,7 +370,7 @@ pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<A
 }
 
 /// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
-fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<(usize, PageHeader)> {
+fn read_page_header_len<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<(usize, PageHeader)> {
     /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
     struct TrackedRead<R> {
         inner: R,
@@ -432,7 +438,7 @@ pub(crate) fn decode_page(
         can_decompress = header_v2.is_compressed.unwrap_or(true);
     }
 
-    let buffer : Bytes = if crypto_context.is_some() {
+    let buffer: Bytes = if crypto_context.is_some() {
        let crypto_context = crypto_context.as_ref().unwrap();
        let decryptor = crypto_context.data_decryptor();
        let file_decryptor = decryptor.footer_decryptor();
@@ -589,7 +595,14 @@ impl<R: ChunkReader> SerializedPageReader<R> {
         crypto_context: Option<Arc<CryptoContext>>,
     ) -> Result<Self> {
         let props = Arc::new(ReaderProperties::builder().build());
-        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, crypto_context)
+        SerializedPageReader::new_with_properties(
+            reader,
+            meta,
+            total_rows,
+            page_locations,
+            props,
+            crypto_context,
+        )
     }
 
     /// Creates a new serialized page with custom options.
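One ordering detail worth calling out in `decode_page`: encryption is applied after compression on the write path, so the read path must decrypt the page buffer first and decompress second. A condensed sketch of that ordering, assuming the `BlockDecryptor::decrypt` shape used elsewhere in this diff and the existing `Codec::decompress` API; the real function also handles V2 headers, where part of the buffer is stored uncompressed:

    use parquet::compression::Codec;
    use parquet::encryption::ciphers::BlockDecryptor; // assumed path
    use parquet::errors::Result;

    // ciphertext -> decrypt (AES-GCM with the page's module AAD)
    //            -> decompress -> plain page bytes.
    fn decrypt_then_decompress(
        ciphertext: &[u8],
        aad: &[u8],
        decryptor: &dyn BlockDecryptor, // assumed object-safe here
        codec: &mut dyn Codec,
        uncompressed_size: usize,
    ) -> Result<Vec<u8>> {
        let compressed = decryptor.decrypt(ciphertext, aad)?;
        let mut out = Vec::with_capacity(uncompressed_size);
        codec.decompress(&compressed, &mut out, Some(uncompressed_size))?;
        Ok(out)
    }

Swapping the two steps cannot work: the compressed bytes are what got sealed, and the authentication tag covers them.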
@@ -636,7 +649,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                 state,
                 physical_type: meta.column_type(),
                 crypto_context,
-            })
+            });
         }
         Ok(Self {
             reader,
@@ -675,7 +688,11 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     let header = if let Some(header) = next_page_header.take() {
                         *header
                     } else {
-                        let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
+                        let crypto_context = page_crypto_context(
+                            &self.crypto_context,
+                            *page_ordinal,
+                            *require_dictionary,
+                        )?;
                         let (header_len, header) = read_page_header_len(&mut read, crypto_context)?;
                         *offset += header_len;
                         *remaining -= header_len;
@@ -700,7 +717,11 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         ));
                     }
 
-                    let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
+                    let crypto_context = page_crypto_context(
+                        &self.crypto_context,
+                        *page_ordinal,
+                        *require_dictionary,
+                    )?;
                     let page = decode_page(
                         header,
                         Bytes::from(buffer),
@@ -853,13 +874,18 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     }
 }
 
-fn page_crypto_context(crypto_context: &Option<Arc<CryptoContext>>, page_ordinal: usize, dictionary_page: bool) -> Result<Option<Arc<CryptoContext>>> {
-    Ok(crypto_context.as_ref().map(
-        |c| Arc::new(if dictionary_page {
+fn page_crypto_context(
+    crypto_context: &Option<Arc<CryptoContext>>,
+    page_ordinal: usize,
+    dictionary_page: bool,
+) -> Result<Option<Arc<CryptoContext>>> {
+    Ok(crypto_context.as_ref().map(|c| {
+        Arc::new(if dictionary_page {
             c.for_dictionary_page()
         } else {
             c.with_page_ordinal(page_ordinal)
-        })))
+        })
+    }))
 }
 
 #[cfg(test)]
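`page_crypto_context` re-derives a context per page because each page module is sealed with its own AAD: dictionary pages use the dictionary module types, while data pages carry their ordinal within the column chunk. For completeness, a low-level sketch of wiring this together by hand, mirroring `ReaderPageIterator::next` from the first file in this diff; the `CryptoContext` import path is assumed, and this is an illustration rather than a supported public workflow:

    use std::fs::File;
    use std::sync::Arc;

    use parquet::encryption::ciphers::CryptoContext; // assumed path
    use parquet::errors::Result;
    use parquet::file::metadata::ParquetMetaData;
    use parquet::file::serialized_reader::SerializedPageReader;

    fn page_reader_for_column(
        reader: Arc<File>,
        metadata: &ParquetMetaData,
        rg_idx: usize,
        column_idx: usize,
    ) -> Result<SerializedPageReader<File>> {
        let rg = metadata.row_group(rg_idx);
        let meta = rg.column(column_idx);
        let total_rows = rg.num_rows() as usize;

        // As in ReaderPageIterator::next, the same FileDecryptor serves as
        // both data and metadata decryptor; None leaves the page reader on
        // the plaintext path.
        let crypto_context = metadata.file_decryptor().clone().map(|d| {
            let d = Arc::new(d);
            Arc::new(CryptoContext::new(rg_idx, column_idx, d.clone(), d))
        });

        SerializedPageReader::new(reader, meta, total_rows, None, crypto_context)
    }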