Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ pub struct ArrowReaderOptions {
pub(crate) page_index_policy: PageIndexPolicy,
/// If encryption is enabled, the file decryption properties can be provided
#[cfg(feature = "encryption")]
pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}

impl ArrowReaderOptions {
Expand Down Expand Up @@ -508,7 +508,7 @@ impl ArrowReaderOptions {
#[cfg(feature = "encryption")]
pub fn with_file_decryption_properties(
self,
file_decryption_properties: FileDecryptionProperties,
file_decryption_properties: Arc<FileDecryptionProperties>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, along with the change to `file_decryption_properties`, is an API change.

) -> Self {
Self {
file_decryption_properties: Some(file_decryption_properties),
Expand All @@ -528,7 +528,7 @@ impl ArrowReaderOptions {
/// This can be set via
/// [`file_decryption_properties`][Self::with_file_decryption_properties].
#[cfg(feature = "encryption")]
pub fn file_decryption_properties(&self) -> Option<&FileDecryptionProperties> {
pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
self.file_decryption_properties.as_ref()
}
}
Expand Down Expand Up @@ -572,8 +572,9 @@ impl ArrowReaderMetadata {
let metadata =
ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
#[cfg(feature = "encryption")]
let metadata =
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
let metadata = metadata.with_decryption_properties(
options.file_decryption_properties.as_ref().map(Arc::clone),
);
let metadata = metadata.parse_and_finish(reader)?;
Self::try_new(Arc::new(metadata), options)
}
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {

#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
options.and_then(|o| o.file_decryption_properties.as_ref()),
options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
);

let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
Expand Down
5 changes: 3 additions & 2 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ impl AsyncFileReader for ParquetObjectReader {

#[cfg(feature = "encryption")]
if let Some(options) = options {
metadata = metadata
.with_decryption_properties(options.file_decryption_properties.as_ref());
metadata = metadata.with_decryption_properties(
options.file_decryption_properties.as_ref().map(Arc::clone),
);
}

let metadata = if let Some(file_size) = self.file_size {
Expand Down
18 changes: 9 additions & 9 deletions parquet/src/encryption/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,16 +438,16 @@ impl DecryptionPropertiesBuilder {
}

/// Finalize the builder and return created [`FileDecryptionProperties`]
pub fn build(self) -> Result<FileDecryptionProperties> {
pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also an API change. It is somewhat strange to return an `Arc` from a builder, but doing so eases the impact of this change on downstream users: the output of the builder can be passed directly to the other APIs.

let keys = DecryptionKeys::Explicit(ExplicitDecryptionKeys {
footer_key: self.footer_key,
column_keys: self.column_keys,
});
Ok(FileDecryptionProperties {
Ok(Arc::new(FileDecryptionProperties {
keys,
aad_prefix: self.aad_prefix,
footer_signature_verification: self.footer_signature_verification,
})
}))
}

/// Specify the expected AAD prefix to be used for decryption.
Expand Down Expand Up @@ -509,13 +509,13 @@ impl DecryptionPropertiesBuilderWithRetriever {
}

/// Finalize the builder and return created [`FileDecryptionProperties`]
pub fn build(self) -> Result<FileDecryptionProperties> {
pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
let keys = DecryptionKeys::ViaRetriever(self.key_retriever);
Ok(FileDecryptionProperties {
Ok(Arc::new(FileDecryptionProperties {
keys,
aad_prefix: self.aad_prefix,
footer_signature_verification: self.footer_signature_verification,
})
}))
}

/// Specify the expected AAD prefix to be used for decryption.
Expand All @@ -536,7 +536,7 @@ impl DecryptionPropertiesBuilderWithRetriever {

#[derive(Clone, Debug)]
pub(crate) struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
decryption_properties: Arc<FileDecryptionProperties>,
footer_decryptor: Arc<dyn BlockDecryptor>,
file_aad: Vec<u8>,
}
Expand All @@ -549,7 +549,7 @@ impl PartialEq for FileDecryptor {

impl FileDecryptor {
pub(crate) fn new(
decryption_properties: &FileDecryptionProperties,
decryption_properties: &Arc<FileDecryptionProperties>,
footer_key_metadata: Option<&[u8]>,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
Expand All @@ -565,7 +565,7 @@ impl FileDecryptor {

Ok(Self {
footer_decryptor: Arc::new(footer_decryptor),
decryption_properties: decryption_properties.clone(),
decryption_properties: Arc::clone(decryption_properties),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one example of a clone (deep copy) that is removed due to this change

file_aad,
})
}
Expand Down
5 changes: 3 additions & 2 deletions parquet/src/encryption/encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use ring::rand::{SecureRandom, SystemRandom};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
Expand Down Expand Up @@ -288,13 +289,13 @@ impl EncryptionPropertiesBuilder {
#[derive(Debug)]
/// The encryption configuration for a single Parquet file
pub(crate) struct FileEncryptor {
properties: FileEncryptionProperties,
properties: Arc<FileEncryptionProperties>,
aad_file_unique: Vec<u8>,
file_aad: Vec<u8>,
}

impl FileEncryptor {
pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
pub(crate) fn new(properties: Arc<FileEncryptionProperties>) -> Result<Self> {
// Generate unique AAD for file
let rng = SystemRandom::new();
let mut aad_file_unique = vec![0u8; 8];
Expand Down
6 changes: 4 additions & 2 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,8 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let base_expected_size = 2312;
#[cfg(feature = "encryption")]
let base_expected_size = 2744;
// Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does actually reduce the heap size needed when the encryption feature is enabled but no FileDecryptionProperties are set -- instead of having space inline for the entire FileDecryptionProperties, there is now only an Arc (a pointer + refcount).

However, while reviewing this, I found that the heap size accounting is somewhat inaccurate for encryption and filed a second ticket to track it (presumably https://github.com/apache/arrow-rs/issues/8472, the issue referenced in the code comment above):

let base_expected_size = 2648;

assert_eq!(parquet_meta.memory_size(), base_expected_size);

Expand Down Expand Up @@ -1849,7 +1850,8 @@ mod tests {
#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2738;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3170;
// Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
let bigger_expected_size = 3074;

// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/metadata/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod inner {
encrypted_footer: bool,
) -> Result<ParquetMetaData> {
crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
self.file_decryption_properties.as_deref(),
self.file_decryption_properties.as_ref(),
encrypted_footer,
buf,
)
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ impl ParquetMetaDataReader {
#[cfg(feature = "encryption")]
pub fn with_decryption_properties(
mut self,
properties: Option<&FileDecryptionProperties>,
properties: Option<std::sync::Arc<FileDecryptionProperties>>,
) -> Self {
self.file_decryption_properties = properties.cloned().map(std::sync::Arc::new);
self.file_decryption_properties = properties;
self
}

Expand Down Expand Up @@ -1245,7 +1245,7 @@ mod async_tests {

// just make sure the metadata is properly decrypted and read
let expected = ParquetMetaDataReader::new()
.with_decryption_properties(Some(&decryption_properties))
.with_decryption_properties(Some(decryption_properties))
.load_via_suffix_and_finish(input)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/file/metadata/thrift_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ fn row_group_from_encrypted_thrift(
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
pub(crate) fn parquet_metadata_with_encryption(
file_decryption_properties: Option<&FileDecryptionProperties>,
file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
encrypted_footer: bool,
buf: &[u8],
) -> Result<ParquetMetaData> {
Expand Down Expand Up @@ -810,7 +810,7 @@ pub(crate) fn parquet_metadata_with_encryption(
fn get_file_decryptor(
encryption_algorithm: EncryptionAlgorithm,
footer_key_metadata: Option<&[u8]>,
file_decryption_properties: &FileDecryptionProperties,
file_decryption_properties: &Arc<FileDecryptionProperties>,
) -> Result<FileDecryptor> {
match encryption_algorithm {
EncryptionAlgorithm::AES_GCM_V1(algo) => {
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub struct WriterProperties {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
pub(crate) file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}

impl Default for WriterProperties {
Expand Down Expand Up @@ -432,7 +432,7 @@ impl WriterProperties {
///
/// For more details see [`WriterPropertiesBuilder::with_file_encryption_properties`]
#[cfg(feature = "encryption")]
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
pub fn file_encryption_properties(&self) -> Option<&Arc<FileEncryptionProperties>> {
self.file_encryption_properties.as_ref()
}
}
Expand Down Expand Up @@ -506,7 +506,7 @@ impl WriterPropertiesBuilder {
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
file_encryption_properties: self.file_encryption_properties.map(Arc::new),
}
}

Expand Down Expand Up @@ -965,7 +965,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
statistics_truncate_length: props.statistics_truncate_length,
coerce_types: props.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
file_encryption_properties: props.file_encryption_properties.map(Arc::unwrap_or_clone),
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
properties: &WriterPropertiesPtr,
schema_descriptor: &SchemaDescriptor,
) -> Result<Option<Arc<FileEncryptor>>> {
if let Some(file_encryption_properties) = &properties.file_encryption_properties {
if let Some(file_encryption_properties) = properties.file_encryption_properties() {
file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

Ok(Some(Arc::new(FileEncryptor::new(
file_encryption_properties.clone(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is another clone that is removed by this PR

)?)))
Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
file_encryption_properties,
))?)))
} else {
Ok(None)
}
Expand Down Expand Up @@ -318,7 +318,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
/// Writes magic bytes at the beginning of the file.
#[cfg(feature = "encryption")]
fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
let magic = get_file_magic(properties.file_encryption_properties.as_ref());
let magic = get_file_magic(properties.file_encryption_properties.as_deref());

buf.write_all(magic)?;
Ok(())
Expand Down
28 changes: 14 additions & 14 deletions parquet/tests/encryption/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ fn test_plaintext_footer_signature_verification() {
.build()
.unwrap();

let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let result = ArrowReaderMetadata::load(&file, options.clone());
assert!(result.is_err());
assert!(
Expand Down Expand Up @@ -148,8 +148,8 @@ fn test_non_uniform_encryption_disabled_aad_storage() {
.unwrap();

let file = File::open(path).unwrap();
let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let result = ArrowReaderMetadata::load(&file, options.clone());
assert!(result.is_err());
assert_eq!(
Expand Down Expand Up @@ -279,8 +279,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() {
.build()
.unwrap();

let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();

// Write data into temporary file with plaintext footer and footer key metadata
Expand Down Expand Up @@ -320,8 +320,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() {
.build()
.unwrap();

let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let _ = ArrowReaderMetadata::load(&temp_file, options.clone()).unwrap();

// Read temporary file with plaintext metadata using key retriever with invalid key
Expand All @@ -334,8 +334,8 @@ fn test_uniform_encryption_plaintext_footer_with_key_retriever() {
let decryption_properties = FileDecryptionProperties::with_key_retriever(key_retriever)
.build()
.unwrap();
let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let result = ArrowReaderMetadata::load(&temp_file, options.clone());
assert!(result.is_err());
assert!(
Expand Down Expand Up @@ -672,7 +672,7 @@ fn test_write_uniform_encryption_plaintext_footer() {
// Try writing plaintext footer and then reading it with the correct footer key
read_and_roundtrip_to_encrypted_file(
&file,
decryption_properties.clone(),
Arc::clone(&decryption_properties),
file_encryption_properties.clone(),
);

Expand Down Expand Up @@ -928,8 +928,8 @@ fn test_write_encrypted_struct_field() {
.with_column_key("struct_col.float64_col", column_key_2)
.build()
.unwrap();
let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties.clone());
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);

let builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(temp_file, options).unwrap();
Expand Down Expand Up @@ -1036,7 +1036,7 @@ fn test_decrypt_page_index_non_uniform() {

fn test_decrypt_page_index(
path: &str,
decryption_properties: FileDecryptionProperties,
decryption_properties: Arc<FileDecryptionProperties>,
) -> Result<(), ParquetError> {
let file = File::open(path)?;
let options = ArrowReaderOptions::default()
Expand Down
Loading