From 2334eb3470009182bd02bfe15a393dba27a35179 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 19 Dec 2024 23:41:39 +0100 Subject: [PATCH] start non-uniform decryption --- parquet/src/arrow/arrow_reader/mod.rs | 81 +++++++++++++++++++++++++++ parquet/src/encryption/ciphers.rs | 27 +++++++-- parquet/src/file/metadata/reader.rs | 9 ++- parquet/src/file/serialized_reader.rs | 2 + 4 files changed, 112 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 965230cdbaaf..a72620388011 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1715,6 +1715,87 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + fn test_non_uniform_encryption_plaintext_footer() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + let decryption_properties = Some( + ciphers::FileDecryptionProperties::builder() + .with_column_key("kc1".as_bytes().to_vec(), column_1_key.to_vec()) + .with_column_key("kc2".as_bytes().to_vec(), column_2_key.to_vec()) + .build(), + ); + + let metadata = + ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()) + .unwrap(); + let file_metadata = metadata.metadata.file_metadata(); + + assert_eq!(file_metadata.num_rows(), 50); + assert_eq!(file_metadata.schema_descr().num_columns(), 8); + assert_eq!( + file_metadata.created_by().unwrap(), + "parquet-cpp-arrow version 14.0.0-SNAPSHOT" + ); + + metadata.metadata.row_groups().iter().for_each(|rg| { + assert_eq!(rg.num_columns(), 8); + assert_eq!(rg.num_rows(), 50); + assert_eq!(rg.total_byte_size(), 3816); + }); + + let record_reader = ParquetRecordBatchReader::try_new_with_decryption( + file, + 128, + decryption_properties.as_ref(), + ) + .unwrap(); + + let mut row_count = 0; + for batch in record_reader { + let batch = batch.unwrap(); + row_count += batch.num_rows(); + } + + assert_eq!(row_count, file_metadata.num_rows() as usize); + } + + #[test] + fn test_non_uniform_encryption() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let footer_key = "0123456789012345".as_bytes(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + let decryption_properties = Some( + ciphers::FileDecryptionProperties::builder() + .with_footer_key(footer_key.to_vec()) + .with_column_key("kc1".as_bytes().to_vec(), column_1_key.to_vec()) + .with_column_key("kc2".as_bytes().to_vec(), column_2_key.to_vec()) + .build(), + ); + + let metadata = + ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()) + .unwrap(); + // let file_metadata = metadata.metadata.file_metadata(); + // + // assert_eq!(file_metadata.num_rows(), 50); + // assert_eq!(file_metadata.schema_descr().num_columns(), 8); + // assert_eq!( + // file_metadata.created_by().unwrap(), + // "parquet-cpp-arrow version 19.0.0-SNAPSHOT" + // ); + } + #[test] fn test_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data(); diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 6ecbb003dd5b..0b8d9246bca2 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -18,6 +18,7 @@ //! Encryption implementation specific to Parquet, as described //! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). +use std::collections::HashMap; use std::sync::Arc; use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; use ring::rand::{SecureRandom, SystemRandom}; @@ -227,29 +228,34 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal #[derive(Debug, Clone, PartialEq)] pub struct FileDecryptionProperties { - footer_key: Option> + footer_key: Option>, + column_keys: Option, Vec>>, } impl FileDecryptionProperties { pub fn builder() -> DecryptionPropertiesBuilder { DecryptionPropertiesBuilder::with_defaults() } + pub fn has_footer_key(&self) -> bool { self.footer_key.is_some() } } pub struct DecryptionPropertiesBuilder { - footer_key: Option> + footer_key: Option>, + column_keys: Option, Vec>>, } impl DecryptionPropertiesBuilder { pub fn with_defaults() -> Self { Self { - footer_key: None + footer_key: None, + column_keys: None, } } pub fn build(self) -> FileDecryptionProperties { FileDecryptionProperties { - footer_key: self.footer_key + footer_key: self.footer_key, + column_keys: self.column_keys, } } @@ -258,6 +264,14 @@ impl DecryptionPropertiesBuilder { self.footer_key = Some(value); self } + + pub fn with_column_key(mut self, key: Vec, value: Vec) -> Self { + let mut column_keys= self.column_keys.unwrap_or_else(HashMap::new); + column_keys.insert(key, value); + // let _ = column_keys.insert(key, value); + self.column_keys = Some(column_keys); + self + } } #[derive(Debug, Clone)] @@ -291,6 +305,11 @@ impl FileDecryptor { self.footer_decryptor } + pub(crate) fn get_column_decryptor(&self, column_key: &[u8]) -> RingGcmBlockDecryptor { + let column_key = self.decryption_properties.column_keys.as_ref().unwrap().get(column_key).unwrap(); + RingGcmBlockDecryptor::new(column_key) + } + pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties { &self.decryption_properties } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index d00651f72cd4..ae8661f73278 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -642,10 +642,12 @@ impl ParquetMetaDataReader { file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { let mut prot = TCompactSliceInputProtocol::new(buf); - let mut file_decryptor = None; let decrypted_fmd_buf; - if let Some(file_decryption_properties) = file_decryption_properties { + + if file_decryption_properties.is_some() + && file_decryption_properties.unwrap().has_footer_key() + { let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?; @@ -667,7 +669,7 @@ impl ParquetMetaDataReader { let aad_prefix: Vec = aes_gcm_algo.aad_prefix.unwrap_or_default(); file_decryptor = Some(FileDecryptor::new( - file_decryption_properties, + file_decryption_properties.unwrap(), aad_file_unique.clone(), aad_prefix.clone(), )); @@ -684,6 +686,7 @@ impl ParquetMetaDataReader { let mut row_groups = Vec::new(); // TODO: row group filtering for rg in t_file_metadata.row_groups { + // rg. row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); } let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 9aef1d06637a..8c8c3615c83e 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -345,6 +345,8 @@ pub(crate) fn read_page_header( ) -> Result { if let Some(crypto_context) = crypto_context { let decryptor = &crypto_context.data_decryptor(); + // todo: get column decryptor + // let file_decryptor = decryptor.get_column_decryptor(crypto_context.column_ordinal); let file_decryptor = decryptor.footer_decryptor(); let aad_file_unique = decryptor.aad_file_unique();