From e5e45276be1c34f027527bf1ff0ae90d2a1b2594 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 11:18:15 +0200 Subject: [PATCH 01/12] refactor: extract inclusive_projection from manifest_evaluator --- crates/iceberg/src/expr/predicate.rs | 10 +- .../src/expr/visitors/manifest_evaluator.rs | 352 ++++++++++-------- crates/iceberg/src/scan.rs | 130 ++++--- 3 files changed, 279 insertions(+), 213 deletions(-) diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 6cdf4fc6b3..1457d5a174 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -32,7 +32,7 @@ use crate::spec::{Datum, SchemaRef}; use crate::{Error, ErrorKind}; /// Logical expression, such as `AND`, `OR`, `NOT`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct LogicalExpression { inputs: [Box; N], } @@ -79,7 +79,7 @@ where } /// Unary predicate, for example, `a IS NULL`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct UnaryExpression { /// Operator of this predicate, must be single operand operator. op: PredicateOperator, @@ -126,7 +126,7 @@ impl UnaryExpression { } /// Binary predicate, for example, `a > 10`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct BinaryExpression { /// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc. op: PredicateOperator, @@ -184,7 +184,7 @@ impl Bind for BinaryExpression { } /// Set predicates, for example, `a in (1, 2, 3)`. -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct SetExpression { /// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc. op: PredicateOperator, @@ -613,7 +613,7 @@ impl Not for Predicate { } /// Bound predicate expression after binding to a schema. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BoundPredicate { /// An expression always evaluates to true. AlwaysTrue, diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 16d648178c..7029908829 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -16,74 +16,48 @@ // under the License. use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; -use crate::expr::visitors::inclusive_projection::InclusiveProjection; -use crate::expr::{Bind, BoundPredicate, BoundReference}; -use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef}; -use crate::{Error, ErrorKind}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{Datum, FieldSummary, ManifestFile}; +use crate::{Error, ErrorKind, Result}; use fnv::FnvHashSet; -use std::sync::Arc; -/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided -/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s +/// Evaluates a [`ManifestFile`] to see if the partition summaries +/// match a provided [`BoundPredicate`]. +/// +/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. pub(crate) struct ManifestEvaluator { - partition_schema: SchemaRef, + partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, } impl ManifestEvaluator { pub(crate) fn new( - partition_spec: PartitionSpecRef, - table_schema: SchemaRef, - filter: BoundPredicate, + partition_schema_id: i32, + partition_filter: BoundPredicate, case_sensitive: bool, - ) -> crate::Result { - let partition_type = partition_spec.partition_type(&table_schema)?; - - // this is needed as SchemaBuilder.with_fields expects an iterator over - // Arc rather than &Arc - let cloned_partition_fields: Vec<_> = - partition_type.fields().iter().map(Arc::clone).collect(); - - // The partition_schema's schema_id is set to the partition - // spec's spec_id here, and used to perform a sanity check - // during eval to confirm that it matches the spec_id - // of the ManifestFile we're evaluating - let partition_schema = Schema::builder() - .with_schema_id(partition_spec.spec_id) - .with_fields(cloned_partition_fields) - .build()?; - - let partition_schema_ref = Arc::new(partition_schema); - - let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); - let unbound_partition_filter = inclusive_projection.project(&filter)?; - - let partition_filter = unbound_partition_filter - .rewrite_not() - .bind(partition_schema_ref.clone(), case_sensitive)?; - - Ok(Self { - partition_schema: partition_schema_ref, + ) -> Self { + Self { + partition_schema_id, partition_filter, case_sensitive, - }) + } } /// Evaluate this `ManifestEvaluator`'s filter predicate against the /// provided [`ManifestFile`]'s partitions. Used by [`TableScan`] to /// see if this `ManifestFile` could possibly contain data that matches /// the scan's filter. - pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> crate::Result { + pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> Result { if manifest_file.partitions.is_empty() { return Ok(true); } - // The schema_id of self.partition_schema is set to the - // spec_id of the partition spec that this ManifestEvaluator - // was created from in ManifestEvaluator::new - if self.partition_schema.schema_id() != manifest_file.partition_spec_id { + // A [`ManifestEvaluator`] is created for a specific schema id + // based on the partition spec. This id should match the partition + // spec id of the [`ManifestFile`]. + if self.partition_schema_id != manifest_file.partition_spec_id { return Err(Error::new( ErrorKind::Unexpected, format!( @@ -273,194 +247,244 @@ impl ManifestFilterVisitor<'_> { #[cfg(test)] mod test { + use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; - use crate::expr::{Bind, Predicate, PredicateOperator, Reference, UnaryExpression}; + use crate::expr::{ + Bind, BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression, + }; use crate::spec::{ FieldSummary, ManifestContentType, ManifestFile, NestedField, PartitionField, - PartitionSpec, PrimitiveType, Schema, Transform, Type, + PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Transform, Type, }; + use crate::Result; use std::sync::Arc; - #[test] - fn test_manifest_file_no_partitions() { - let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec(); + fn create_schema_and_partition_spec() -> Result<(SchemaRef, PartitionSpecRef)> { + let schema = Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "a", + Type::Primitive(PrimitiveType::Float), + ))]) + .build()?; - let partition_filter = Predicate::AlwaysTrue - .bind(table_schema_ref.clone(), false) + let spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![PartitionField::builder() + .source_id(1) + .name("a".to_string()) + .field_id(1) + .transform(Transform::Identity) + .build()]) + .build() .unwrap(); - let case_sensitive = false; + Ok((Arc::new(schema), Arc::new(spec))) + } + + fn create_schema_and_partition_spec_with_id_mismatch() -> Result<(SchemaRef, PartitionSpecRef)> + { + let schema = Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "a", + Type::Primitive(PrimitiveType::Float), + ))]) + .build()?; + + let spec = PartitionSpec::builder() + .with_spec_id(999) + .with_fields(vec![PartitionField::builder() + .source_id(1) + .name("a".to_string()) + .field_id(1) + .transform(Transform::Identity) + .build()]) + .build() + .unwrap(); + + Ok((Arc::new(schema), Arc::new(spec))) + } + + fn create_manifest_file(partitions: Vec) -> ManifestFile { + ManifestFile { + manifest_path: "/test/path".to_string(), + manifest_length: 0, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 0, + added_data_files_count: None, + existing_data_files_count: None, + deleted_data_files_count: None, + added_rows_count: None, + existing_rows_count: None, + deleted_rows_count: None, + partitions, + key_metadata: vec![], + } + } + + fn create_partition_schema( + partition_spec: &PartitionSpecRef, + schema: &SchemaRef, + ) -> Result { + let partition_type = partition_spec.partition_type(schema)?; + + let partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect(); + + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); + + Ok(partition_schema) + } + + fn create_partition_filter( + partition_spec: PartitionSpecRef, + partition_schema: SchemaRef, + filter: &BoundPredicate, + case_sensitive: bool, + ) -> Result { + let mut inclusive_projection = InclusiveProjection::new(partition_spec); + + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema, case_sensitive)?; + + Ok(partition_filter) + } - let manifest_file_partitions = vec![]; - let manifest_file = create_test_manifest_file(manifest_file_partitions); + fn create_manifest_evaluator( + schema: SchemaRef, + partition_spec: PartitionSpecRef, + filter: &BoundPredicate, + case_sensitive: bool, + ) -> Result { + let partition_schema = create_partition_schema(&partition_spec, &schema)?; + let partition_filter = create_partition_filter( + partition_spec, + partition_schema.clone(), + filter, + case_sensitive, + )?; - let manifest_evaluator = ManifestEvaluator::new( - partition_spec_ref, - table_schema_ref, + Ok(ManifestEvaluator::new( + partition_schema.schema_id(), partition_filter, case_sensitive, - ) - .unwrap(); + )) + } - let result = manifest_evaluator.eval(&manifest_file).unwrap(); + #[test] + fn test_manifest_file_empty_partitions() -> Result<()> { + let case_sensitive = false; + + let (schema, partition_spec) = create_schema_and_partition_spec()?; + + let filter = Predicate::AlwaysTrue.bind(schema.clone(), case_sensitive)?; + + let manifest_file = create_manifest_file(vec![]); + + let manifest_evaluator = + create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?; + + let result = manifest_evaluator.eval(&manifest_file)?; assert!(result); + + Ok(()) } #[test] - fn test_manifest_file_trivial_partition_passing_filter() { - let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec(); + fn test_manifest_file_trivial_partition_passing_filter() -> Result<()> { + let case_sensitive = true; - let partition_filter = Predicate::Unary(UnaryExpression::new( + let (schema, partition_spec) = create_schema_and_partition_spec()?; + + let filter = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNull, Reference::new("a"), )) - .bind(table_schema_ref.clone(), true) - .unwrap(); + .bind(schema.clone(), case_sensitive)?; - let manifest_file_partitions = vec![FieldSummary { + let manifest_file = create_manifest_file(vec![FieldSummary { contains_null: true, contains_nan: None, lower_bound: None, upper_bound: None, - }]; - let manifest_file = create_test_manifest_file(manifest_file_partitions); + }]); let manifest_evaluator = - ManifestEvaluator::new(partition_spec_ref, table_schema_ref, partition_filter, true) - .unwrap(); + create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?; - let result = manifest_evaluator.eval(&manifest_file).unwrap(); + let result = manifest_evaluator.eval(&manifest_file)?; assert!(result); + + Ok(()) } #[test] - fn test_manifest_file_partition_id_mismatch_returns_error() { - let (table_schema_ref, partition_spec_ref) = - create_test_schema_and_partition_spec_with_id_mismatch(); + fn test_manifest_file_partition_id_mismatch_returns_error() -> Result<()> { + let case_sensitive = true; - let partition_filter = Predicate::Unary(UnaryExpression::new( + let (schema, partition_spec) = create_schema_and_partition_spec_with_id_mismatch()?; + + let filter = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNull, Reference::new("a"), )) - .bind(table_schema_ref.clone(), true) - .unwrap(); + .bind(schema.clone(), case_sensitive)?; - let manifest_file_partitions = vec![FieldSummary { + let manifest_file = create_manifest_file(vec![FieldSummary { contains_null: true, contains_nan: None, lower_bound: None, upper_bound: None, - }]; - let manifest_file = create_test_manifest_file(manifest_file_partitions); + }]); let manifest_evaluator = - ManifestEvaluator::new(partition_spec_ref, table_schema_ref, partition_filter, true) - .unwrap(); + create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?; let result = manifest_evaluator.eval(&manifest_file); assert!(result.is_err()); + + Ok(()) } #[test] - fn test_manifest_file_trivial_partition_rejected_filter() { - let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec(); + fn test_manifest_file_trivial_partition_rejected_filter() -> Result<()> { + let case_sensitive = true; - let partition_filter = Predicate::Unary(UnaryExpression::new( + let (schema, partition_spec) = create_schema_and_partition_spec()?; + + let filter = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNan, Reference::new("a"), )) - .bind(table_schema_ref.clone(), true) - .unwrap(); + .bind(schema.clone(), case_sensitive)?; - let manifest_file_partitions = vec![FieldSummary { + let manifest_file = create_manifest_file(vec![FieldSummary { contains_null: false, contains_nan: None, lower_bound: None, upper_bound: None, - }]; - let manifest_file = create_test_manifest_file(manifest_file_partitions); + }]); let manifest_evaluator = - ManifestEvaluator::new(partition_spec_ref, table_schema_ref, partition_filter, true) - .unwrap(); + create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?; let result = manifest_evaluator.eval(&manifest_file).unwrap(); assert!(!result); - } - - fn create_test_schema_and_partition_spec() -> (Arc, Arc) { - let table_schema = Schema::builder() - .with_fields(vec![Arc::new(NestedField::optional( - 1, - "a", - Type::Primitive(PrimitiveType::Float), - ))]) - .build() - .unwrap(); - let table_schema_ref = Arc::new(table_schema); - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Identity) - .build()]) - .build() - .unwrap(); - let partition_spec_ref = Arc::new(partition_spec); - (table_schema_ref, partition_spec_ref) - } - - fn create_test_schema_and_partition_spec_with_id_mismatch() -> (Arc, Arc) - { - let table_schema = Schema::builder() - .with_fields(vec![Arc::new(NestedField::optional( - 1, - "a", - Type::Primitive(PrimitiveType::Float), - ))]) - .build() - .unwrap(); - let table_schema_ref = Arc::new(table_schema); - - let partition_spec = PartitionSpec::builder() - // Spec ID here deliberately doesn't match the one from create_test_manifest_file - .with_spec_id(999) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Identity) - .build()]) - .build() - .unwrap(); - let partition_spec_ref = Arc::new(partition_spec); - (table_schema_ref, partition_spec_ref) - } - - fn create_test_manifest_file(manifest_file_partitions: Vec) -> ManifestFile { - ManifestFile { - manifest_path: "/test/path".to_string(), - manifest_length: 0, - partition_spec_id: 1, - content: ManifestContentType::Data, - sequence_number: 0, - min_sequence_number: 0, - added_snapshot_id: 0, - added_data_files_count: None, - existing_data_files_count: None, - deleted_data_files_count: None, - added_rows_count: None, - existing_rows_count: None, - deleted_rows_count: None, - partitions: manifest_file_partitions, - key_metadata: vec![], - } + Ok(()) } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 36f71c1233..5f2688d707 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,17 +18,19 @@ //! Table scan api. use crate::arrow::ArrowReaderBuilder; +use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; -use crate::expr::{Bind, Predicate}; +use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{ - DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef, + DataContentType, ManifestEntryRef, PartitionSpecRef, Schema, SchemaRef, SnapshotRef, + TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; use async_stream::try_stream; -use futures::stream::{iter, BoxStream}; +use futures::stream::BoxStream; use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; @@ -176,12 +178,9 @@ impl TableScan { /// Returns a stream of file scan tasks. pub async fn plan_files(&self) -> crate::Result { - // Cache `ManifestEvaluatorFactory`s created as part of this scan let mut manifest_evaluator_cache: HashMap = HashMap::new(); + let mut partition_filter_cache: HashMap = HashMap::new(); - // these variables needed to ensure that we don't need to pass a - // reference to self into `try_stream`, as it expects references - // passed in to outlive 'static let schema = self.schema.clone(); let snapshot = self.snapshot.clone(); let table_metadata = self.table_metadata.clone(); @@ -190,34 +189,51 @@ impl TableScan { let filter = self.filter.clone(); Ok(try_stream! { + let manifest_list = snapshot - .clone() - .load_manifest_list(&file_io, &table_metadata) - .await?; + .load_manifest_list(&file_io, &table_metadata) + .await?; - // Generate data file stream for entry in manifest_list.entries() { - // If this scan has a filter, check the partition evaluator cache for an existing - // PartitionEvaluator that matches this manifest's partition spec ID. - // Use one from the cache if there is one. If not, create one, put it in - // the cache, and take a reference to it. - #[allow(clippy::map_entry)] if let Some(filter) = filter.as_ref() { - if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) { - manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?); - } - let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id]; + let bound_filter = filter.bind(schema.clone(), case_sensitive)?; + + let partition_spec_id = entry.partition_spec_id; + let partition_spec = + Self::create_partition_spec(&table_metadata, partition_spec_id)?; + + let partition_schema = Self::create_partition_schema(partition_spec, &schema)?; + + let partition_filter = partition_filter_cache.entry(partition_spec_id).or_insert( + Self::create_partition_filter( + partition_spec.clone(), + partition_schema.clone(), + &bound_filter, + case_sensitive, + )?, + ); + + let manifest_evaluator = manifest_evaluator_cache + .entry(partition_spec_id) + .or_insert(ManifestEvaluator::new( + partition_schema.schema_id(), + partition_filter.clone(), + case_sensitive, + )); - // reject any manifest files whose partition values don't match the filter. if !manifest_evaluator.eval(entry)? { continue; } + + // TODO: Create ExpressionEvaluator + // TODO: Create InclusiveMetricsEvaluator } let manifest = entry.load_manifest(&file_io).await?; + let mut manifest_entries_stream = + futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive())); - let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); - while let Some(manifest_entry) = manifest_entries.next().await { + while let Some(manifest_entry) = manifest_entries_stream.next().await { match manifest_entry.content_type() { DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { yield Err(Error::new( @@ -236,30 +252,56 @@ impl TableScan { } } } - } - .boxed()) + + }.boxed()) + } + + fn create_partition_spec( + table_metadata: &TableMetadataRef, + partition_spec_id: i32, + ) -> crate::Result<&PartitionSpecRef> { + let partition_spec = table_metadata + .partition_spec_by_id(partition_spec_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Could not find partition spec for id {}", partition_spec_id), + ))?; + + Ok(partition_spec) } - fn create_manifest_evaluator( - id: i32, - schema: SchemaRef, - table_metadata: Arc, + fn create_partition_schema( + partition_spec: &PartitionSpecRef, + schema: &SchemaRef, + ) -> crate::Result { + let partition_type = partition_spec.partition_type(schema)?; + + let partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect(); + + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); + + Ok(partition_schema) + } + + fn create_partition_filter( + partition_spec: PartitionSpecRef, + partition_schema: SchemaRef, + filter: &BoundPredicate, case_sensitive: bool, - filter: &Predicate, - ) -> crate::Result { - let bound_predicate = filter.bind(schema.clone(), case_sensitive)?; - - let partition_spec = table_metadata.partition_spec_by_id(id).ok_or(Error::new( - ErrorKind::Unexpected, - format!("Could not find partition spec for id {id}"), - ))?; - - ManifestEvaluator::new( - partition_spec.clone(), - schema.clone(), - bound_predicate, - case_sensitive, - ) + ) -> crate::Result { + let mut inclusive_projection = InclusiveProjection::new(partition_spec); + + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema, case_sensitive)?; + + Ok(partition_filter) } pub async fn to_arrow(&self) -> crate::Result { From 8c22f980b939ab5c1524d21560d83bb80b2d5b06 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 11:39:41 +0200 Subject: [PATCH 02/12] refactor: add FileScanStreamContext --- .../src/expr/visitors/manifest_evaluator.rs | 1 + crates/iceberg/src/scan.rs | 182 ++++++++++-------- 2 files changed, 108 insertions(+), 75 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 7029908829..0784aab1a6 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -21,6 +21,7 @@ use crate::spec::{Datum, FieldSummary, ManifestFile}; use crate::{Error, ErrorKind, Result}; use fnv::FnvHashSet; +#[derive(Debug)] /// Evaluates a [`ManifestFile`] to see if the partition summaries /// match a provided [`BoundPredicate`]. /// diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5f2688d707..97ac54b3ce 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -27,7 +27,7 @@ use crate::spec::{ TableMetadataRef, }; use crate::table::Table; -use crate::{Error, ErrorKind}; +use crate::{Error, ErrorKind, Result}; use arrow_array::RecordBatch; use async_stream::try_stream; use futures::stream::BoxStream; @@ -35,6 +35,11 @@ use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; +/// A stream of [`FileScanTask`]. +pub type FileScanTaskStream = BoxStream<'static, Result>; +/// A stream of arrow [`RecordBatch`]es. +pub type ArrowRecordBatchStream = BoxStream<'static, Result>; + /// Builder to create table scan. pub struct TableScanBuilder<'a> { table: &'a Table, @@ -171,54 +176,49 @@ pub struct TableScan { filter: Option>, } -/// A stream of [`FileScanTask`]. -pub type FileScanTaskStream = BoxStream<'static, crate::Result>; - impl TableScan { /// Returns a stream of file scan tasks. - pub async fn plan_files(&self) -> crate::Result { - let mut manifest_evaluator_cache: HashMap = HashMap::new(); - let mut partition_filter_cache: HashMap = HashMap::new(); - - let schema = self.schema.clone(); - let snapshot = self.snapshot.clone(); - let table_metadata = self.table_metadata.clone(); - let file_io = self.file_io.clone(); - let case_sensitive = self.case_sensitive; - let filter = self.filter.clone(); + let mut context = FileScanStreamContext::new( + self.schema.clone(), + self.snapshot.clone(), + self.table_metadata.clone(), + self.file_io.clone(), + self.filter.clone(), + self.case_sensitive, + ); Ok(try_stream! { - let manifest_list = snapshot - .load_manifest_list(&file_io, &table_metadata) + let manifest_list = context.snapshot + .load_manifest_list(&context.file_io, &context.table_metadata) .await?; for entry in manifest_list.entries() { - if let Some(filter) = filter.as_ref() { - let bound_filter = filter.bind(schema.clone(), case_sensitive)?; + if let Some(filter) = context.filter.as_ref() { + let bound_filter = filter.bind(context.schema.clone(), context.case_sensitive)?; let partition_spec_id = entry.partition_spec_id; let partition_spec = - Self::create_partition_spec(&table_metadata, partition_spec_id)?; + Self::create_partition_spec(&context.table_metadata, partition_spec_id)?; - let partition_schema = Self::create_partition_schema(partition_spec, &schema)?; + let partition_schema = Self::create_partition_schema(partition_spec, &context.schema)?; - let partition_filter = partition_filter_cache.entry(partition_spec_id).or_insert( + let partition_filter = context.partition_filter_cache.entry(partition_spec_id).or_insert( Self::create_partition_filter( partition_spec.clone(), partition_schema.clone(), &bound_filter, - case_sensitive, + context.case_sensitive, )?, ); - let manifest_evaluator = manifest_evaluator_cache + let manifest_evaluator = context.manifest_evaluator_cache .entry(partition_spec_id) .or_insert(ManifestEvaluator::new( partition_schema.schema_id(), partition_filter.clone(), - case_sensitive, + context.case_sensitive, )); if !manifest_evaluator.eval(entry)? { @@ -229,7 +229,7 @@ impl TableScan { // TODO: Create InclusiveMetricsEvaluator } - let manifest = entry.load_manifest(&file_io).await?; + let manifest = entry.load_manifest(&context.file_io).await?; let mut manifest_entries_stream = futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive())); @@ -256,6 +256,58 @@ impl TableScan { }.boxed()) } + /// Returns an [`ArrowRecordBatchStream`]. + pub async fn to_arrow(&self) -> crate::Result { + let mut arrow_reader_builder = + ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone()); + + let mut field_ids = vec![]; + for column_name in &self.column_names { + let field_id = self.schema.field_id_by_name(column_name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Column {} not found in table. Schema: {}", + column_name, self.schema + ), + ) + })?; + + let field = self.schema + .as_struct() + .field_by_id(field_id) + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}", + column_name, self.schema + ), + ) + })?; + + if !field.field_type.is_primitive() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Column {} is not a primitive type. Schema: {}", + column_name, self.schema + ), + )); + } + + field_ids.push(field_id as usize); + } + + arrow_reader_builder = arrow_reader_builder.with_field_ids(field_ids); + + if let Some(batch_size) = self.batch_size { + arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); + } + + arrow_reader_builder.build().read(self.plan_files().await?) + } + fn create_partition_spec( table_metadata: &TableMetadataRef, partition_spec_id: i32, @@ -303,56 +355,39 @@ impl TableScan { Ok(partition_filter) } +} - pub async fn to_arrow(&self) -> crate::Result { - let mut arrow_reader_builder = - ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone()); - - let mut field_ids = vec![]; - for column_name in &self.column_names { - let field_id = self.schema.field_id_by_name(column_name).ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Column {} not found in table. Schema: {}", - column_name, self.schema - ), - ) - })?; - - let field = self.schema - .as_struct() - .field_by_id(field_id) - .ok_or_else(|| { - Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}", - column_name, self.schema - ), - ) - })?; - - if !field.field_type.is_primitive() { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {} is not a primitive type. Schema: {}", - column_name, self.schema - ), - )); - } - - field_ids.push(field_id as usize); - } - - arrow_reader_builder = arrow_reader_builder.with_field_ids(field_ids); +#[derive(Debug)] +struct FileScanStreamContext { + schema: SchemaRef, + snapshot: SnapshotRef, + table_metadata: TableMetadataRef, + file_io: FileIO, + filter: Option>, + case_sensitive: bool, + partition_filter_cache: HashMap, + manifest_evaluator_cache: HashMap, +} - if let Some(batch_size) = self.batch_size { - arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); +impl FileScanStreamContext { + fn new( + schema: SchemaRef, + snapshot: SnapshotRef, + table_metadata: TableMetadataRef, + file_io: FileIO, + filter: Option>, + case_sensitive: bool, + ) -> Self { + Self { + schema, + snapshot, + table_metadata, + file_io, + filter, + case_sensitive, + partition_filter_cache: HashMap::new(), + manifest_evaluator_cache: HashMap::new(), } - - arrow_reader_builder.build().read(self.plan_files().await?) } } @@ -366,9 +401,6 @@ pub struct FileScanTask { length: u64, } -/// A stream of arrow record batches. -pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result>; - impl FileScanTask { pub fn data(&self) -> ManifestEntryRef { self.data_manifest_entry.clone() From e89d35f8a23b643cf64e5e5f4775a547dde37db2 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 14:00:40 +0200 Subject: [PATCH 03/12] refactor: create partition_spec and partition_schema --- crates/iceberg/src/scan.rs | 75 +++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 97ac54b3ce..a5c6ed2bcb 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -179,7 +179,7 @@ pub struct TableScan { impl TableScan { /// Returns a stream of file scan tasks. pub async fn plan_files(&self) -> crate::Result { - let mut context = FileScanStreamContext::new( + let context = FileScanStreamContext::new( self.schema.clone(), self.snapshot.clone(), self.table_metadata.clone(), @@ -188,6 +188,10 @@ impl TableScan { self.case_sensitive, ); + let mut partition_filter_cache: HashMap = HashMap::new(); + + let mut manifest_evaluator_cache: HashMap = HashMap::new(); + Ok(try_stream! { let manifest_list = context.snapshot @@ -199,12 +203,11 @@ impl TableScan { let bound_filter = filter.bind(context.schema.clone(), context.case_sensitive)?; let partition_spec_id = entry.partition_spec_id; - let partition_spec = - Self::create_partition_spec(&context.table_metadata, partition_spec_id)?; - let partition_schema = Self::create_partition_schema(partition_spec, &context.schema)?; + let (partition_spec, partition_schema) = context.create_partition_spec_and_schema(partition_spec_id)?; - let partition_filter = context.partition_filter_cache.entry(partition_spec_id).or_insert( + + let partition_filter = partition_filter_cache.entry(partition_spec_id).or_insert( Self::create_partition_filter( partition_spec.clone(), partition_schema.clone(), @@ -213,7 +216,7 @@ impl TableScan { )?, ); - let manifest_evaluator = context.manifest_evaluator_cache + let manifest_evaluator = manifest_evaluator_cache .entry(partition_spec_id) .or_insert(ManifestEvaluator::new( partition_schema.schema_id(), @@ -308,38 +311,6 @@ impl TableScan { arrow_reader_builder.build().read(self.plan_files().await?) } - fn create_partition_spec( - table_metadata: &TableMetadataRef, - partition_spec_id: i32, - ) -> crate::Result<&PartitionSpecRef> { - let partition_spec = table_metadata - .partition_spec_by_id(partition_spec_id) - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Could not find partition spec for id {}", partition_spec_id), - ))?; - - Ok(partition_spec) - } - - fn create_partition_schema( - partition_spec: &PartitionSpecRef, - schema: &SchemaRef, - ) -> crate::Result { - let partition_type = partition_spec.partition_type(schema)?; - - let partition_fields: Vec<_> = partition_type.fields().iter().map(Arc::clone).collect(); - - let partition_schema = Arc::new( - Schema::builder() - .with_schema_id(partition_spec.spec_id) - .with_fields(partition_fields) - .build()?, - ); - - Ok(partition_schema) - } - fn create_partition_filter( partition_spec: PartitionSpecRef, partition_schema: SchemaRef, @@ -365,8 +336,6 @@ struct FileScanStreamContext { file_io: FileIO, filter: Option>, case_sensitive: bool, - partition_filter_cache: HashMap, - manifest_evaluator_cache: HashMap, } impl FileScanStreamContext { @@ -385,10 +354,32 @@ impl FileScanStreamContext { file_io, filter, case_sensitive, - partition_filter_cache: HashMap::new(), - manifest_evaluator_cache: HashMap::new(), } } + + fn create_partition_spec_and_schema( + &self, + spec_id: i32, + ) -> Result<(&PartitionSpecRef, SchemaRef)> { + let partition_spec = + self.table_metadata + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Could not find partition spec for id {}", spec_id), + ))?; + + let partition_type = partition_spec.partition_type(&self.schema)?; + let partition_fields = partition_type.fields().to_owned(); + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); + + Ok((partition_spec, partition_schema)) + } } /// A task to scan part of file. From de12066252343f10db62ad4fd369bf48f4bad8ab Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 14:50:20 +0200 Subject: [PATCH 04/12] refactor: add cache structs --- crates/iceberg/src/scan.rs | 101 ++++++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 35 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index a5c6ed2bcb..df253ff890 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -188,9 +188,8 @@ impl TableScan { self.case_sensitive, ); - let mut partition_filter_cache: HashMap = HashMap::new(); - - let mut manifest_evaluator_cache: HashMap = HashMap::new(); + let mut partition_filter_cache = PartitionFilterCache::new(); + let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); Ok(try_stream! { @@ -200,29 +199,17 @@ impl TableScan { for entry in manifest_list.entries() { if let Some(filter) = context.filter.as_ref() { - let bound_filter = filter.bind(context.schema.clone(), context.case_sensitive)?; + let filter = filter.bind(context.schema.clone(), context.case_sensitive)?; let partition_spec_id = entry.partition_spec_id; let (partition_spec, partition_schema) = context.create_partition_spec_and_schema(partition_spec_id)?; + let partition_schema_id = partition_schema.schema_id(); - let partition_filter = partition_filter_cache.entry(partition_spec_id).or_insert( - Self::create_partition_filter( - partition_spec.clone(), - partition_schema.clone(), - &bound_filter, - context.case_sensitive, - )?, - ); + let partition_filter = partition_filter_cache.get(partition_spec_id, partition_spec.clone(), partition_schema, &filter, context.case_sensitive)?; - let manifest_evaluator = manifest_evaluator_cache - .entry(partition_spec_id) - .or_insert(ManifestEvaluator::new( - partition_schema.schema_id(), - partition_filter.clone(), - context.case_sensitive, - )); + let manifest_evaluator = manifest_evaluator_cache.get(partition_spec_id, partition_schema_id, partition_filter.clone(), context.case_sensitive); if !manifest_evaluator.eval(entry)? { continue; @@ -310,22 +297,6 @@ impl TableScan { arrow_reader_builder.build().read(self.plan_files().await?) } - - fn create_partition_filter( - partition_spec: PartitionSpecRef, - partition_schema: SchemaRef, - filter: &BoundPredicate, - case_sensitive: bool, - ) -> crate::Result { - let mut inclusive_projection = InclusiveProjection::new(partition_spec); - - let partition_filter = inclusive_projection - .project(filter)? - .rewrite_not() - .bind(partition_schema, case_sensitive)?; - - Ok(partition_filter) - } } #[derive(Debug)] @@ -382,6 +353,66 @@ impl FileScanStreamContext { } } +struct PartitionFilterCache(HashMap); + +impl PartitionFilterCache { + fn new() -> Self { + Self(HashMap::new()) + } + + fn get( + &mut self, + spec_id: i32, + partition_spec: PartitionSpecRef, + partition_schema: SchemaRef, + filter: &BoundPredicate, + case_sensitive: bool, + ) -> Result<&BoundPredicate> { + if !self.0.contains_key(&spec_id) { + let mut inclusive_projection = InclusiveProjection::new(partition_spec); + + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema, case_sensitive)?; + + self.0.insert(spec_id, partition_filter); + } + + self.0.get(&spec_id).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "Expected a partition filter for spec id {} and predicate {}", + spec_id, filter + ), + ) + }) + } +} + +struct ManifestEvaluatorCache(HashMap); + +impl ManifestEvaluatorCache { + fn new() -> Self { + Self(HashMap::new()) + } + + fn get( + &mut self, + spec_id: i32, + partition_schema_id: i32, + partition_filter: BoundPredicate, + case_sensitive: bool, + ) -> &mut ManifestEvaluator { + self.0.entry(spec_id).or_insert(ManifestEvaluator::new( + partition_schema_id, + partition_filter, + case_sensitive, + )) + } +} + /// A task to scan part of file. #[derive(Debug)] pub struct FileScanTask { From d46e25bd57575dd492e1836b8da297760574e2ba Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 15:14:22 +0200 Subject: [PATCH 05/12] refactor: use entry in partition_file_cache --- crates/iceberg/src/scan.rs | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index df253ff890..2245f117b5 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -32,6 +32,7 @@ use arrow_array::RecordBatch; use async_stream::try_stream; use futures::stream::BoxStream; use futures::StreamExt; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -368,26 +369,19 @@ impl PartitionFilterCache { filter: &BoundPredicate, case_sensitive: bool, ) -> Result<&BoundPredicate> { - if !self.0.contains_key(&spec_id) { - let mut inclusive_projection = InclusiveProjection::new(partition_spec); + match self.0.entry(spec_id) { + Entry::Occupied(e) => Ok(e.into_mut()), + Entry::Vacant(e) => { + let mut inclusive_projection = InclusiveProjection::new(partition_spec); - let partition_filter = inclusive_projection - .project(filter)? - .rewrite_not() - .bind(partition_schema, case_sensitive)?; + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema, case_sensitive)?; - self.0.insert(spec_id, partition_filter); + Ok(e.insert(partition_filter)) + } } - - self.0.get(&spec_id).ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - format!( - "Expected a partition filter for spec id {} and predicate {}", - spec_id, filter - ), - ) - }) } } From 22b30431dea2b8a7aaf1f2a33f87a3911df61049 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 15:15:29 +0200 Subject: [PATCH 06/12] refactor: use result --- crates/iceberg/src/scan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 2245f117b5..2ffb26f462 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -107,7 +107,7 @@ impl<'a> TableScanBuilder<'a> { } /// Build the table scan. - pub fn build(self) -> crate::Result { + pub fn build(self) -> Result { let snapshot = match self.snapshot_id { Some(snapshot_id) => self .table @@ -179,7 +179,7 @@ pub struct TableScan { impl TableScan { /// Returns a stream of file scan tasks. - pub async fn plan_files(&self) -> crate::Result { + pub async fn plan_files(&self) -> Result { let context = FileScanStreamContext::new( self.schema.clone(), self.snapshot.clone(), @@ -233,7 +233,7 @@ impl TableScan { ))?; } DataContentType::Data => { - let scan_task: crate::Result = Ok(FileScanTask { + let scan_task: Result = Ok(FileScanTask { data_manifest_entry: manifest_entry.clone(), start: 0, length: manifest_entry.file_size_in_bytes(), @@ -248,7 +248,7 @@ impl TableScan { } /// Returns an [`ArrowRecordBatchStream`]. - pub async fn to_arrow(&self) -> crate::Result { + pub async fn to_arrow(&self) -> Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone()); From 11db2002e19230893c7884de7c288c9175965e40 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 15:30:22 +0200 Subject: [PATCH 07/12] chore: update docs + fmt --- crates/iceberg/src/scan.rs | 49 ++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 2ffb26f462..fb7cd5a964 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -178,7 +178,7 @@ pub struct TableScan { } impl TableScan { - /// Returns a stream of file scan tasks. + /// Returns a stream of [`FileScanTask`]s. pub async fn plan_files(&self) -> Result { let context = FileScanStreamContext::new( self.schema.clone(), @@ -193,8 +193,8 @@ impl TableScan { let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); Ok(try_stream! { - - let manifest_list = context.snapshot + let manifest_list = context + .snapshot .load_manifest_list(&context.file_io, &context.table_metadata) .await?; @@ -204,13 +204,23 @@ impl TableScan { let partition_spec_id = entry.partition_spec_id; - let (partition_spec, partition_schema) = context.create_partition_spec_and_schema(partition_spec_id)?; - - let partition_schema_id = partition_schema.schema_id(); + let (partition_spec, partition_schema) = + context.create_partition_spec_and_schema(partition_spec_id)?; - let partition_filter = partition_filter_cache.get(partition_spec_id, partition_spec.clone(), partition_schema, &filter, context.case_sensitive)?; + let partition_filter = partition_filter_cache.get( + partition_spec_id, + partition_spec.clone(), + partition_schema.clone(), + &filter, + context.case_sensitive, + )?; - let manifest_evaluator = manifest_evaluator_cache.get(partition_spec_id, partition_schema_id, partition_filter.clone(), context.case_sensitive); + let manifest_evaluator = manifest_evaluator_cache.get( + partition_spec_id, + partition_schema.schema_id(), + partition_filter.clone(), + context.case_sensitive, + ); if !manifest_evaluator.eval(entry)? { continue; @@ -243,8 +253,8 @@ impl TableScan { } } } - - }.boxed()) + } + .boxed()) } /// Returns an [`ArrowRecordBatchStream`]. @@ -301,6 +311,8 @@ impl TableScan { } #[derive(Debug)] +/// Holds the context necessary for file scanning operations +/// in a streaming environment. struct FileScanStreamContext { schema: SchemaRef, snapshot: SnapshotRef, @@ -311,6 +323,7 @@ struct FileScanStreamContext { } impl FileScanStreamContext { + /// Creates a new [`FileScanStreamContext`]. fn new( schema: SchemaRef, snapshot: SnapshotRef, @@ -329,6 +342,8 @@ impl FileScanStreamContext { } } + /// Creates a reference-counted [`PartitionSpec`] and a + /// corresponding schema based on the specified partition spec id. fn create_partition_spec_and_schema( &self, spec_id: i32, @@ -354,13 +369,20 @@ impl FileScanStreamContext { } } +#[derive(Debug)] +/// Manages the caching of [`BoundPredicate`] objects +/// for [`PartitionSpec`]s based on partition spec id. struct PartitionFilterCache(HashMap); impl PartitionFilterCache { + /// Creates a new [`PartitionFilterCache`] + /// with an empty internal HashMap. fn new() -> Self { Self(HashMap::new()) } + /// Retrieves a [`BoundPredicate`] from the cache + /// or computes it if not present. fn get( &mut self, spec_id: i32, @@ -385,13 +407,20 @@ impl PartitionFilterCache { } } +#[derive(Debug)] +/// Manages the caching of [`ManifestEvaluator`] objects +/// for [`PartitionSpec`]s based on partition spec id. struct ManifestEvaluatorCache(HashMap); impl ManifestEvaluatorCache { + /// Creates a new [`ManifestEvaluatorCache`] + /// with an empty internal HashMap. fn new() -> Self { Self(HashMap::new()) } + /// Retrieves a [`ManifestEvaluator`] from the cache + /// or computes it if not present. fn get( &mut self, spec_id: i32, From 033c78827aa14f89c0ccbc235d9e096a70a7982e Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 29 Apr 2024 20:13:26 +0200 Subject: [PATCH 08/12] refactor: add bound_filter to FileScanStreamContext --- crates/iceberg/src/scan.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index fb7cd5a964..dc9634f24a 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -189,6 +189,8 @@ impl TableScan { self.case_sensitive, ); + let bound_filter = context.bound_filter()?; + let mut partition_filter_cache = PartitionFilterCache::new(); let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); @@ -199,9 +201,7 @@ impl TableScan { .await?; for entry in manifest_list.entries() { - if let Some(filter) = context.filter.as_ref() { - let filter = filter.bind(context.schema.clone(), context.case_sensitive)?; - + if let Some(filter) = &bound_filter { let partition_spec_id = entry.partition_spec_id; let (partition_spec, partition_schema) = @@ -211,7 +211,7 @@ impl TableScan { partition_spec_id, partition_spec.clone(), partition_schema.clone(), - &filter, + filter, context.case_sensitive, )?; @@ -227,7 +227,6 @@ impl TableScan { } // TODO: Create ExpressionEvaluator - // TODO: Create InclusiveMetricsEvaluator } let manifest = entry.load_manifest(&context.file_io).await?; @@ -235,6 +234,9 @@ impl TableScan { futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive())); while let Some(manifest_entry) = manifest_entries_stream.next().await { + // TODO: Apply ExpressionEvaluator + // TODO: Apply InclusiveMetricsEvaluator::eval() + match manifest_entry.content_type() { DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { yield Err(Error::new( @@ -342,6 +344,14 @@ impl FileScanStreamContext { } } + /// Creates a [`BoundPredicate`] from row filter [`Predicate`]. + fn bound_filter(&self) -> Result> { + match self.filter { + Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), + None => Ok(None), + } + } + /// Creates a reference-counted [`PartitionSpec`] and a /// corresponding schema based on the specified partition spec id. fn create_partition_spec_and_schema( From 56499a5fca40f4af4e28902dfecf96d580a00459 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 05:59:26 +0200 Subject: [PATCH 09/12] refactor: return ref BoundPredicate --- crates/iceberg/src/scan.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index dc9634f24a..61a2fc7771 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -187,9 +187,7 @@ impl TableScan { self.file_io.clone(), self.filter.clone(), self.case_sensitive, - ); - - let bound_filter = context.bound_filter()?; + )?; let mut partition_filter_cache = PartitionFilterCache::new(); let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); @@ -201,7 +199,7 @@ impl TableScan { .await?; for entry in manifest_list.entries() { - if let Some(filter) = &bound_filter { + if let Some(filter) = context.bound_filter() { let partition_spec_id = entry.partition_spec_id; let (partition_spec, partition_schema) = @@ -320,7 +318,7 @@ struct FileScanStreamContext { snapshot: SnapshotRef, table_metadata: TableMetadataRef, file_io: FileIO, - filter: Option>, + bound_filter: Option, case_sensitive: bool, } @@ -333,23 +331,25 @@ impl FileScanStreamContext { file_io: FileIO, filter: Option>, case_sensitive: bool, - ) -> Self { - Self { + ) -> Result { + let bound_filter = match filter { + Some(ref filter) => Some(filter.bind(schema.clone(), case_sensitive)?), + None => None, + }; + + Ok(Self { schema, snapshot, table_metadata, file_io, - filter, + bound_filter, case_sensitive, - } + }) } - /// Creates a [`BoundPredicate`] from row filter [`Predicate`]. - fn bound_filter(&self) -> Result> { - match self.filter { - Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)), - None => Ok(None), - } + /// Returns a reference to the [`BoundPredicate`] filter. + fn bound_filter(&self) -> Option<&BoundPredicate> { + self.bound_filter.as_ref() } /// Creates a reference-counted [`PartitionSpec`] and a From e9836de8a1757241c9733ec1e5c33dd777f3f3dd Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 06:09:30 +0200 Subject: [PATCH 10/12] fix: return type PartitionSpecRef --- crates/iceberg/src/scan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 61a2fc7771..cecf524570 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -207,7 +207,7 @@ impl TableScan { let partition_filter = partition_filter_cache.get( partition_spec_id, - partition_spec.clone(), + partition_spec, partition_schema.clone(), filter, context.case_sensitive, @@ -353,11 +353,11 @@ impl FileScanStreamContext { } /// Creates a reference-counted [`PartitionSpec`] and a - /// corresponding schema based on the specified partition spec id. + /// corresponding [`Schema`] based on the specified partition spec id. fn create_partition_spec_and_schema( &self, spec_id: i32, - ) -> Result<(&PartitionSpecRef, SchemaRef)> { + ) -> Result<(PartitionSpecRef, SchemaRef)> { let partition_spec = self.table_metadata .partition_spec_by_id(spec_id) @@ -375,7 +375,7 @@ impl FileScanStreamContext { .build()?, ); - Ok((partition_spec, partition_schema)) + Ok((partition_spec.clone(), partition_schema)) } } From 89e510e0971e19fbd947d80be5ec7a081515ec53 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 06:19:07 +0200 Subject: [PATCH 11/12] refactor: remove spec_id runtime check --- .../src/expr/visitors/manifest_evaluator.rs | 58 +------------------ crates/iceberg/src/scan.rs | 10 +--- 2 files changed, 6 insertions(+), 62 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 0784aab1a6..bcb5967186 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -18,7 +18,7 @@ use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::spec::{Datum, FieldSummary, ManifestFile}; -use crate::{Error, ErrorKind, Result}; +use crate::Result; use fnv::FnvHashSet; #[derive(Debug)] @@ -28,19 +28,13 @@ use fnv::FnvHashSet; /// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. pub(crate) struct ManifestEvaluator { - partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, } impl ManifestEvaluator { - pub(crate) fn new( - partition_schema_id: i32, - partition_filter: BoundPredicate, - case_sensitive: bool, - ) -> Self { + pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) -> Self { Self { - partition_schema_id, partition_filter, case_sensitive, } @@ -55,19 +49,6 @@ impl ManifestEvaluator { return Ok(true); } - // A [`ManifestEvaluator`] is created for a specific schema id - // based on the partition spec. This id should match the partition - // spec id of the [`ManifestFile`]. - if self.partition_schema_id != manifest_file.partition_spec_id { - return Err(Error::new( - ErrorKind::Unexpected, - format!( - "Partition ID for manifest file '{}' does not match partition ID for the Scan", - &manifest_file.manifest_path - ), - )); - } - let mut evaluator = ManifestFilterVisitor::new(self, &manifest_file.partitions); visit(&mut evaluator, &self.partition_filter) @@ -375,11 +356,7 @@ mod test { case_sensitive, )?; - Ok(ManifestEvaluator::new( - partition_schema.schema_id(), - partition_filter, - case_sensitive, - )) + Ok(ManifestEvaluator::new(partition_filter, case_sensitive)) } #[test] @@ -431,35 +408,6 @@ mod test { Ok(()) } - #[test] - fn test_manifest_file_partition_id_mismatch_returns_error() -> Result<()> { - let case_sensitive = true; - - let (schema, partition_spec) = create_schema_and_partition_spec_with_id_mismatch()?; - - let filter = Predicate::Unary(UnaryExpression::new( - PredicateOperator::IsNull, - Reference::new("a"), - )) - .bind(schema.clone(), case_sensitive)?; - - let manifest_file = create_manifest_file(vec![FieldSummary { - contains_null: true, - contains_nan: None, - lower_bound: None, - upper_bound: None, - }]); - - let manifest_evaluator = - create_manifest_evaluator(schema, partition_spec, &filter, case_sensitive)?; - - let result = manifest_evaluator.eval(&manifest_file); - - assert!(result.is_err()); - - Ok(()) - } - #[test] fn test_manifest_file_trivial_partition_rejected_filter() -> Result<()> { let case_sensitive = true; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index cecf524570..866bb2d1e1 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -214,7 +214,6 @@ impl TableScan { )?; let manifest_evaluator = manifest_evaluator_cache.get( - partition_spec_id, partition_schema.schema_id(), partition_filter.clone(), context.case_sensitive, @@ -434,15 +433,12 @@ impl ManifestEvaluatorCache { fn get( &mut self, spec_id: i32, - partition_schema_id: i32, partition_filter: BoundPredicate, case_sensitive: bool, ) -> &mut ManifestEvaluator { - self.0.entry(spec_id).or_insert(ManifestEvaluator::new( - partition_schema_id, - partition_filter, - case_sensitive, - )) + self.0 + .entry(spec_id) + .or_insert(ManifestEvaluator::new(partition_filter, case_sensitive)) } } From b710e7333fe3149735bec4458b6ca0694e1bf186 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 06:36:47 +0200 Subject: [PATCH 12/12] feat: add check for content_type data --- crates/iceberg/src/scan.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 866bb2d1e1..b842522e2f 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{ - DataContentType, ManifestEntryRef, PartitionSpecRef, Schema, SchemaRef, SnapshotRef, - TableMetadataRef, + DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, PartitionSpecRef, Schema, + SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -199,6 +199,10 @@ impl TableScan { .await?; for entry in manifest_list.entries() { + if !Self::content_type_is_data(entry) { + continue; + } + if let Some(filter) = context.bound_filter() { let partition_spec_id = entry.partition_spec_id; @@ -307,6 +311,14 @@ impl TableScan { arrow_reader_builder.build().read(self.plan_files().await?) } + + /// Checks whether the [`ManifestContentType`] is `Data` or not. + fn content_type_is_data(entry: &ManifestFile) -> bool { + if let ManifestContentType::Data = entry.content { + return true; + } + false + } } #[derive(Debug)]