Skip to content

Draft POC Unified filter decoder #7503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 195 additions & 96 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
//! Contains reader which reads parquet data into arrow [`RecordBatch`]

use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{Array, ArrayRef, BooleanArray};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
use arrow_select::filter::{filter, filter_record_batch, prep_null_mask_filter};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::sync::Arc;

use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use arrow_select::concat::concat;
pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
Expand Down Expand Up @@ -680,7 +681,37 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {

let mut filter = self.filter;
let mut selection = self.selection;
let mut projection = self.projection;


let predicate_projection = filter
.as_mut()
.map(|filter| {
filter
.predicates
.iter_mut()
.map(|p| p.projection().clone())
.reduce(|mut acc, p| {
acc.union(&p);
acc
})
})
.flatten();

let projection_to_cache = predicate_projection.as_ref().map(|p| {
let mut p = p.clone();
p.intersect(&projection);
p
});

let project_exclude_filter = projection_to_cache.as_ref().map(|p| {
let mut rest = projection.clone();
rest.subtract(p);
rest
}).or_else(|| Some(projection.clone()));


let mut filter_readers = vec![];
if let Some(filter) = filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
if !selects_any(selection.as_ref()) {
Expand All @@ -690,26 +721,28 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
let array_reader =
build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

selection = Some(evaluate_predicate(
batch_size,
array_reader,
selection,
predicate.as_mut(),
)?);
filter_readers.push(array_reader);
}
}

let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;

// If selection is empty, truncate
if !selects_any(selection.as_ref()) {
selection = Some(RowSelection::from(vec![]));
}

Ok(ParquetRecordBatchReader::new(

let filter_reader = build_array_reader(self.fields.as_deref(), predicate_projection.as_ref().unwrap(), &reader)?;


selection = apply_range(selection, reader.num_rows(), self.offset, self.limit);

Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
apply_range(selection, reader.num_rows(), self.offset, self.limit),
build_array_reader(self.fields.as_deref(), project_exclude_filter.as_ref().unwrap(), &reader)?,
filter_readers,
filter,
projection_to_cache,
selection,
))
}
}
Expand Down Expand Up @@ -791,66 +824,143 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
filter_readers: Vec<Box<dyn ArrayReader>>,
row_filter: Option<RowFilter>,
schema: SchemaRef,
cached_mask: Option<ProjectionMask>,
selection: Option<VecDeque<RowSelector>>,
}


/// Take the next selection from the selection queue, and return the selection
/// whose selected row count is to_select or less (if input selection is exhausted).
fn take_next_selection(
selection: &mut VecDeque<RowSelector>,
to_select: usize,
) -> Option<RowSelection> {
let mut current_selected = 0;
let mut rt = Vec::new();
while let Some(front) = selection.pop_front() {
if front.skip {
rt.push(front);
continue;
}

if current_selected + front.row_count <= to_select {
rt.push(front);
current_selected += front.row_count;
} else {
let select = to_select - current_selected;
let remaining = front.row_count - select;
rt.push(RowSelector::select(select));
selection.push_front(RowSelector::select(remaining));

return Some(rt.into());
}
}
if !rt.is_empty() {
return Some(rt.into());
}
None
}



impl Iterator for ParquetRecordBatchReader {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
let mut read_records = 0;
match self.selection.as_mut() {
Some(selection) => {
while read_records < self.batch_size && !selection.is_empty() {
let front = selection.pop_front().unwrap();
if front.skip {
let skipped = match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),

let mut current_selected = 0;

let mut current_selections: Vec<RowSelection> = vec![];
while current_selected < self.batch_size {
let selection: &mut VecDeque<RowSelector> = match self.selection.as_mut() {
Some(s) => s,
None => {
self.selection = Some(
std::iter::once(RowSelector::select(self.batch_size))
.collect::<VecDeque<_>>(),
);
self.selection.as_mut().unwrap()
}
};

let Some(mut raw_sel) = take_next_selection(selection, self.batch_size) else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a neat pattern -- to start with the relevant RowSelection and then potentially narrow it down using the filters

break;
};

let selection: Result<RowSelection, ArrowError> = match &mut self.row_filter {
None => Ok(raw_sel),
Some(filter) => {
debug_assert_eq!(
self.filter_readers.len(),
filter.predicates.len(),
"predicate readers and predicates should have the same length"
);

let mut final_select = raw_sel.clone();
for (predicate, reader) in filter
.predicates
.iter_mut()
.zip(self.filter_readers.iter_mut())
{
let array = read_selection(reader.as_mut(), &raw_sel);
let batch = RecordBatch::from(array.unwrap().as_struct_opt().ok_or_else(|| {
general_err!("Struct array reader should return struct array")
}).unwrap());
let input_rows = batch.num_rows();
let predicate_filter = predicate.evaluate(batch).unwrap();
if predicate_filter.len() != input_rows {
return Some(Err(ArrowError::ParquetError(format!(
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
predicate_filter.len()
))));
}
let predicate_filter = match predicate_filter.null_count() {
0 => predicate_filter,
_ => prep_null_mask_filter(&predicate_filter),
};
let raw = RowSelection::from_filters(&[predicate_filter]);
final_select = final_select.and_then(&raw);

if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
if !final_select.selects_any() {
break
}
continue;
}
Ok(final_select)
}
};

//Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
//Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
if front.row_count == 0 {
continue;
}
current_selected += selection.as_ref().unwrap().row_count();
current_selections.push(selection.unwrap());
}

// try to read record
let need_read = self.batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read) {
Some(remaining) if remaining != 0 => {
// if page row count less than batch_size we must set batch size to page row count.
// add check avoid dead loop
selection.push_front(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
for selection in &mut current_selections {
for selector in selection.iter() {
if selector.skip {
let skipped = match self.array_reader.skip_records(selector.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};
match self.array_reader.read_records(to_read) {
Ok(0) => break,
Ok(rec) => read_records += rec,
Err(error) => return Some(Err(error.into())),

if skipped != selector.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
selector.row_count,
skipped
)
.into()));
}
continue;
}

match self.array_reader.read_records(selector.row_count) {
Ok(read) => read,
Err(e) => return Some(Err(e.into())),
};
}
None => {
if let Err(error) = self.array_reader.read_records(self.batch_size) {
return Some(Err(error.into()));
}
}
};
}

match self.array_reader.consume_batch() {
Err(error) => Some(Err(error.into())),
Expand All @@ -863,7 +973,10 @@ impl Iterator for ParquetRecordBatchReader {

match struct_array {
Err(err) => Some(Err(err)),
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
Ok(e) => {
// println!("e.len() = {}", e.len());
(e.len() > 0).then(|| Ok(RecordBatch::from(e)))
},
}
}
}
Expand Down Expand Up @@ -906,6 +1019,9 @@ impl ParquetRecordBatchReader {
Ok(Self {
batch_size,
array_reader,
filter_readers: vec![],
row_filter: None,
cached_mask: None,
schema: Arc::new(Schema::new(levels.fields.clone())),
selection: selection.map(|s| s.trim().into()),
})
Expand All @@ -916,7 +1032,14 @@ impl ParquetRecordBatchReader {
/// all rows will be returned
pub(crate) fn new(
batch_size: usize,
// finial project columns exclude the filter columns
array_reader: Box<dyn ArrayReader>,
// filter columns reader
filter_readers: Vec<Box<dyn ArrayReader>>,
// row filters
row_filter: Option<RowFilter>,
// Cached project mask
cached_mask: Option<ProjectionMask>,
selection: Option<RowSelection>,
) -> Self {
let schema = match array_reader.get_data_type() {
Expand All @@ -927,12 +1050,14 @@ impl ParquetRecordBatchReader {
Self {
batch_size,
array_reader,
filter_readers,
row_filter,
schema: Arc::new(schema),
cached_mask,
selection: selection.map(|s| s.trim().into()),
}
}
}

/// Returns `true` if `selection` is `None` or selects some rows
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
selection.map(|x| x.selects_any()).unwrap_or(true)
Expand Down Expand Up @@ -973,46 +1098,20 @@ pub(crate) fn apply_range(
selection
}

/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
/// which rows to return.
///
/// `input_selection`: Optional pre-existing selection. If `Some`, then the
/// final [`RowSelection`] will be the conjunction of it and the rows selected
/// by `predicate`.
///
/// Note: A pre-existing selection may come from evaluating a previous predicate
/// or if the [`ParquetRecordBatchReader`] specified an explicit
/// [`RowSelection`] in addition to one or more predicates.
pub(crate) fn evaluate_predicate(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
input_selection: Option<RowSelection>,
predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
let mut filters = vec![];
for maybe_batch in reader {
let maybe_batch = maybe_batch?;
let input_rows = maybe_batch.num_rows();
let filter = predicate.evaluate(maybe_batch)?;
// Since user supplied predicate, check error here to catch bugs quickly
if filter.len() != input_rows {
return Err(arrow_err!(
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
filter.len()
));
fn read_selection(
reader: &mut dyn ArrayReader,
selection: &RowSelection,
) -> Result<ArrayRef, ParquetError> {
for selector in selection.iter() {
if selector.skip {
let skipped = reader.skip_records(selector.row_count)?;
debug_assert_eq!(skipped, selector.row_count, "failed to skip rows");
} else {
let read_records = reader.read_records(selector.row_count)?;
debug_assert_eq!(read_records, selector.row_count, "failed to read rows");
}
match filter.null_count() {
0 => filters.push(filter),
_ => filters.push(prep_null_mask_filter(&filter)),
};
}

let raw = RowSelection::from_filters(&filters);
Ok(match input_selection {
Some(selection) => selection.and_then(&raw),
None => raw,
})
reader.consume_batch()
}

#[cfg(test)]
Expand Down
Loading
Loading