Commit aa74618: Lint

rok committed Dec 18, 2024
1 parent 56419be

Showing 4 changed files with 89 additions and 39 deletions.
43 changes: 29 additions & 14 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -17,16 +17,15 @@

//! Contains reader which reads parquet data into arrow [`RecordBatch`]
-use std::collections::VecDeque;
-use std::sync::Arc;
-use num::ToPrimitive;
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
+use std::collections::VecDeque;
+use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
@@ -695,14 +694,22 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();

-let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
-
-let crypto_context = CryptoContext::new(
-    rg_idx, self.column_idx, file_decryptor.clone(), file_decryptor);
-let crypto_context = Arc::new(crypto_context);
+let crypto_context = if self.metadata.file_decryptor().is_some() {
+    let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
+
+    let crypto_context = CryptoContext::new(
+        rg_idx,
+        self.column_idx,
+        file_decryptor.clone(),
+        file_decryptor,
+    );
+    Some(Arc::new(crypto_context))
+} else {
+    None
+};

-let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
-// let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
+let ret =
+    SerializedPageReader::new(reader, meta, total_rows, page_locations, crypto_context);
Some(ret.map(|x| Box::new(x) as _))
}
}
@@ -1721,12 +1728,17 @@ mod tests {
.build(),
);

-let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()).unwrap();
+let metadata =
+    ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref())
+        .unwrap();
let file_metadata = metadata.metadata.file_metadata();

assert_eq!(file_metadata.num_rows(), 50);
assert_eq!(file_metadata.schema_descr().num_columns(), 8);
-assert_eq!(file_metadata.created_by().unwrap(), "parquet-cpp-arrow version 14.0.0-SNAPSHOT");
+assert_eq!(
+    file_metadata.created_by().unwrap(),
+    "parquet-cpp-arrow version 14.0.0-SNAPSHOT"
+);

metadata.metadata.row_groups().iter().for_each(|rg| {
assert_eq!(rg.num_columns(), 8);
@@ -1740,9 +1752,12 @@
.with_footer_key(key_code.to_vec())
.build(),
);
-let record_reader =
-    ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
-        .unwrap();
+let record_reader = ParquetRecordBatchReader::try_new_with_decryption(
+    file,
+    128,
+    decryption_properties.as_ref(),
+)
+.unwrap();
// todo check contents
let mut row_count = 0;
for batch in record_reader {
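For reference, the test above boils down to the following usage pattern. This is a minimal sketch against the branch's in-progress decryption API: `ParquetRecordBatchReader::try_new_with_decryption` and the `with_footer_key`/`build` builder calls are taken from the diff, while the import path for `FileDecryptionProperties` and the `read_encrypted_file` helper are assumptions for illustration.

use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
// Assumed module path in this branch; the diff only shows the builder calls.
use parquet::encryption::ciphers::FileDecryptionProperties;

// Hypothetical helper: open an encrypted file and count its rows.
fn read_encrypted_file(path: &str, footer_key: Vec<u8>) -> parquet::errors::Result<usize> {
    let file = File::open(path)?;
    // Footer-key decryption properties, built as in the test above.
    let decryption_properties = Some(
        FileDecryptionProperties::builder()
            .with_footer_key(footer_key)
            .build(),
    );
    // A batch size of 128 mirrors the test.
    let record_reader = ParquetRecordBatchReader::try_new_with_decryption(
        file,
        128,
        decryption_properties.as_ref(),
    )?;
    let mut row_count = 0;
    for batch in record_reader {
        row_count += batch?.num_rows();
    }
    Ok(row_count)
}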
10 changes: 6 additions & 4 deletions parquet/src/file/metadata/mod.rs
@@ -104,6 +104,7 @@ use crate::format::{
};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+use crate::encryption::ciphers::FileDecryptor;
use crate::errors::{ParquetError, Result};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
@@ -117,7 +118,6 @@ use crate::schema::types::{
pub use reader::ParquetMetaDataReader;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;
-use crate::encryption::ciphers::FileDecryptor;

/// Page level statistics for each column chunk of each row group.
///
@@ -182,7 +182,11 @@ pub struct ParquetMetaData {
impl ParquetMetaData {
/// Creates Parquet metadata from file metadata and a list of row
/// group metadata
-pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, file_decryptor: Option<FileDecryptor>) -> Self {
+pub fn new(
+    file_metadata: FileMetaData,
+    row_groups: Vec<RowGroupMetaData>,
+    file_decryptor: Option<FileDecryptor>,
+) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
@@ -223,8 +227,6 @@ impl ParquetMetaData {
&self.file_decryptor
}

-
-
/// Returns number of row groups in this file.
pub fn num_row_groups(&self) -> usize {
self.row_groups.len()
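The widened constructor above makes the file decryptor explicit at every call site. A minimal sketch of the two call shapes, written as crate-internal code since `FileDecryptor` lives under `crate::encryption` in this branch; both helper functions are hypothetical:

use crate::encryption::ciphers::FileDecryptor;
use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};

// Unencrypted files pass `None`, so the plaintext path is unchanged.
fn plaintext_metadata(
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
) -> ParquetMetaData {
    ParquetMetaData::new(file_metadata, row_groups, None)
}

// Encrypted footers thread the decryptor through so page readers can fetch
// it again later via `ParquetMetaData::file_decryptor()`.
fn encrypted_metadata(
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    file_decryptor: FileDecryptor,
) -> ParquetMetaData {
    ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor))
}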
17 changes: 12 additions & 5 deletions parquet/src/file/metadata/reader.rs
@@ -664,14 +664,17 @@ impl ParquetMetaDataReader {
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
-let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
+let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

-file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
+file_decryptor = Some(FileDecryptor::new(
+    file_decryption_properties,
+    aad_file_unique.clone(),
+    aad_prefix.clone(),
+));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
// file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));

-decrypted_fmd_buf =
-    decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
+decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
}

@@ -700,7 +703,11 @@ impl ParquetMetaDataReader {
schema_descr,
column_orders,
);
-Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
+Ok(ParquetMetaData::new(
+    file_metadata,
+    row_groups,
+    Some(file_decryptor.unwrap()),
+))
}

/// Parses column orders from Thrift definition.
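Condensed, the decryption path in this hunk is: recover the file's unique AAD, derive the footer AAD from it, build a `FileDecryptor`, decrypt the footer bytes, and re-parse the plaintext as Thrift. A sketch of that sequence using only the calls that appear above; `aes_gcm_algo`, `file_decryption_properties`, and `encrypted_footer` stand in for the surrounding parser state, and error handling is abbreviated:

// `aes_gcm_algo` is the parsed AesGcmV1 metadata, `encrypted_footer` the raw
// footer bytes, and `file_decryption_properties` comes from the caller.
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

let file_decryptor = FileDecryptor::new(
    file_decryption_properties,
    aad_file_unique.clone(),
    aad_prefix.clone(),
);
// Decrypt the footer, then hand the plaintext back to the Thrift reader.
let decrypted_fmd_buf = file_decryptor
    .get_footer_decryptor()
    .decrypt(encrypted_footer, aad_footer.as_ref())?;
let mut prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());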
58 changes: 42 additions & 16 deletions parquet/src/file/serialized_reader.rs
@@ -18,13 +18,11 @@
//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
-use std::collections::VecDeque;
-use std::iter;
-use std::{fs::File, io::Read, path::Path, sync::Arc};
use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
+use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
@@ -39,8 +37,10 @@ use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
+use std::collections::VecDeque;
+use std::iter;
+use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;
-use crate::encryption::ciphers::{create_page_aad, BlockDecryptor, CryptoContext, ModuleType};

impl TryFrom<File> for SerializedFileReader<File> {
type Error = ParquetError;
Expand Down Expand Up @@ -339,7 +339,10 @@ impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
}

/// Reads a [`PageHeader`] from the provided [`Read`]
-pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<PageHeader> {
+pub(crate) fn read_page_header<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<PageHeader> {
if let Some(crypto_context) = crypto_context {
let decryptor = &crypto_context.data_decryptor();
let file_decryptor = decryptor.footer_decryptor();
@@ -367,7 +370,7 @@

let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
-return Ok(page_header)
+return Ok(page_header);
}

let mut prot = TCompactInputProtocol::new(input);
@@ -376,7 +379,10 @@ pub(crate) fn read_page_header<T: Read>(input: &mut T, crypto_context: Option<Ar
}

/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
-fn read_page_header_len<T: Read>(input: &mut T, crypto_context: Option<Arc<CryptoContext>>) -> Result<(usize, PageHeader)> {
+fn read_page_header_len<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<(usize, PageHeader)> {
/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
inner: R,
@@ -432,7 +438,7 @@ pub(crate) fn decode_page(
can_decompress = header_v2.is_compressed.unwrap_or(true);
}

-let buffer : Bytes = if crypto_context.is_some() {
+let buffer: Bytes = if crypto_context.is_some() {
let crypto_context = crypto_context.as_ref().unwrap();
let decryptor = crypto_context.data_decryptor();
let file_decryptor = decryptor.footer_decryptor();
@@ -589,7 +595,14 @@ impl<R: ChunkReader> SerializedPageReader<R> {
crypto_context: Option<Arc<CryptoContext>>,
) -> Result<Self> {
let props = Arc::new(ReaderProperties::builder().build());
-SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, crypto_context)
+SerializedPageReader::new_with_properties(
+    reader,
+    meta,
+    total_rows,
+    page_locations,
+    props,
+    crypto_context,
+)
}

/// Creates a new serialized page with custom options.
@@ -636,7 +649,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
state,
physical_type: meta.column_type(),
crypto_context,
})
});
}
Ok(Self {
reader,
@@ -675,7 +688,11 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
let header = if let Some(header) = next_page_header.take() {
*header
} else {
-let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
+let crypto_context = page_crypto_context(
+    &self.crypto_context,
+    *page_ordinal,
+    *require_dictionary,
+)?;
let (header_len, header) = read_page_header_len(&mut read, crypto_context)?;
*offset += header_len;
*remaining -= header_len;
@@ -700,7 +717,11 @@
));
}

-let crypto_context = page_crypto_context(&self.crypto_context, *page_ordinal, *require_dictionary)?;
+let crypto_context = page_crypto_context(
+    &self.crypto_context,
+    *page_ordinal,
+    *require_dictionary,
+)?;
let page = decode_page(
header,
Bytes::from(buffer),
@@ -853,13 +874,18 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}
}

-fn page_crypto_context(crypto_context: &Option<Arc<CryptoContext>>, page_ordinal: usize, dictionary_page: bool) -> Result<Option<Arc<CryptoContext>>> {
-    Ok(crypto_context.as_ref().map(
-        |c| Arc::new(if dictionary_page {
+fn page_crypto_context(
+    crypto_context: &Option<Arc<CryptoContext>>,
+    page_ordinal: usize,
+    dictionary_page: bool,
+) -> Result<Option<Arc<CryptoContext>>> {
+    Ok(crypto_context.as_ref().map(|c| {
+        Arc::new(if dictionary_page {
c.for_dictionary_page()
} else {
c.with_page_ordinal(page_ordinal)
-    })))
+        })
+    }))
}

#[cfg(test)]
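Tying the pieces together, the page-reading side now threads an optional `CryptoContext` from the file metadata down to `SerializedPageReader`. A minimal sketch of that wiring as crate-internal code; the bindings `rg_idx`, `column_idx`, `reader`, `meta`, `total_rows`, and `page_locations` mirror the reader code above rather than a public API:

use std::sync::Arc;

use crate::encryption::ciphers::CryptoContext;
use crate::file::serialized_reader::SerializedPageReader;

// Encrypted files build a per-column context; plaintext files map to `None`.
// As in this branch, the same footer decryptor fills both the data and
// metadata decryptor slots of `CryptoContext::new`.
let crypto_context = metadata.file_decryptor().as_ref().map(|decryptor| {
    let decryptor = Arc::new(decryptor.clone());
    Arc::new(CryptoContext::new(
        rg_idx,
        column_idx,
        decryptor.clone(),
        decryptor,
    ))
});

let page_reader =
    SerializedPageReader::new(reader, meta, total_rows, page_locations, crypto_context)?;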
