Skip to content

Commit

Permalink
Allow for plaintext footer
Browse files (browse the repository at this point in the history)
  • Loading branch information
rok committed Dec 22, 2024
1 parent 7db06cc commit c1e33ba
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 25 deletions.
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ zstd-sys = { version = ">=2.0.0, <2.0.14", default-features = false }
all-features = true

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64", "encryption"]
# Enable lz4
lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
Expand Down
23 changes: 13 additions & 10 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,10 @@ impl ArrowReaderMetadata {
) -> Result<Self> {
let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
#[cfg(feature = "encryption")]
let metadata = metadata.with_encryption_properties(file_decryption_properties);
let metadata = metadata
.with_encryption_properties(file_decryption_properties)
.parse_and_finish(reader)?;
#[cfg(not(feature = "encryption"))]
let metadata = metadata.parse_and_finish(reader)?;
Self::try_new(Arc::new(metadata), options)
}
Expand Down Expand Up @@ -1789,7 +1792,7 @@ mod tests {
#[cfg(feature = "encryption")]
fn test_non_uniform_encryption() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
let file = File::open(path).unwrap();

let footer_key = "0123456789012345".as_bytes(); // 128bit/16
Expand All @@ -1807,14 +1810,14 @@ mod tests {
let metadata =
ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref())
.unwrap();
// let file_metadata = metadata.metadata.file_metadata();
//
// assert_eq!(file_metadata.num_rows(), 50);
// assert_eq!(file_metadata.schema_descr().num_columns(), 8);
// assert_eq!(
// file_metadata.created_by().unwrap(),
// "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
// );
let file_metadata = metadata.metadata.file_metadata();

assert_eq!(file_metadata.num_rows(), 50);
assert_eq!(file_metadata.schema_descr().num_columns(), 8);
assert_eq!(
file_metadata.created_by().unwrap(),
"parquet-cpp-arrow version 19.0.0-SNAPSHOT"
);
}

#[test]
Expand Down
16 changes: 11 additions & 5 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
use crate::errors::{ParquetError, Result};
use crate::format::EncryptionAlgorithm;

pub trait BlockEncryptor {
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
Expand Down Expand Up @@ -268,7 +269,6 @@ impl DecryptionPropertiesBuilder {
pub fn with_column_key(mut self, key: Vec<u8>, value: Vec<u8>) -> Self {
let mut column_keys= self.column_keys.unwrap_or_else(HashMap::new);
column_keys.insert(key, value);
// let _ = column_keys.insert(key, value);
self.column_keys = Some(column_keys);
self
}
Expand All @@ -278,7 +278,7 @@ impl DecryptionPropertiesBuilder {
pub struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
// todo decr: change to BlockDecryptor
footer_decryptor: RingGcmBlockDecryptor,
footer_decryptor: Option<RingGcmBlockDecryptor>,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
}
Expand All @@ -291,9 +291,15 @@ impl PartialEq for FileDecryptor {

impl FileDecryptor {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties, aad_file_unique: Vec<u8>, aad_prefix: Vec<u8>) -> Self {
let footer_decryptor = if let Some(footer_key) = decryption_properties.footer_key.clone() {
Some(RingGcmBlockDecryptor::new(footer_key.as_ref()))
} else {
None
};

Self {
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
footer_decryptor,
decryption_properties: decryption_properties.clone(),
aad_file_unique,
aad_prefix,
Expand All @@ -302,7 +308,7 @@ impl FileDecryptor {

// todo decr: change to BlockDecryptor
pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor {
self.footer_decryptor
self.footer_decryptor.unwrap()
}

pub(crate) fn get_column_decryptor(&self, column_key: &[u8]) -> RingGcmBlockDecryptor {
Expand All @@ -314,7 +320,7 @@ impl FileDecryptor {
&self.decryption_properties
}

pub(crate) fn footer_decryptor(&self) -> RingGcmBlockDecryptor {
pub(crate) fn footer_decryptor(&self) -> Option<RingGcmBlockDecryptor> {
self.footer_decryptor.clone()
}

Expand Down
30 changes: 23 additions & 7 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ impl ParquetMetaDataReader {
}

#[cfg(feature = "encryption")]
let mut file_decryptor = None;
let mut decryptor = None;
#[cfg(feature = "encryption")]
let decrypted_fmd_buf;

Expand All @@ -714,7 +714,7 @@ impl ParquetMetaDataReader {
if file_decryption_properties.is_none() {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
};
let file_decryption_properties = file_decryption_properties.unwrap();
let file_decryption_properties = file_decryption_properties;

let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
Expand All @@ -736,14 +736,15 @@ impl ParquetMetaDataReader {
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

file_decryptor = Some(FileDecryptor::new(
file_decryption_properties,
decryptor = Some(FileDecryptor::new(
file_decryption_properties.unwrap(),
aad_file_unique.clone(),
aad_prefix.clone(),
));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
let footer_decryptor = decryptor.clone().unwrap().get_footer_decryptor();

decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
decrypted_fmd_buf =
footer_decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
}

Expand All @@ -760,7 +761,22 @@ impl ParquetMetaDataReader {
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);

// todo add file decryptor
#[cfg(feature = "encryption")]
if t_file_metadata.encryption_algorithm.is_some() {
let algo = t_file_metadata.encryption_algorithm;
let aes_gcm_algo = if let Some(EncryptionAlgorithm::AESGCMV1(a)) = algo {
a
} else {
unreachable!()
}; // todo decr: add support for GCMCTRV1
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
let fdp = file_decryption_properties.unwrap();
decryptor = Some(FileDecryptor::new(
fdp,
aad_file_unique.clone(),
aad_prefix.clone(),
));
// todo get key_metadata etc. Set file decryptor in return value
// todo check signature
}
Expand All @@ -777,7 +793,7 @@ impl ParquetMetaDataReader {
file_metadata,
row_groups,
#[cfg(feature = "encryption")]
file_decryptor,
decryptor,
))
}

Expand Down
7 changes: 5 additions & 2 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ pub(crate) fn read_page_header<T: Read>(
let decryptor = &crypto_context.data_decryptor();
// todo: get column decryptor
// let file_decryptor = decryptor.get_column_decryptor(crypto_context.column_ordinal);
// if !decryptor.decryption_properties().has_footer_key() {
// return Err(general_err!("Missing footer decryptor"));
// }
let file_decryptor = decryptor.footer_decryptor();
let aad_file_unique = decryptor.aad_file_unique();

Expand All @@ -371,7 +374,7 @@ pub(crate) fn read_page_header<T: Read>(
let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
let mut ciphertext = vec![0; 4 + ciphertext_len];
input.read_exact(&mut ciphertext[4..])?;
let buf = file_decryptor.decrypt(&ciphertext, aad.as_ref())?;
let buf = file_decryptor.unwrap().decrypt(&ciphertext, aad.as_ref())?;

let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Expand Down Expand Up @@ -465,7 +468,7 @@ pub(crate) fn decode_page(
crypto_context.column_ordinal,
crypto_context.page_ordinal,
)?;
let decrypted = file_decryptor.decrypt(&buffer.as_ref(), &aad)?;
let decrypted = file_decryptor.unwrap().decrypt(&buffer.as_ref(), &aad)?;
Bytes::from(decrypted)
} else {
buffer
Expand Down

0 comments on commit c1e33ba

Please sign in to comment.