Skip to content

Add V3 read support #1554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Field,
PlainSerializer,
WithJsonSchema,
model_validator,
)
from typing_extensions import Annotated

Expand Down Expand Up @@ -111,6 +112,19 @@ def __init__(

super().__init__(**data)

@model_validator(mode="before")
@classmethod
def map_source_ids_onto_source_id(cls, data: Any) -> Any:
if isinstance(data, dict):
if "source-id" not in data and (source_ids := data["source-ids"]):
if isinstance(source_ids, list):
if len(source_ids) == 0:
raise ValueError("Empty source-ids is not allowed")
if len(source_ids) > 1:
raise ValueError("Multi argument transforms are not yet supported")
data["source-id"] = source_ids[0]
return data

def __str__(self) -> str:
"""Return the string representation of the PartitionField class."""
return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
Expand Down
119 changes: 98 additions & 21 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,8 @@ def to_v2(self) -> TableMetadataV2:
return TableMetadataV2.model_validate(metadata)

format_version: Literal[1] = Field(alias="format-version", default=1)
"""An integer version number for the format. Currently, this can be 1 or 2
based on the spec. Implementations must throw an exception if a table’s
version is higher than the supported version."""
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""

schema_: Schema = Field(alias="schema")
"""The table’s current schema. (Deprecated: use schemas and
Expand Down Expand Up @@ -507,16 +506,74 @@ def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
return construct_refs(table_metadata)

format_version: Literal[2] = Field(alias="format-version", default=2)
"""An integer version number for the format. Currently, this can be 1 or 2
based on the spec. Implementations must throw an exception if a table’s
version is higher than the supported version."""
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""

last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
"""The table’s highest assigned sequence number, a monotonically
increasing long that tracks the order of snapshots in a table."""


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")]
class TableMetadataV3(TableMetadataCommonFields, IcebergBaseModel):
"""Represents version 3 of the Table Metadata.

Version 3 of the Iceberg spec extends data types and existing metadata structures to add new capabilities:

- New data types: nanosecond timestamp(tz), unknown
- Default value support for columns
- Multi-argument transforms for partitioning and sorting
- Row Lineage tracking
- Binary deletion vectors

For more information:
https://iceberg.apache.org/spec/?column-projection#version-3-extended-types-and-capabilities
"""

@model_validator(mode="before")
def cleanup_snapshot_id(cls, data: Dict[str, Any]) -> Dict[str, Any]:
return cleanup_snapshot_id(data)

@model_validator(mode="after")
def check_schemas(cls, table_metadata: TableMetadata) -> TableMetadata:
return check_schemas(table_metadata)

@model_validator(mode="after")
def check_partition_specs(cls, table_metadata: TableMetadata) -> TableMetadata:
return check_partition_specs(table_metadata)

@model_validator(mode="after")
def check_sort_orders(cls, table_metadata: TableMetadata) -> TableMetadata:
return check_sort_orders(table_metadata)

@model_validator(mode="after")
def construct_refs(cls, table_metadata: TableMetadata) -> TableMetadata:
return construct_refs(table_metadata)

format_version: Literal[3] = Field(alias="format-version", default=3)
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""

last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
"""The table’s highest assigned sequence number, a monotonically
increasing long that tracks the order of snapshots in a table."""

row_lineage: bool = Field(alias="row-lineage", default=False)
"""Indicates that row-lineage is enabled on the table

For more information:
https://iceberg.apache.org/spec/?column-projection#row-lineage
"""

next_row_id: Optional[int] = Field(alias="next-row-id", default=None)
"""A long higher than all assigned row IDs; the next snapshot's `first-row-id`."""

def model_dump_json(
self, exclude_none: bool = True, exclude: Optional[Any] = None, by_alias: bool = True, **kwargs: Any
) -> str:
raise NotImplementedError("Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551")


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2, TableMetadataV3], Field(discriminator="format_version")]


def new_table_metadata(
Expand Down Expand Up @@ -553,20 +610,36 @@ def new_table_metadata(
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)

return TableMetadataV2(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
elif format_version == 2:
return TableMetadataV2(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
elif format_version == 3:
return TableMetadataV3(
location=location,
schemas=[fresh_schema],
last_column_id=fresh_schema.highest_field_id,
current_schema_id=fresh_schema.schema_id,
partition_specs=[fresh_partition_spec],
default_spec_id=fresh_partition_spec.spec_id,
sort_orders=[fresh_sort_order],
default_sort_order_id=fresh_sort_order.order_id,
properties=properties,
last_partition_id=fresh_partition_spec.last_assigned_field_id,
table_uuid=table_uuid,
)
else:
raise ValidationError(f"Unknown format version: {format_version}")


class TableMetadataWrapper(IcebergRootModel[TableMetadata]):
Expand All @@ -593,6 +666,8 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
return TableMetadataV1(**data)
elif format_version == 2:
return TableMetadataV2(**data)
elif format_version == 3:
return TableMetadataV3(**data)
else:
raise ValidationError(f"Unknown format version: {format_version}")

Expand All @@ -609,6 +684,8 @@ def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadat
return TableMetadataV1.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 2:
return TableMetadataV2.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 3:
return TableMetadataV3.model_construct(**dict(table_metadata))
else:
raise ValidationError(f"Unknown format version: {table_metadata.format_version}")

Expand Down
13 changes: 13 additions & 0 deletions pyiceberg/table/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]:
values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST
return values

@model_validator(mode="before")
@classmethod
def map_source_ids_onto_source_id(cls, data: Any) -> Any:
if isinstance(data, dict):
if "source-id" not in data and (source_ids := data["source-ids"]):
if isinstance(source_ids, list):
if len(source_ids) == 0:
raise ValueError("Empty source-ids is not allowed")
if len(source_ids) > 1:
raise ValueError("Multi argument transforms are not yet supported")
data["source-id"] = source_ids[0]
return data

source_id: int = Field(alias="source-id")
transform: Annotated[ # type: ignore
Transform,
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,4 @@ def __hash__(self) -> int:
return hash(str(self))


TableVersion: TypeAlias = Literal[1, 2]
TableVersion: TypeAlias = Literal[1, 2, 3]
71 changes: 71 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,72 @@ def generate_snapshot(
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

EXAMPLE_TABLE_METADATA_V3 = {
"format-version": 3,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 1,
"schemas": [
{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
{
"type": "struct",
"schema-id": 1,
"identifier-field-ids": [1, 2],
"fields": [
{"id": 1, "name": "x", "required": True, "type": "long"},
{"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"},
{"id": 3, "name": "z", "required": True, "type": "long"},
# TODO: Add unknown, timestamp(tz)_ns
# {"id": 4, "name": "u", "required": True, "type": "unknown"},
# {"id": 5, "name": "ns", "required": True, "type": "timestamp_ns"},
# {"id": 6, "name": "nstz", "required": True, "type": "timestamptz_ns"},
],
},
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-ids": [1], "field-id": 1000}]}],
"last-partition-id": 1000,
"default-sort-order-id": 3,
"sort-orders": [
{
"order-id": 3,
"fields": [
{"transform": "identity", "source-ids": [2], "direction": "asc", "null-order": "nulls-first"},
{"transform": "bucket[4]", "source-ids": [3], "direction": "desc", "null-order": "nulls-last"},
],
}
],
"properties": {"read.split.target.size": "134217728"},
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3051729675574597004,
"timestamp-ms": 1515100955770,
"sequence-number": 0,
"summary": {"operation": "append"},
"manifest-list": "s3://a/b/1.avro",
},
{
"snapshot-id": 3055729675574597004,
"parent-snapshot-id": 3051729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {"operation": "append"},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 1,
},
],
"snapshot-log": [
{"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
{"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770},
],
"metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}],
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES = {
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
Expand Down Expand Up @@ -1052,6 +1118,11 @@ def table_metadata_v2_with_statistics() -> Dict[str, Any]:
return TABLE_METADATA_V2_WITH_STATISTICS


@pytest.fixture
def example_table_metadata_v3() -> Dict[str, Any]:
return EXAMPLE_TABLE_METADATA_V3


@pytest.fixture(scope="session")
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand Down
10 changes: 10 additions & 0 deletions tests/table/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TableMetadataUtil,
TableMetadataV1,
TableMetadataV2,
TableMetadataV3,
new_table_metadata,
)
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
Expand Down Expand Up @@ -178,6 +179,15 @@ def test_serialize_v2(example_table_metadata_v2: Dict[str, Any]) -> None:
assert table_metadata == expected


def test_serialize_v3(example_table_metadata_v3: Dict[str, Any]) -> None:
# Writing will be part of https://github.com/apache/iceberg-python/issues/1551

with pytest.raises(NotImplementedError) as exc_info:
_ = TableMetadataV3(**example_table_metadata_v3).model_dump_json()

assert "Writing V3 is not yet supported, see: https://github.com/apache/iceberg-python/issues/1551" in str(exc_info.value)


def test_migrate_v1_schemas(example_table_metadata_v1: Dict[str, Any]) -> None:
table_metadata = TableMetadataV1(**example_table_metadata_v1)

Expand Down
14 changes: 14 additions & 0 deletions tests/table/test_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,17 @@ def test_partition_type(table_schema_simple: Schema) -> None:
NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
NestedField(field_id=1001, name="int_bucket", field_type=IntegerType(), required=True),
)


def test_deserialize_partition_field_v2() -> None:
json_partition_spec = """{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}"""

field = PartitionField.model_validate_json(json_partition_spec)
assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")


def test_deserialize_partition_field_v3() -> None:
json_partition_spec = """{"source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}"""

field = PartitionField.model_validate_json(json_partition_spec)
assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
12 changes: 12 additions & 0 deletions tests/table/test_sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ def test_unsorting_to_repr() -> None:
def test_sorting_repr(sort_order: SortOrder) -> None:
"""To make sure that the repr converts back to the original object"""
assert sort_order == eval(repr(sort_order))


def test_serialize_sort_field_v2() -> None:
expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
payload = '{"source-id":19,"transform":"identity","direction":"asc","null-order":"nulls-first"}'
assert SortField.model_validate_json(payload) == expected


def test_serialize_sort_field_v3() -> None:
expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}'
assert SortField.model_validate_json(payload) == expected