diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index bdd0d0a56..76e6e7629 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1328,16 +1328,11 @@ mod _serde {
     ) -> Result<HashMap<i32, Literal>, Error> {
         let mut m = HashMap::with_capacity(v.len());
         for entry in v {
-            let data_type = &schema
-                .field_by_id(entry.key)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Can't find field id {} for upper/lower_bounds", entry.key),
-                    )
-                })?
-                .field_type;
-            m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            // We ignore the entry if the field is not found in the schema, due to schema evolution.
+            if let Some(field) = schema.field_by_id(entry.key) {
+                let data_type = &field.field_type;
+                m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            }
         }
         Ok(m)
     }
@@ -1822,10 +1817,160 @@ mod tests {
         assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
     }
 
+    #[tokio::test]
+    async fn test_parse_manifest_with_schema_evolution() {
+        let manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
+
+        let (avro_bytes, _) = write_manifest(&manifest, writer).await;
+
+        // The parse should succeed.
+        let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap();
+
+        // Compared with the original manifest, lower_bounds and upper_bounds no longer have data
+        // for field 3; everything else should be the same.
+        let expected_manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        assert_eq!(actual_manifest, expected_manifest);
+    }
+
     async fn test_manifest_read_write(
         manifest: Manifest,
         writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
     ) -> ManifestListEntry {
+        let (bs, res) = write_manifest(&manifest, writer_builder).await;
+        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+        assert_eq!(actual_manifest, manifest);
+        res
+    }
+
+    /// Utility method which writes out a manifest and returns the bytes.
+    async fn write_manifest(
+        manifest: &Manifest,
+        writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
+    ) -> (Vec<u8>, ManifestListEntry) {
         let temp_dir = TempDir::new().unwrap();
         let path = temp_dir.path().join("test_manifest.avro");
         let io = FileIOBuilder::new_fs_io().build().unwrap();
@@ -1834,10 +1979,6 @@ mod tests {
         let res = writer.write(manifest.clone()).await.unwrap();
 
-        // Verify manifest
-        let bs = fs::read(path).expect("read_file must succeed");
-        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
-
-        assert_eq!(actual_manifest, manifest);
-        res
+        // Hand the serialized bytes back; parsing and verification are up to the caller.
+        (fs::read(path).expect("read_file must succeed"), res)
     }
 }