Skip to content

fix: Manifest parsing should consider schema evolution. #171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 156 additions & 15 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,16 +1328,11 @@ mod _serde {
) -> Result<HashMap<i32, Literal>, Error> {
let mut m = HashMap::with_capacity(v.len());
for entry in v {
let data_type = &schema
.field_by_id(entry.key)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Can't find field id {} for upper/lower_bounds", entry.key),
)
})?
.field_type;
m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
// We ignore the entry if the field is not found in the schema, due to schema evolution.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen if you drop a column indeed. This makes sense to me 👍

if let Some(field) = schema.field_by_id(entry.key) {
let data_type = &field.field_type;
m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
}
}
Ok(m)
}
Expand Down Expand Up @@ -1822,10 +1817,160 @@ mod tests {
assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
}

#[tokio::test]
async fn test_parse_manifest_with_schema_evolution() {
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
schema: Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
])
.build()
.unwrap(),
partition_spec: PartitionSpec {
spec_id: 0,
fields: vec![],
},
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
entries: vec![Arc::new(ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_format: DataFileFormat::Parquet,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([
(1, 61),
(2, 73),
(3, 61),
]),
value_counts: HashMap::default(),
null_value_counts: HashMap::default(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::int(2)),
(3, Literal::string("x"))
]),
upper_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::int(2)),
(3, Literal::string("x"))
]),
key_metadata: vec![],
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: None,
},
})],
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);

let (avro_bytes, _) = write_manifest(&manifest, writer).await;

// The parse should succeed.
let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap();

// Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
// other parts should be same.
let expected_manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
schema: Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
])
.build()
.unwrap(),
partition_spec: PartitionSpec {
spec_id: 0,
fields: vec![],
},
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
entries: vec![Arc::new(ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_format: DataFileFormat::Parquet,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([
(1, 61),
(2, 73),
(3, 61),
]),
value_counts: HashMap::default(),
null_value_counts: HashMap::default(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::int(2)),
]),
upper_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::int(2)),
]),
key_metadata: vec![],
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: None,
},
})],
};

assert_eq!(actual_manifest, expected_manifest);
}

/// Round-trips `manifest` through an Avro write/parse cycle, asserts the
/// parsed manifest equals the input, and returns the resulting list entry.
async fn test_manifest_read_write(
    manifest: Manifest,
    writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
) -> ManifestListEntry {
    // Serialize, capturing both the raw Avro bytes and the list entry.
    let (serialized, entry) = write_manifest(&manifest, writer_builder).await;

    // Parsing the bytes back must reproduce the original manifest exactly.
    let round_tripped = Manifest::parse_avro(serialized.as_slice()).unwrap();
    assert_eq!(round_tripped, manifest);

    entry
}

/// Utility method which writes out a manifest and returns the bytes.
async fn write_manifest(
manifest: &Manifest,
writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
) -> (Vec<u8>, ManifestListEntry) {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
Expand All @@ -1834,10 +1979,6 @@ mod tests {
let res = writer.write(manifest.clone()).await.unwrap();

// Verify manifest
let bs = fs::read(path).expect("read_file must succeed");
let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();

assert_eq!(actual_manifest, manifest);
res
(fs::read(path).expect("read_file must succeed"), res)
}
}