Compatibility issues with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.0 #338

Description

@zeodtr

Hi,
I've been developing a query engine that uses the iceberg-rust crate.
When I checked Iceberg compatibility with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.4.3, I didn't encounter any issues, at least not with my engine. However, when testing with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.0, I ran into a few issues. I managed to address them, either through fixes or workarounds. Here's a summary of the issues encountered and the solutions applied:

Issue 1.

In the following scenario,

  1. CREATE TABLE with my engine - success
  2. INSERT INTO with my engine - success
  3. DELETE FROM with Spark - fails with the following message:
java.lang.IllegalArgumentException: Not a list type: map<int, long>
        at org.apache.iceberg.types.Type.asListType(Type.java:75)
        at org.apache.iceberg.avro.AvroWithPartnerVisitor$FieldIDAccessors.listElementPartner(AvroWithPartnerVisitor.java:65)
        at org.apache.iceberg.avro.AvroWithPartnerVisitor$FieldIDAccessors.listElementPartner(AvroWithPartnerVisitor.java:40)
        at org.apache.iceberg.avro.AvroWithPartnerVisitor.visitArray(AvroWithPartnerVisitor.java:205)

The root cause is that iceberg-rust doesn't include "logicalType": "map" in the Avro schema for Iceberg maps with non-string keys, which the Iceberg spec represents as Avro arrays of key/value records.

To address this, I applied the not-yet-released apache_avro 0.17 from GitHub and adjusted the iceberg-rust code to match the changed Avro Rust API (incidentally, that API change was made by an iceberg-rust developer, perhaps to fix exactly this kind of issue), and then added the logical type to the schema.
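
For reference, here's a minimal sketch of the schema shape involved. The record and field names and the field-ids are illustrative, and it assumes apache_avro 0.17 preserves the unknown "logicalType" and "field-id" attributes as custom attributes (the Avro spec requires unknown logical types to be ignored rather than rejected):

use apache_avro::Schema;

fn main() {
    // An Iceberg map<int, long> is written to Avro as an array of key/value
    // records; Iceberg Java needs "logicalType": "map" on the array node to
    // read it back as a map type.
    let map_as_array = r#"{
        "type": "array",
        "logicalType": "map",
        "items": {
            "type": "record",
            "name": "k101_v102",
            "fields": [
                {"name": "key", "type": "int", "field-id": 101},
                {"name": "value", "type": "long", "field-id": 102}
            ]
        }
    }"#;
    let schema = Schema::parse_str(map_as_array).expect("valid Avro schema");
    println!("{schema:?}");
}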

Issue 2.

In the following scenario,

  1. CREATE TABLE with my engine - success
  2. INSERT INTO with my engine - success
  3. INSERT INTO with Spark - fails with the following message:
org.apache.iceberg.shaded.org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException
        at org.apache.iceberg.shaded.org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:317)
        at org.apache.iceberg.avro.AvroFileAppender.add(AvroFileAppender.java:66)
        at org.apache.iceberg.ManifestListWriter.add(ManifestListWriter.java:45)
        at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)

Once I applied apache_avro 0.17 and started writing field-id attributes into the Avro schema, this issue was resolved.
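
To illustrate, here's a small sketch of a record schema carrying per-field "field-id" attributes (manifest_path/manifest_length with ids 500/501 follow the Iceberg spec's manifest_file struct; the custom_attributes access assumes apache_avro 0.17's API):

use apache_avro::Schema;

fn main() {
    let json = r#"{
        "type": "record",
        "name": "manifest_file",
        "fields": [
            {"name": "manifest_path", "type": "string", "field-id": 500},
            {"name": "manifest_length", "type": "long", "field-id": 501}
        ]
    }"#;
    let schema = Schema::parse_str(json).expect("valid schema");
    // apache_avro 0.17 keeps unrecognized per-field attributes, so the
    // field-ids written above are available after parsing.
    if let Schema::Record(record) = &schema {
        for field in &record.fields {
            println!("{} -> {:?}", field.name, field.custom_attributes.get("field-id"));
        }
    }
}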

Issue 3.

In the following scenario,

  1. CREATE TABLE with my engine - success
  2. INSERT INTO with Spark - success
  3. INSERT INTO with my engine - fails with the following message:
      DataInvalid => Failure in conversion with avro, source: Missing field in record: \"added_data_files_count\"

This error is related to Iceberg issue apache/iceberg#8684 and to the fact that iceberg-rust cannot read Avro data by field-id; it relies on field names instead.

In that Iceberg issue, an Iceberg Java developer discovered inconsistencies between the Java source code and the specification regarding the field names of the manifest_file struct. The source code was subsequently modified to match the specification, so Iceberg Java's Avro writers started emitting different (correct) field names. This change didn't affect Iceberg Java itself, since it reads Avro data by field-id rather than by field name. iceberg-rust, however, resolves fields by name, which causes the current issue.

To address this, I examined the iceberg-rust and Avro Rust sources. However, implementing reads of Avro data by field-id seemed to require a fair amount of time (at least for me). As a temporary solution, after replacing all the incorrect field names in the iceberg-rust code, I applied an ad hoc workaround in manifest_list.rs:

// In iceberg-rust's manifest_list.rs; FormatVersion, StructType, Error,
// ManifestListEntry, and _serde are crate-internal types.
use apache_avro::{from_value, types::Value, Reader, Schema};

// Field names written by Iceberg Java before apache/iceberg#8684, mapped to
// the spec-correct names that the (fixed) deserialization structs now expect.
const WRONG_FIELD_NAME_MAPS: &[(&str, &str)] = &[
    ("added_data_files_count", "added_files_count"),
    ("existing_data_files_count", "existing_files_count"),
    ("deleted_data_files_count", "deleted_files_count"),
];

impl ManifestList {
    /// Parse manifest list from bytes.
    pub fn parse_with_version(
        bs: &[u8],
        version: FormatVersion,
        partition_type: &StructType,
    ) -> Result<ManifestList, Error> {
        // We cannot use Avro's schema resolution here, so use Reader::new()
        // instead of Reader::with_schema(), and let from_value() check the
        // schema correctness.
        let reader = Reader::new(bs)?;
        // Detect files written with the pre-rename field names. Checking for
        // the first old name is enough, since all three were renamed together.
        let wrong_field_name_found = match reader.writer_schema() {
            Schema::Record(record_schema) => record_schema
                .lookup
                .contains_key(WRONG_FIELD_NAME_MAPS[0].0),
            _ => false,
        };
        let values = reader.collect::<Result<Vec<Value>, _>>()?;
        // Rewrite the old field names to the spec-correct ones before
        // deserializing into the _serde structs.
        let values = if wrong_field_name_found {
            values
                .into_iter()
                .map(|value| match value {
                    Value::Record(fields) => {
                        let new_fields = fields
                            .into_iter()
                            .map(|field| {
                                for map in WRONG_FIELD_NAME_MAPS {
                                    if map.0 == field.0 {
                                        return (map.1.to_string(), field.1);
                                    }
                                }
                                field
                            })
                            .collect::<Vec<_>>();
                        Value::Record(new_fields)
                    }
                    _ => value,
                })
                .collect::<Vec<_>>()
        } else {
            values
        };
        let values = Value::Array(values);
        match version {
            FormatVersion::V1 => {
                from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type)
            }
            FormatVersion::V2 => {
                from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type)
            }
        }
    }

    /// Get the entries in the manifest list.
    pub fn entries(&self) -> &[ManifestListEntry] {
        &self.entries
    }
}

It essentially replaces the 'wrong' field names with the correct ones. I see this as more of a workaround than a proper solution, but it serves its purpose for the time being. It would be nice if a more fundamental fix could be implemented in the future, such as reading Avro data by field-id.
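
For what it's worth, here's a rough sketch of what such a name-independent lookup could be built on (writer_names_by_field_id is a hypothetical helper, not existing iceberg-rust API; it assumes the writer schema exposes field-ids through apache_avro 0.17's per-field custom attributes):

use std::collections::HashMap;

use apache_avro::Schema;

/// Map each "field-id" in the writer schema to the field name the writer
/// actually used, so record values can be matched by id instead of by name.
fn writer_names_by_field_id(writer_schema: &Schema) -> HashMap<i64, String> {
    let mut names_by_id = HashMap::new();
    if let Schema::Record(record) = writer_schema {
        for field in &record.fields {
            if let Some(id) = field
                .custom_attributes
                .get("field-id")
                .and_then(|v| v.as_i64())
            {
                names_by_id.insert(id, field.name.clone());
            }
        }
    }
    names_by_id
}

A reader built on such a map would be immune to renames like the one in apache/iceberg#8684, since the spec's field-ids stay stable even when field names change.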

Thank you.
