diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 1813772217..95cbe16ecb 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -37,6 +37,7 @@
     Field,
     PlainSerializer,
     WithJsonSchema,
+    model_validator,
 )
 from typing_extensions import Annotated
 
@@ -111,6 +112,32 @@ def __init__(
 
         super().__init__(**data)
 
+    @model_validator(mode="before")
+    @classmethod
+    def map_source_ids_onto_source_id(cls, data: Any) -> Any:
+        """Map a V3 `source-ids` list onto the single `source-id` key.
+
+        Input that already has `source-id`, has no `source-ids` list, or is not a
+        dict is returned unchanged so that regular field validation applies.
+
+        Raises:
+            ValueError: If `source-ids` is an empty list or has more than one entry.
+        """
+        if isinstance(data, dict) and "source-id" not in data:
+            # `source-ids` is optional: indexing it directly raises a KeyError
+            # when the key is missing, so look it up with .get() instead.
+            source_ids = data.get("source-ids")
+            if isinstance(source_ids, list):
+                # Compare the length explicitly; an empty list is falsy, so a
+                # truthiness test would never reach this error.
+                if len(source_ids) == 0:
+                    raise ValueError("Empty source-ids is not allowed")
+                if len(source_ids) > 1:
+                    raise ValueError("Multi argument transforms are not yet supported")
+                # Return a copy so the caller's input dict is not modified.
+                return {**data, "source-id": source_ids[0]}
+        return data
+
     def __str__(self) -> str:
         """Return the string representation of the PartitionField class."""
         return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index ef1a324c45..29067838e5 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -459,9 +459,8 @@ def to_v2(self) -> TableMetadataV2:
         return TableMetadataV2.model_validate(metadata)
 
     format_version: Literal[1] = Field(alias="format-version", default=1)
-    """An integer version number for the format. Currently, this can be 1 or 2
-    based on the spec. Implementations must throw an exception if a table’s
-    version is higher than the supported version."""
+    """An integer version number for the format. Implementations must throw
+    an exception if a table’s version is higher than the supported version."""
 
     schema_: Schema = Field(alias="schema")
     """The table’s current schema.
(Deprecated: use schemas and
@@ -507,16 +506,74 @@ def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
         return construct_refs(table_metadata)
 
     format_version: Literal[2] = Field(alias="format-version", default=2)
-    """An integer version number for the format. Currently, this can be 1 or 2
-    based on the spec. Implementations must throw an exception if a table’s
-    version is higher than the supported version."""
+    """An integer version number for the format. Implementations must throw
+    an exception if a table’s version is higher than the supported version."""
 
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")]
+class TableMetadataV3(TableMetadataCommonFields, IcebergBaseModel):
+    """Represents version 3 of the Table Metadata.
+
+    Version 3 of the Iceberg spec extends data types and existing metadata structures to add new capabilities:
+
+    - New data types: nanosecond timestamp(tz), unknown
+    - Default value support for columns
+    - Multi-argument transforms for partitioning and sorting
+    - Row Lineage tracking
+    - Binary deletion vectors
+
+    For more information:
+    https://iceberg.apache.org/spec/?column-projection#version-3-extended-types-and-capabilities
+    """
+
+    @model_validator(mode="before")
+    def cleanup_snapshot_id(cls, data: Dict[str, Any]) -> Dict[str, Any]:
+        return cleanup_snapshot_id(data)
+
+    @model_validator(mode="after")
+    def check_schemas(cls, table_metadata: TableMetadata) -> TableMetadata:
+        return check_schemas(table_metadata)
+
+    @model_validator(mode="after")
+    def check_partition_specs(cls, table_metadata: TableMetadata) -> TableMetadata:
+        return check_partition_specs(table_metadata)
+
+    @model_validator(mode="after")
+    def check_sort_orders(cls, table_metadata: TableMetadata) -> TableMetadata:
+        return check_sort_orders(table_metadata)
+
+    @model_validator(mode="after")
+    def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
+        return construct_refs(table_metadata)
+
+    format_version: Literal[3] = Field(alias="format-version", default=3)
+    """An integer version number for the format. Implementations must throw
+    an exception if a table’s version is higher than the supported version."""
+
+    last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
+    """The table’s highest assigned sequence number, a monotonically
+    increasing long that tracks the order of snapshots in a table."""
+
+    row_lineage: bool = Field(alias="row-lineage", default=False)
+    """Indicates that row-lineage is enabled on the table
+
+    For more information:
+    https://iceberg.apache.org/spec/?column-projection#row-lineage
+    """
+
+    next_row_id: Optional[int] = Field(alias="next-row-id", default=None)
+    """A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""
+
+    def model_dump_json(
+        self, exclude_none: bool = True, exclude: Optional[Any] = None, by_alias: bool = True, **kwargs: Any
+    ) -> str:
+        raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")
+
+
+TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2, TableMetadataV3], Field(discriminator="format_version")]
 
 
 def new_table_metadata(
@@ -553,20 +610,36 @@ def new_table_metadata(
             last_partition_id=fresh_partition_spec.last_assigned_field_id,
             table_uuid=table_uuid,
         )
-
-    return TableMetadataV2(
-        location=location,
-        schemas=[fresh_schema],
-        last_column_id=fresh_schema.highest_field_id,
-        current_schema_id=fresh_schema.schema_id,
-        partition_specs=[fresh_partition_spec],
-        default_spec_id=fresh_partition_spec.spec_id,
-        sort_orders=[fresh_sort_order],
-        default_sort_order_id=fresh_sort_order.order_id,
-        properties=properties,
-        last_partition_id=fresh_partition_spec.last_assigned_field_id,
-        table_uuid=table_uuid,
-    )
+    elif format_version == 2:
+        return TableMetadataV2(
+            location=location,
+            schemas=[fresh_schema],
+            last_column_id=fresh_schema.highest_field_id,
+            current_schema_id=fresh_schema.schema_id,
+            partition_specs=[fresh_partition_spec],
+            default_spec_id=fresh_partition_spec.spec_id,
+            sort_orders=[fresh_sort_order],
+            default_sort_order_id=fresh_sort_order.order_id,
+            properties=properties,
+            last_partition_id=fresh_partition_spec.last_assigned_field_id,
+            table_uuid=table_uuid,
+        )
+    elif format_version == 3:
+        return TableMetadataV3(
+            location=location,
+            schemas=[fresh_schema],
+            last_column_id=fresh_schema.highest_field_id,
+            current_schema_id=fresh_schema.schema_id,
+            partition_specs=[fresh_partition_spec],
+            default_spec_id=fresh_partition_spec.spec_id,
+            sort_orders=[fresh_sort_order],
+            default_sort_order_id=fresh_sort_order.order_id,
+            properties=properties,
+            last_partition_id=fresh_partition_spec.last_assigned_field_id,
+            table_uuid=table_uuid,
+        )
+    else:
+        raise ValidationError(f"Unknown format version: {format_version}")
 
 
 class TableMetadataWrapper(IcebergRootModel[TableMetadata]):
@@ -593,6 +666,8 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
             return TableMetadataV1(**data)
         elif format_version == 2:
             return TableMetadataV2(**data)
+        elif format_version == 3:
+            return TableMetadataV3(**data)
         else:
             raise ValidationError(f"Unknown format version: {format_version}")
 
@@ -609,6 +684,8 @@ def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadat
             return TableMetadataV1.model_construct(**dict(table_metadata))
         elif table_metadata.format_version == 2:
             return TableMetadataV2.model_construct(**dict(table_metadata))
+        elif table_metadata.format_version == 3:
+            return TableMetadataV3.model_construct(**dict(table_metadata))
         else:
             raise ValidationError(f"Unknown format version: {table_metadata.format_version}")
 
diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py
index 64d56f0e63..e7c409fcff 100644
--- a/pyiceberg/table/sorting.py
+++ b/pyiceberg/table/sorting.py
@@ -102,6 +102,32 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]:
             values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST
         return values
 
+    @model_validator(mode="before")
+    @classmethod
+    def map_source_ids_onto_source_id(cls, data: Any) -> Any:
+        """Map a V3 `source-ids` list onto the single `source-id` key.
+
+        Input that already has `source-id`, has no `source-ids` list, or is not a
+        dict is returned unchanged so that regular field validation applies.
+
+        Raises:
+            ValueError: If `source-ids` is an empty list or has more than one entry.
+        """
+        if isinstance(data, dict) and "source-id" not in data:
+            # `source-ids` is optional: indexing it directly raises a KeyError
+            # when the key is missing, so look it up with .get() instead.
+            source_ids = data.get("source-ids")
+            if isinstance(source_ids, list):
+                # Compare the length explicitly; an empty list is falsy, so a
+                # truthiness test would never reach this error.
+                if len(source_ids) == 0:
+                    raise ValueError("Empty source-ids is not allowed")
+                if len(source_ids) > 1:
+                    raise ValueError("Multi argument transforms are not yet supported")
+                # Return a copy so the caller's input dict is not modified.
+                return {**data, "source-id": source_ids[0]}
+        return data
+
     source_id: int = Field(alias="source-id")
     transform: Annotated[  # type: ignore
         Transform,
diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py
index 01b8bea58c..e3fc312801 100644
--- a/pyiceberg/typedef.py
+++ b/pyiceberg/typedef.py
@@ -206,4 +206,4 @@ def __hash__(self) -> int:
         return hash(str(self))
 
 
-TableVersion: TypeAlias = Literal[1, 2]
+TableVersion: TypeAlias = Literal[1, 2, 3]
diff --git a/tests/conftest.py b/tests/conftest.py
index c8dde01563..cfd9796312 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -902,6 +902,72 @@ def generate_snapshot(
     "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
 }
 
+EXAMPLE_TABLE_METADATA_V3 = {
+    "format-version": 3,
+    "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+    "location": "s3://bucket/test/location",
+    "last-sequence-number": 34,
+    "last-updated-ms": 1602638573590,
+    "last-column-id": 3,
+    "current-schema-id": 1,
+    "schemas": [
+        {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
+        {
+            "type": "struct",
+            "schema-id": 1,
+            "identifier-field-ids": [1, 2],
+            "fields": [
+                {"id": 1, "name": "x", "required": True, "type": "long"},
+                {"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"},
+                {"id": 3, "name": "z", "required": True, "type": "long"},
+                # TODO: Add unknown, timestamp(tz)_ns
+                # {"id": 4, "name": "u", "required": True, "type": "unknown"},
+                # {"id": 5, "name": "ns", "required": True, "type": "timestamp_ns"},
+                # {"id": 6, "name": "nstz", "required": True, "type": "timestamptz_ns"},
+            ],
+        },
+    ],
+    "default-spec-id": 0,
+    "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-ids": [1], "field-id": 1000}]}],
+    "last-partition-id": 1000,
+    "default-sort-order-id": 3,
+    "sort-orders": [
+        {
+            "order-id": 3,
+            "fields": [
+                {"transform": "identity", "source-ids": [2], "direction": "asc", "null-order": "nulls-first"},
+                {"transform": "bucket[4]", "source-ids": [3], "direction": "desc", "null-order": "nulls-last"},
+            ],
+        }
+    ],
+    "properties": {"read.split.target.size": "134217728"},
+    "current-snapshot-id": 3055729675574597004,
+    "snapshots": [
+        {
+            "snapshot-id": 3051729675574597004,
+            "timestamp-ms": 1515100955770,
+            "sequence-number": 0,
+            "summary": {"operation": "append"},
+            "manifest-list": "s3://a/b/1.avro",
+        },
+        {
+            "snapshot-id": 3055729675574597004,
+            "parent-snapshot-id": 3051729675574597004,
+            "timestamp-ms": 1555100955770,
+            "sequence-number": 1,
+            "summary": {"operation": "append"},
+            "manifest-list": "s3://a/b/2.avro",
+            "schema-id": 1,
+        },
+    ],
+    "snapshot-log": [
+        {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
+        {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770},
+    ],
+    "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}],
+    "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
+}
+
 TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES = {
     "format-version": 2,
     "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
@@ -1052,6 +1118,11 @@ def table_metadata_v2_with_statistics() -> Dict[str, Any]:
     return TABLE_METADATA_V2_WITH_STATISTICS
 
 
+@pytest.fixture
+def example_table_metadata_v3() -> Dict[str, Any]:
+    return EXAMPLE_TABLE_METADATA_V3
+
+
 @pytest.fixture(scope="session")
 def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
     from
pyiceberg.io.pyarrow import PyArrowFileIO
diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py
index 6423531304..d2ee5c3130 100644
--- a/tests/table/test_metadata.py
+++ b/tests/table/test_metadata.py
@@ -33,6 +33,7 @@
     TableMetadataUtil,
     TableMetadataV1,
     TableMetadataV2,
+    TableMetadataV3,
     new_table_metadata,
 )
 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
@@ -178,6 +179,15 @@ def test_serialize_v2(example_table_metadata_v2: Dict[str, Any]) -> None:
     assert table_metadata == expected
 
 
+def test_serialize_v3(example_table_metadata_v3: Dict[str, Any]) -> None:
+    # Writing will be part of https://github.com/apache/iceberg-python/issues/1551
+
+    with pytest.raises(NotImplementedError) as exc_info:
+        _ = TableMetadataV3(**example_table_metadata_v3).model_dump_json()
+
+    assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value)
+
+
 def test_migrate_v1_schemas(example_table_metadata_v1: Dict[str, Any]) -> None:
     table_metadata = TableMetadataV1(**example_table_metadata_v1)
 
diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py
index 127d57a798..55a2ffdb21 100644
--- a/tests/table/test_partitioning.py
+++ b/tests/table/test_partitioning.py
@@ -151,3 +151,17 @@ def test_partition_type(table_schema_simple: Schema) -> None:
         NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
         NestedField(field_id=1001, name="int_bucket", field_type=IntegerType(), required=True),
     )
+
+
+def test_deserialize_partition_field_v2() -> None:
+    json_partition_spec = """{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}"""
+
+    field = PartitionField.model_validate_json(json_partition_spec)
+    assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
+
+
+def test_deserialize_partition_field_v3() -> None:
+    json_partition_spec = """{"source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}"""
+
+    field = PartitionField.model_validate_json(json_partition_spec)
+    assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
diff --git a/tests/table/test_sorting.py b/tests/table/test_sorting.py
index 977ff9d5d8..3efda56509 100644
--- a/tests/table/test_sorting.py
+++ b/tests/table/test_sorting.py
@@ -102,3 +102,15 @@ def test_unsorting_to_repr() -> None:
 def test_sorting_repr(sort_order: SortOrder) -> None:
     """To make sure that the repr converts back to the original object"""
     assert sort_order == eval(repr(sort_order))
+
+
+def test_serialize_sort_field_v2() -> None:
+    expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
+    payload = '{"source-id":19,"transform":"identity","direction":"asc","null-order":"nulls-first"}'
+    assert SortField.model_validate_json(payload) == expected
+
+
+def test_serialize_sort_field_v3() -> None:
+    expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
+    payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}'
+    assert SortField.model_validate_json(payload) == expected