Skip to content

Commit 51cbbef

Browse files
committed
feat: add ManifestEvaluator
1 parent ca9de89 commit 51cbbef

File tree

4 files changed

+334
-1
lines changed

4 files changed

+334
-1
lines changed

crates/iceberg/src/expr/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::fmt::{Display, Formatter};
2424
pub use term::*;
2525
pub(crate) mod accessor;
2626
mod predicate;
27+
pub(crate) mod visitors;
2728

2829
use crate::spec::SchemaRef;
2930
pub use predicate::*;
Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor, OpLiteral};
19+
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
20+
use crate::expr::{Bind, BoundPredicate, BoundReference, PredicateOperator};
21+
use crate::spec::{FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef};
22+
use std::sync::Arc;
23+
24+
pub(crate) struct ManifestEvaluator {
25+
partition_schema: SchemaRef,
26+
partition_filter: BoundPredicate,
27+
case_sensitive: bool,
28+
}
29+
30+
impl ManifestEvaluator {
31+
pub(crate) fn new(
32+
partition_spec: PartitionSpecRef,
33+
table_schema: SchemaRef,
34+
partition_filter: BoundPredicate,
35+
case_sensitive: bool,
36+
) -> crate::Result<Self> {
37+
let partition_type = partition_spec.partition_type(&table_schema)?;
38+
39+
// this is needed as SchemaBuilder.with_fields expects an iterator over
40+
// Arc<NestedField> rather than &Arc<NestedField>
41+
let cloned_partition_fields: Vec<_> =
42+
partition_type.fields().iter().map(Arc::clone).collect();
43+
44+
let partition_schema = Schema::builder()
45+
.with_fields(cloned_partition_fields)
46+
.build()?;
47+
48+
let partition_schema_ref = Arc::new(partition_schema);
49+
50+
let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());
51+
let unbound_partition_filter = inclusive_projection.project(&partition_filter)?;
52+
53+
let partition_filter =
54+
unbound_partition_filter.bind(partition_schema_ref.clone(), case_sensitive)?;
55+
56+
Ok(Self {
57+
partition_schema: partition_schema_ref,
58+
partition_filter,
59+
case_sensitive,
60+
})
61+
}
62+
63+
pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> crate::Result<bool> {
64+
if manifest_file.partitions.is_empty() {
65+
return Ok(true);
66+
}
67+
68+
struct ManifestFilterVisitor<'a> {
69+
manifest_evaluator: &'a ManifestEvaluator,
70+
partitions: &'a Vec<FieldSummary>,
71+
}
72+
73+
impl<'a> ManifestFilterVisitor<'a> {
74+
fn new(
75+
manifest_evaluator: &'a ManifestEvaluator,
76+
partitions: &'a Vec<FieldSummary>,
77+
) -> Self {
78+
ManifestFilterVisitor {
79+
manifest_evaluator,
80+
partitions,
81+
}
82+
}
83+
}
84+
85+
// Remove this annotation once all todos have been removed
86+
#[allow(unused_variables)]
87+
impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
88+
type T = bool;
89+
90+
fn always_true(&mut self) -> crate::Result<Self::T> {
91+
Ok(true)
92+
}
93+
94+
fn always_false(&mut self) -> crate::Result<Self::T> {
95+
Ok(false)
96+
}
97+
98+
fn and(&mut self, lhs: Self::T, rhs: Self::T) -> crate::Result<Self::T> {
99+
Ok(lhs && rhs)
100+
}
101+
102+
fn or(&mut self, lhs: Self::T, rhs: Self::T) -> crate::Result<Self::T> {
103+
Ok(lhs || rhs)
104+
}
105+
106+
fn not(&mut self, inner: Self::T) -> crate::Result<Self::T> {
107+
Ok(!inner)
108+
}
109+
110+
fn op(
111+
&mut self,
112+
op: PredicateOperator,
113+
reference: &BoundReference,
114+
literal: Option<OpLiteral>,
115+
predicate: &BoundPredicate,
116+
) -> crate::Result<Self::T> {
117+
Ok(match op {
118+
PredicateOperator::IsNull => {
119+
self.field_summary_for_reference(reference).contains_null
120+
}
121+
PredicateOperator::NotNull => {
122+
todo!()
123+
}
124+
PredicateOperator::IsNan => self
125+
.field_summary_for_reference(reference)
126+
.contains_nan
127+
.is_some(),
128+
PredicateOperator::NotNan => {
129+
todo!()
130+
}
131+
PredicateOperator::LessThan => {
132+
todo!()
133+
}
134+
PredicateOperator::LessThanOrEq => {
135+
todo!()
136+
}
137+
PredicateOperator::GreaterThan => {
138+
todo!()
139+
}
140+
PredicateOperator::GreaterThanOrEq => {
141+
todo!()
142+
}
143+
PredicateOperator::Eq => {
144+
todo!()
145+
}
146+
PredicateOperator::NotEq => {
147+
todo!()
148+
}
149+
PredicateOperator::StartsWith => {
150+
todo!()
151+
}
152+
PredicateOperator::NotStartsWith => {
153+
todo!()
154+
}
155+
PredicateOperator::In => {
156+
todo!()
157+
}
158+
PredicateOperator::NotIn => {
159+
todo!()
160+
}
161+
})
162+
}
163+
}
164+
165+
impl ManifestFilterVisitor<'_> {
166+
fn field_summary_for_reference(&self, reference: &BoundReference) -> &FieldSummary {
167+
let pos = reference.accessor().position();
168+
&self.partitions[pos]
169+
}
170+
}
171+
172+
let mut evaluator = ManifestFilterVisitor::new(self, &manifest_file.partitions);
173+
174+
visit(&mut evaluator, &self.partition_filter)
175+
}
176+
}
177+
178+
#[cfg(test)]
179+
mod test {
180+
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
181+
use crate::expr::{Bind, Predicate, PredicateOperator, Reference, UnaryExpression};
182+
use crate::spec::{
183+
FieldSummary, ManifestContentType, ManifestFile, NestedField, PartitionField,
184+
PartitionSpec, PrimitiveType, Schema, Transform, Type,
185+
};
186+
use std::sync::Arc;
187+
188+
#[test]
189+
fn test_manifest_file_no_partitions() {
190+
let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec();
191+
192+
let partition_filter = Predicate::AlwaysTrue
193+
.bind(table_schema_ref.clone(), false)
194+
.unwrap();
195+
196+
let case_sensitive = false;
197+
198+
let manifest_file_partitions = vec![];
199+
let manifest_file = create_test_manifest_file(manifest_file_partitions);
200+
201+
let manifest_evaluator = ManifestEvaluator::new(
202+
partition_spec_ref,
203+
table_schema_ref,
204+
partition_filter,
205+
case_sensitive,
206+
)
207+
.unwrap();
208+
209+
let result = manifest_evaluator.eval(&manifest_file).unwrap();
210+
211+
assert!(result);
212+
}
213+
214+
#[test]
215+
fn test_manifest_file_trivial_partition_passing_filter() {
216+
let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec();
217+
218+
let partition_filter = Predicate::Unary(UnaryExpression::new(
219+
PredicateOperator::IsNull,
220+
Reference::new("a"),
221+
))
222+
.bind(table_schema_ref.clone(), true)
223+
.unwrap();
224+
225+
let manifest_file_partitions = vec![FieldSummary {
226+
contains_null: true,
227+
contains_nan: None,
228+
lower_bound: None,
229+
upper_bound: None,
230+
}];
231+
let manifest_file = create_test_manifest_file(manifest_file_partitions);
232+
233+
let manifest_evaluator =
234+
ManifestEvaluator::new(partition_spec_ref, table_schema_ref, partition_filter, true)
235+
.unwrap();
236+
237+
let result = manifest_evaluator.eval(&manifest_file).unwrap();
238+
239+
assert!(result);
240+
}
241+
242+
#[test]
243+
fn test_manifest_file_trivial_partition_rejected_filter() {
244+
let (table_schema_ref, partition_spec_ref) = create_test_schema_and_partition_spec();
245+
246+
let partition_filter = Predicate::Unary(UnaryExpression::new(
247+
PredicateOperator::IsNan,
248+
Reference::new("a"),
249+
))
250+
.bind(table_schema_ref.clone(), true)
251+
.unwrap();
252+
253+
let manifest_file_partitions = vec![FieldSummary {
254+
contains_null: false,
255+
contains_nan: None,
256+
lower_bound: None,
257+
upper_bound: None,
258+
}];
259+
let manifest_file = create_test_manifest_file(manifest_file_partitions);
260+
261+
let manifest_evaluator =
262+
ManifestEvaluator::new(partition_spec_ref, table_schema_ref, partition_filter, true)
263+
.unwrap();
264+
265+
let result = manifest_evaluator.eval(&manifest_file).unwrap();
266+
267+
assert!(!result);
268+
}
269+
270+
fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
271+
let table_schema = Schema::builder()
272+
.with_fields(vec![Arc::new(NestedField::optional(
273+
1,
274+
"a",
275+
Type::Primitive(PrimitiveType::Float),
276+
))])
277+
.build()
278+
.unwrap();
279+
let table_schema_ref = Arc::new(table_schema);
280+
281+
let partition_spec = PartitionSpec::builder()
282+
.with_spec_id(1)
283+
.with_fields(vec![PartitionField::builder()
284+
.source_id(1)
285+
.name("a".to_string())
286+
.field_id(1)
287+
.transform(Transform::Identity)
288+
.build()])
289+
.build()
290+
.unwrap();
291+
let partition_spec_ref = Arc::new(partition_spec);
292+
(table_schema_ref, partition_spec_ref)
293+
}
294+
295+
fn create_test_manifest_file(manifest_file_partitions: Vec<FieldSummary>) -> ManifestFile {
296+
ManifestFile {
297+
manifest_path: "/test/path".to_string(),
298+
manifest_length: 0,
299+
partition_spec_id: 1,
300+
content: ManifestContentType::Data,
301+
sequence_number: 0,
302+
min_sequence_number: 0,
303+
added_snapshot_id: 0,
304+
added_data_files_count: None,
305+
existing_data_files_count: None,
306+
deleted_data_files_count: None,
307+
added_rows_count: None,
308+
existing_rows_count: None,
309+
deleted_rows_count: None,
310+
partitions: manifest_file_partitions,
311+
key_metadata: vec![],
312+
}
313+
}
314+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub(crate) mod manifest_evaluator;

crates/iceberg/src/transform/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ mod test {
147147
}
148148
}
149149

150-
/// A utitily struct, test fixture
150+
/// A utility struct, test fixture
151151
/// used for testing the transform on `Transform`
152152
pub(crate) struct TestTransformFixture {
153153
pub display: String,

0 commit comments

Comments
 (0)