
feat: Infer partition values from bounds #1079

Merged
11 commits merged on Apr 8, 2025
55 changes: 52 additions & 3 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -44,10 +44,11 @@ use crate::arrow::{
};
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::spec::{
visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, MapType,
NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, StructType,
TableMetadata, Type,
visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal,
MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor,
Struct, StructType, TableMetadata, Type,
};
use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};

@@ -458,6 +459,54 @@ impl ParquetWriter {

Ok(builder)
}

#[allow(dead_code)]
fn partition_value_from_bounds(
table_spec: Arc<PartitionSpec>,
lower_bounds: &HashMap<i32, Datum>,
upper_bounds: &HashMap<i32, Datum>,
) -> Result<Struct> {
let mut partition_literals: Vec<Option<Literal>> = Vec::new();

for field in table_spec.fields() {
if let (Some(lower), Some(upper)) = (
lower_bounds.get(&field.source_id),
upper_bounds.get(&field.source_id),
) {
if !field.transform.preserves_order() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
field.name, field.transform
),
));
}

if lower != upper {
Contributor: CMIIW, it looks like lower and upper can be different while their partition values are the same, so should we check transform(lower) != transform(upper) instead? iceberg-python has the same logic; should we fix it? cc @kevinjqliu @Fokko

Contributor: I don't think so; transform(lower) == transform(upper) doesn't mean the transformed result of every row is the same.

Contributor:

> I don't think so; transform(lower) == transform(upper) doesn't mean the transformed result of every row is the same.

This is interesting. The check here requires the appended data file to have a single value in the partition source column, but the spec only requires the partition value to be the same within a single data file. For example, with year(ts), rows containing 2015-10-13 and 2015-11-13 can legitimately coexist in one data file, yet under this restriction we could not append a data file containing those two rows. I'm not sure whether it's worth it, but I see two ways to avoid the restriction:

  1. Scan the whole data file to compute the partition value of every row and make sure they are all the same.
  2. For partition transforms that preserve the original ordering (I'm not sure whether this description is accurate; e.g. year, month), transform(lower) == transform(upper) would mean that the transformed result of every row is the same.

Contributor: I agree that if the transform preserves order, we can relax the check.
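The year(ts) example above can be made concrete with a small standalone sketch (plain Rust, not the iceberg-rust API; the `year_of` helper is hypothetical and only mimics the year transform): the raw bounds of a data file can differ while every row still maps to the same partition value, which is why comparing transformed bounds, as suggested here, would relax the check for order-preserving transforms.

```rust
// Standalone illustration, assuming "YYYY-MM-DD" date strings; this is not
// the iceberg-rust transform API, just the shape of the argument above.
fn year_of(date: &str) -> i32 {
    // The year transform only depends on the year component.
    date[..4].parse().expect("valid year")
}

fn main() {
    let lower = "2015-10-13";
    let upper = "2015-11-13";

    // The raw bounds differ, so a `lower != upper` check rejects this file ...
    assert_ne!(lower, upper);

    // ... even though both bounds (and every row between them) fall into the
    // same year(ts) partition, so the file could be appended safely.
    assert_eq!(year_of(lower), year_of(upper));
    println!("both bounds map to partition year {}", year_of(lower));
}
```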

return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"multiple partition values for field {}: lower: {:?}, upper: {:?}",
field.name, lower, upper
),
));
}

let transform_fn = create_transform_function(&field.transform)?;
let transform_literal =
Literal::from(transform_fn.transform_literal_result(lower)?);

partition_literals.push(Some(transform_literal));
} else {
partition_literals.push(None);
}
}

let partition_struct = Struct::from_iter(partition_literals);

Ok(partition_struct)
}
}

impl FileWriter for ParquetWriter {