-
Notifications
You must be signed in to change notification settings - Fork 939
Encapsulate encryption code more in readers #7337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; | |
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; | ||
use crate::column::page::{PageIterator, PageReader}; | ||
#[cfg(feature = "encryption")] | ||
use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties}; | ||
use crate::encryption::decrypt::FileDecryptionProperties; | ||
use crate::errors::{ParquetError, Result}; | ||
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; | ||
use crate::file::reader::{ChunkReader, SerializedPageReader}; | ||
|
@@ -682,13 +682,11 @@ struct ReaderPageIterator<T: ChunkReader> { | |
metadata: Arc<ParquetMetaData>, | ||
} | ||
|
||
impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> { | ||
type Item = Result<Box<dyn PageReader>>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
let rg_idx = self.row_groups.next()?; | ||
impl<T: ChunkReader + 'static> ReaderPageIterator<T> { | ||
/// Return the next SerializedPageReader | ||
fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> { | ||
let rg = self.metadata.row_group(rg_idx); | ||
let meta = rg.column(self.column_idx); | ||
let column_chunk_metadata = rg.column(self.column_idx); | ||
let offset_index = self.metadata.offset_index(); | ||
// `offset_index` may not exist and `i[rg_idx]` will be empty. | ||
// To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`. | ||
|
@@ -698,32 +696,25 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> { | |
let total_rows = rg.num_rows() as usize; | ||
let reader = self.reader.clone(); | ||
|
||
#[cfg(feature = "encryption")] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I found this code especially hard to work with, so I was able to move it into a method called `add_crypto_context`. |
||
let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() { | ||
match meta.crypto_metadata() { | ||
Some(crypto_metadata) => { | ||
match CryptoContext::for_column( | ||
file_decryptor, | ||
crypto_metadata, | ||
rg_idx, | ||
self.column_idx, | ||
) { | ||
Ok(context) => Some(Arc::new(context)), | ||
Err(err) => return Some(Err(err)), | ||
} | ||
} | ||
None => None, | ||
} | ||
} else { | ||
None | ||
}; | ||
|
||
let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations); | ||
SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)? | ||
.add_crypto_context( | ||
rg_idx, | ||
self.column_idx, | ||
self.metadata.as_ref(), | ||
column_chunk_metadata, | ||
) | ||
} | ||
} | ||
|
||
#[cfg(feature = "encryption")] | ||
let ret = ret.map(|reader| reader.with_crypto_context(crypto_context)); | ||
impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> { | ||
type Item = Result<Box<dyn PageReader>>; | ||
|
||
Some(ret.map(|x| Box::new(x) as _)) | ||
fn next(&mut self) -> Option<Self::Item> { | ||
let rg_idx = self.row_groups.next()?; | ||
let page_reader = self | ||
.next_page_reader(rg_idx) | ||
.map(|page_reader| Box::new(page_reader) as _); | ||
Some(page_reader) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,9 +59,6 @@ use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHas | |
mod metadata; | ||
pub use metadata::*; | ||
|
||
#[cfg(feature = "encryption")] | ||
use crate::encryption::decrypt::CryptoContext; | ||
|
||
#[cfg(feature = "object_store")] | ||
mod store; | ||
|
||
|
@@ -1027,6 +1024,7 @@ impl RowGroups for InMemoryRowGroup<'_> { | |
self.row_count | ||
} | ||
|
||
/// Return chunks for column i | ||
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { | ||
match &self.column_chunks[i] { | ||
None => Err(ParquetError::General(format!( | ||
|
@@ -1038,31 +1036,19 @@ impl RowGroups for InMemoryRowGroup<'_> { | |
// filter out empty offset indexes (old versions specified Some(vec![]) when no present) | ||
.filter(|index| !index.is_empty()) | ||
.map(|index| index[i].page_locations.clone()); | ||
let column_metadata = self.metadata.row_group(self.row_group_idx).column(i); | ||
let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i); | ||
let page_reader = SerializedPageReader::new( | ||
data.clone(), | ||
column_metadata, | ||
column_chunk_metadata, | ||
self.row_count, | ||
page_locations, | ||
)?; | ||
|
||
#[cfg(feature = "encryption")] | ||
let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() { | ||
match column_metadata.crypto_metadata() { | ||
Some(crypto_metadata) => Some(Arc::new(CryptoContext::for_column( | ||
file_decryptor, | ||
crypto_metadata, | ||
self.row_group_idx, | ||
i, | ||
)?)), | ||
None => None, | ||
} | ||
} else { | ||
None | ||
}; | ||
|
||
#[cfg(feature = "encryption")] | ||
let page_reader = page_reader.with_crypto_context(crypto_context); | ||
let page_reader = page_reader.add_crypto_context( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same thing here: I removed the duplicated setup of the `CryptoContext`. |
||
self.row_group_idx, | ||
i, | ||
self.metadata, | ||
column_chunk_metadata, | ||
)?; | ||
|
||
let page_reader: Box<dyn PageReader> = Box::new(page_reader); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -571,22 +571,51 @@ impl<R: ChunkReader> SerializedPageReader<R> { | |
/// Creates a new serialized page reader from a chunk reader and metadata | ||
pub fn new( | ||
reader: Arc<R>, | ||
meta: &ColumnChunkMetaData, | ||
column_chunk_metadata: &ColumnChunkMetaData, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I renamed this variable for clarity (there are too many types of metadata floating around -- at least `ParquetMetaData` and `ColumnChunkMetaData`). |
||
total_rows: usize, | ||
page_locations: Option<Vec<PageLocation>>, | ||
) -> Result<Self> { | ||
let props = Arc::new(ReaderProperties::builder().build()); | ||
SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props) | ||
SerializedPageReader::new_with_properties( | ||
reader, | ||
column_chunk_metadata, | ||
total_rows, | ||
page_locations, | ||
props, | ||
) | ||
} | ||
|
||
/// Adds cryptographical information to the reader. | ||
/// Stub No-op implementation when encryption is disabled. | ||
#[cfg(all(feature = "arrow", not(feature = "encryption")))] | ||
pub(crate) fn add_crypto_context( | ||
self, | ||
_rg_idx: usize, | ||
_column_idx: usize, | ||
_parquet_meta_data: &ParquetMetaData, | ||
_column_chunk_metadata: &ColumnChunkMetaData, | ||
) -> Result<SerializedPageReader<R>> { | ||
Ok(self) | ||
} | ||
|
||
/// Adds any necessary crypto context to this page reader, if encryption is enabled. | ||
#[cfg(feature = "encryption")] | ||
pub(crate) fn with_crypto_context( | ||
pub(crate) fn add_crypto_context( | ||
mut self, | ||
crypto_context: Option<Arc<CryptoContext>>, | ||
) -> Self { | ||
self.crypto_context = crypto_context; | ||
self | ||
rg_idx: usize, | ||
column_idx: usize, | ||
parquet_meta_data: &ParquetMetaData, | ||
column_chunk_metadata: &ColumnChunkMetaData, | ||
) -> Result<SerializedPageReader<R>> { | ||
let Some(file_decryptor) = parquet_meta_data.file_decryptor() else { | ||
return Ok(self); | ||
}; | ||
let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else { | ||
return Ok(self); | ||
}; | ||
let crypto_context = | ||
CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?; | ||
self.crypto_context = Some(Arc::new(crypto_context)); | ||
Ok(self) | ||
} | ||
|
||
/// Creates a new serialized page with custom options. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the code that gets the next reader into its own function that returns `Result<>` so that I could use `?` to check for errors. In the `Iterator` implementation, returning `Option<Result<..>>` meant it was awkward to return the errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 this is so much nicer