Update metadata-log for non-rest catalogs #977


Merged · 4 commits · Aug 7, 2024
17 changes: 9 additions & 8 deletions mkdocs/docs/configuration.md
@@ -28,14 +28,15 @@ Iceberg tables support table properties to configure table behavior.

### Write options

| Key | Options | Default | Description |
| --------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |
| Key | Options | Default | Description |
| -------------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |
| `write.metadata.previous-versions-max` | Integer | 100 | The maximum number of previous-version metadata files to keep before they are deleted after a commit. |
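
A minimal sketch of setting the new property from PyIceberg, assuming a configured catalog named `default` and an existing table `db.events` (both hypothetical). As with other Iceberg table properties, the value is passed as a string:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog named "default" is configured
table = catalog.load_table("db.events")  # hypothetical table identifier

# Retain at most 25 previous-version entries in the table's metadata log.
table.transaction().set_properties(
    {"write.metadata.previous-versions-max": "25"}
).commit_transaction()
```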

### Table behavior options

1 change: 1 addition & 0 deletions pyiceberg/catalog/__init__.py
@@ -838,6 +838,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
metadata_location=current_table.metadata_location if current_table else None,
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
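
The version bump above leans on Iceberg's metadata-file naming convention. A standalone sketch of the arithmetic, with an assumed example path (not taken from this PR):

```python
# Metadata files are conventionally named "<version>-<uuid>.metadata.json".
current_location = "s3://bucket/db/tbl/metadata/00002-1a2b3c.metadata.json"  # assumed example
file_name = current_location.split("/")[-1]
version = int(file_name.split("-")[0])  # -> 2
new_metadata_version = version + 1  # -> 3; a brand-new table starts at version 0
```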
47 changes: 44 additions & 3 deletions pyiceberg/table/__init__.py
@@ -117,6 +117,7 @@
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
MetadataLogEntry,
Operation,
Snapshot,
SnapshotLogEntry,
@@ -226,6 +227,9 @@ class TableProperties:
MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
MANIFEST_MERGE_ENABLED_DEFAULT = False

METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max"
METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100


class Transaction:
_table: Table
@@ -1139,14 +1143,18 @@ def _(


def update_table_metadata(
base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...], enforce_validation: bool = False
base_metadata: TableMetadata,
updates: Tuple[TableUpdate, ...],
enforce_validation: bool = False,
metadata_location: Optional[str] = None,
) -> TableMetadata:
"""Update the table metadata with the given updates in one transaction.

Args:
base_metadata: The base metadata to be updated.
updates: The updates in one transaction.
enforce_validation: Whether to trigger validation after applying the updates.
metadata_location: Current metadata location of the table.

Returns:
The metadata with the updates applied.
@@ -1158,15 +1166,48 @@ def update_table_metadata(
new_metadata = _apply_table_update(update, new_metadata, context)

# Update last_updated_ms if it was not updated by update operations
if context.has_changes() and base_metadata.last_updated_ms == new_metadata.last_updated_ms:
new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})
if context.has_changes():
if metadata_location:
new_metadata = _update_table_metadata_log(new_metadata, metadata_location, base_metadata.last_updated_ms)
if base_metadata.last_updated_ms == new_metadata.last_updated_ms:
new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})

if enforce_validation:
return TableMetadataUtil.parse_obj(new_metadata.model_dump())
else:
return new_metadata.model_copy(deep=True)


def _update_table_metadata_log(base_metadata: TableMetadata, metadata_location: str, last_updated_ms: int) -> TableMetadata:
"""
Update the metadata log of the table.

Args:
base_metadata: The base metadata to be updated.
metadata_location: Current metadata location of the table.
last_updated_ms: The timestamp of the last update of table metadata.

Returns:
The metadata with the updates applied to metadata-log.
"""
max_metadata_log_entries = max(
1,
property_as_int(
base_metadata.properties,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
), # type: ignore
)
previous_metadata_log = base_metadata.metadata_log
if len(base_metadata.metadata_log) >= max_metadata_log_entries: # type: ignore
remove_index = len(base_metadata.metadata_log) - max_metadata_log_entries + 1 # type: ignore
previous_metadata_log = base_metadata.metadata_log[remove_index:]
metadata_updates: Dict[str, Any] = {
"metadata_log": previous_metadata_log + [MetadataLogEntry(metadata_file=metadata_location, timestamp_ms=last_updated_ms)]
}
return base_metadata.model_copy(update=metadata_updates)
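
A standalone sketch of the trimming arithmetic above, with illustrative values (not from the PR): ten previous entries and a limit of five leave four old entries, so appending the current metadata file yields exactly five:

```python
# Illustrative inputs: ten previous entries, limit of five.
metadata_log = [f"/path/to/metadata/{i}.json" for i in range(10)]
max_entries = 5

if len(metadata_log) >= max_entries:
    # Drop the oldest entries, leaving room for the current metadata file.
    remove_index = len(metadata_log) - max_entries + 1
    metadata_log = metadata_log[remove_index:]

metadata_log.append("/path/to/metadata/current.json")
assert len(metadata_log) == 5
assert metadata_log[0] == "/path/to/metadata/6.json"
```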


class ValidatableTableRequirement(IcebergBaseModel):
type: str

11 changes: 9 additions & 2 deletions tests/catalog/test_glue.py
@@ -707,10 +707,13 @@ def test_commit_table_update_schema(
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata
original_table_metadata_location = table.metadata_location
original_table_last_updated_ms = table.metadata.last_updated_ms

assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert TABLE_METADATA_LOCATION_REGEX.match(original_table_metadata_location)
assert test_catalog._parse_metadata_version(original_table_metadata_location) == 0
assert original_table_metadata.current_schema_id == 0
assert len(original_table_metadata.metadata_log) == 0

transaction = table.transaction()
update = transaction.update_schema()
@@ -728,6 +731,9 @@
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
assert len(updated_table_metadata.metadata_log) == 1
assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms

# Ensure schema is also pushed to Glue
table_info = _glue.get_table(
@@ -841,6 +847,7 @@ def test_commit_overwrite_table_snapshot_properties(
assert summary is not None
assert summary["snapshot_prop_a"] is None
assert summary["snapshot_prop_b"] == "test_prop_b"
assert len(updated_table_metadata.metadata_log) == 2


@mock_aws
6 changes: 6 additions & 0 deletions tests/catalog/test_sql.py
@@ -1271,6 +1271,8 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_id
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
last_updated_ms = table.metadata.last_updated_ms
original_table_metadata_location = table.metadata_location
original_table_last_updated_ms = table.metadata.last_updated_ms

assert catalog._parse_metadata_version(table.metadata_location) == 0
assert table.metadata.current_schema_id == 0
@@ -1291,6 +1293,9 @@
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
assert updated_table_metadata.last_updated_ms > last_updated_ms
assert len(updated_table_metadata.metadata_log) == 1
assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms


@pytest.mark.parametrize(
@@ -1338,6 +1343,7 @@ def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_id
assert table.metadata.snapshots[0].summary["added-records"] == "1"
assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
assert table.metadata.snapshots[0].summary["total-records"] == "1"
assert len(table.metadata.metadata_log) == 1

# read back the data
assert df == table.scan().to_arrow()
50 changes: 50 additions & 0 deletions tests/table/test_init.py
@@ -69,6 +69,7 @@
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import (
MetadataLogEntry,
Operation,
Snapshot,
SnapshotLogEntry,
@@ -1156,3 +1157,52 @@ def test_serialize_commit_table_request() -> None:

deserialized_request = CommitTableRequest.model_validate_json(request.model_dump_json())
assert request == deserialized_request


def test_update_metadata_log(table_v2: Table) -> None:
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=1602638593590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
)

new_metadata = update_table_metadata(
table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),), False, table_v2.metadata_location
)
assert len(new_metadata.metadata_log) == 2


def test_update_metadata_log_overflow(table_v2: Table) -> None:
metadata_log = [
MetadataLogEntry(
timestamp_ms=1602638593590 + i,
metadata_file=f"/path/to/metadata/{i}.json",
)
for i in range(10)
]
table_v2.metadata = table_v2.metadata.model_copy(update={"metadata_log": metadata_log, "last_updated_ms": 1602638593600})
table_v2.metadata_location = "/path/to/metadata/10.json"
assert len(table_v2.metadata.metadata_log) == 10

base_metadata = table_v2.metadata
new_metadata = update_table_metadata(
base_metadata,
(SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "5"}),),
False,
table_v2.metadata_location,
)
assert len(new_metadata.metadata_log) == 5
assert new_metadata.metadata_log[-1].metadata_file == "/path/to/metadata/10.json"

# check invalid value of write.metadata.previous-versions-max
new_metadata = update_table_metadata(
base_metadata,
(SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "0"}),),
False,
table_v2.metadata_location,
)
assert len(new_metadata.metadata_log) == 1