
Commit dfa1730

soumya-ghosh authored and sungwy committed
Update metadata-log for non-rest catalogs (apache#977)
* Update metadata-log for non-rest catalogs
* Add test for invalid value of previous-versions-max property
* Update configuration docs with previous-versions-max property
* Remove reference of PropertyUtil
1 parent 357cd92 · commit dfa1730

File tree

- mkdocs/docs/configuration.md
- pyiceberg/catalog/__init__.py
- pyiceberg/table/__init__.py
- tests/catalog/test_glue.py
- tests/catalog/test_sql.py
- tests/table/test_init.py

6 files changed: +119 -13 lines changed
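In short: with this change, a commit through a non-REST catalog appends the previous metadata file to the table's metadata-log, trimmed to at most `write.metadata.previous-versions-max` entries. A minimal sketch of the observable behavior, assuming a throwaway SQLite-backed `SqlCatalog` (the warehouse path and table names are illustrative, not from this commit):

```python
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Throwaway catalog; the URI and warehouse path are illustrative.
catalog = SqlCatalog("default", uri="sqlite:///:memory:", warehouse="file:///tmp/warehouse")
catalog.create_namespace("db")
table = catalog.create_table(
    "db.events",
    schema=Schema(NestedField(field_id=1, name="name", field_type=StringType(), required=False)),
)
original_location = table.metadata_location

# Any committed change should now record the previous metadata file in the log.
tx = table.transaction()
tx.set_properties(owner="example")
tx.commit_transaction()

table = catalog.load_table("db.events")
assert table.metadata.metadata_log[0].metadata_file == original_location
```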

mkdocs/docs/configuration.md

Lines changed: 9 additions & 8 deletions
```diff
@@ -28,14 +28,15 @@ Iceberg tables support table properties to configure table behavior.
 
 ### Write options
 
-| Key                               | Options                           | Default | Description                                                                                  |
-| --------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
-| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
-| `write.parquet.compression-level` | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
-| `write.parquet.page-size-bytes`   | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.page-row-limit`    | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.dict-size-bytes`   | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
-| `write.parquet.row-group-limit`   | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| Key                                    | Options                           | Default | Description                                                                                  |
+| -------------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec`      | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
+| `write.parquet.compression-level`      | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
+| `write.parquet.page-size-bytes`        | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.page-row-limit`         | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.dict-size-bytes`        | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
+| `write.parquet.row-group-limit`        | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| `write.metadata.previous-versions-max` | Integer                           | 100     | The max number of previous version metadata files to keep before deleting after commit.     |
 
 ### Table behavior options
 
```
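The new `write.metadata.previous-versions-max` entry behaves like any other table property; continuing the sketch above, it can for example be set at table creation (identifiers are again illustrative):

```python
# Sketch: cap the metadata-log at 5 entries for this table. Each subsequent
# commit trims the log so at most 5 entries remain; values below 1 are
# clamped to 1 by the implementation below.
capped = catalog.create_table(
    "db.capped",
    schema=Schema(NestedField(field_id=1, name="name", field_type=StringType(), required=False)),
    properties={"write.metadata.previous-versions-max": "5"},
)
```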

pyiceberg/catalog/__init__.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -838,6 +838,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
             base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
             updates=table_request.updates,
             enforce_validation=current_table is None,
+            metadata_location=current_table.metadata_location if current_table else None,
         )
 
         new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
```
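On the create path there is no previous metadata file, so the catalog passes `metadata_location=None` and no log entry is written; continuing the sketch from the top of this page:

```python
# A freshly created table has an empty metadata-log, because there was no
# previous metadata file to record (this matches the new test assertions below).
fresh = catalog.create_table(
    "db.fresh",
    schema=Schema(NestedField(field_id=1, name="name", field_type=StringType(), required=False)),
)
assert len(fresh.metadata.metadata_log) == 0
```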

pyiceberg/table/__init__.py

Lines changed: 44 additions & 3 deletions
```diff
@@ -117,6 +117,7 @@
 )
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
 from pyiceberg.table.snapshots import (
+    MetadataLogEntry,
     Operation,
     Snapshot,
     SnapshotLogEntry,
@@ -226,6 +227,9 @@ class TableProperties:
     MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
     MANIFEST_MERGE_ENABLED_DEFAULT = False
 
+    METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max"
+    METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100
+
 
 class Transaction:
     _table: Table
@@ -1139,14 +1143,18 @@ def _(
 
 
 def update_table_metadata(
-    base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...], enforce_validation: bool = False
+    base_metadata: TableMetadata,
+    updates: Tuple[TableUpdate, ...],
+    enforce_validation: bool = False,
+    metadata_location: Optional[str] = None,
 ) -> TableMetadata:
     """Update the table metadata with the given updates in one transaction.
 
     Args:
         base_metadata: The base metadata to be updated.
         updates: The updates in one transaction.
         enforce_validation: Whether to trigger validation after applying the updates.
+        metadata_location: Current metadata location of the table
 
     Returns:
         The metadata with the updates applied.
@@ -1158,15 +1166,48 @@ def update_table_metadata(
         new_metadata = _apply_table_update(update, new_metadata, context)
 
     # Update last_updated_ms if it was not updated by update operations
-    if context.has_changes() and base_metadata.last_updated_ms == new_metadata.last_updated_ms:
-        new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})
+    if context.has_changes():
+        if metadata_location:
+            new_metadata = _update_table_metadata_log(new_metadata, metadata_location, base_metadata.last_updated_ms)
+        if base_metadata.last_updated_ms == new_metadata.last_updated_ms:
+            new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})
 
     if enforce_validation:
         return TableMetadataUtil.parse_obj(new_metadata.model_dump())
     else:
         return new_metadata.model_copy(deep=True)
 
 
+def _update_table_metadata_log(base_metadata: TableMetadata, metadata_location: str, last_updated_ms: int) -> TableMetadata:
+    """
+    Update the metadata log of the table.
+
+    Args:
+        base_metadata: The base metadata to be updated.
+        metadata_location: Current metadata location of the table
+        last_updated_ms: The timestamp of the last update of table metadata
+
+    Returns:
+        The metadata with the updates applied to metadata-log.
+    """
+    max_metadata_log_entries = max(
+        1,
+        property_as_int(
+            base_metadata.properties,
+            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+        ),  # type: ignore
+    )
+    previous_metadata_log = base_metadata.metadata_log
+    if len(base_metadata.metadata_log) >= max_metadata_log_entries:  # type: ignore
+        remove_index = len(base_metadata.metadata_log) - max_metadata_log_entries + 1  # type: ignore
+        previous_metadata_log = base_metadata.metadata_log[remove_index:]
+    metadata_updates: Dict[str, Any] = {
+        "metadata_log": previous_metadata_log + [MetadataLogEntry(metadata_file=metadata_location, timestamp_ms=last_updated_ms)]
+    }
+    return base_metadata.model_copy(update=metadata_updates)
+
+
 class ValidatableTableRequirement(IcebergBaseModel):
     type: str
 
```
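The trimming arithmetic in `_update_table_metadata_log` can be checked by hand: with 10 existing entries and a maximum of 5, `remove_index = 10 - 5 + 1 = 6`, so the four newest old entries survive and the current location is appended to reach exactly 5. A self-contained sketch of that logic (`trimmed_log` is a hypothetical name, and plain strings stand in for `MetadataLogEntry`; this is not the library API):

```python
from typing import List


def trimmed_log(log: List[str], current_location: str, max_entries: int) -> List[str]:
    # Mirrors the diff's logic: clamp invalid maxima to 1, then drop just enough
    # of the oldest entries that appending the current location brings the log
    # to exactly `max_entries`.
    max_entries = max(1, max_entries)
    if len(log) >= max_entries:
        log = log[len(log) - max_entries + 1 :]
    return log + [current_location]


old = [f"/path/to/metadata/{i}.json" for i in range(10)]
assert len(trimmed_log(old, "/path/to/metadata/10.json", 5)) == 5  # 4 kept + 1 new
assert trimmed_log(old, "/path/to/metadata/10.json", 5)[0] == "/path/to/metadata/6.json"
assert len(trimmed_log(old, "/path/to/metadata/10.json", 0)) == 1  # clamped to 1
```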

tests/catalog/test_glue.py

Lines changed: 9 additions & 2 deletions
```diff
@@ -704,10 +704,13 @@ def test_commit_table_update_schema(
     test_catalog.create_namespace(namespace=database_name)
     table = test_catalog.create_table(identifier, table_schema_nested)
     original_table_metadata = table.metadata
+    original_table_metadata_location = table.metadata_location
+    original_table_last_updated_ms = table.metadata.last_updated_ms
 
-    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
-    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+    assert TABLE_METADATA_LOCATION_REGEX.match(original_table_metadata_location)
+    assert test_catalog._parse_metadata_version(original_table_metadata_location) == 0
     assert original_table_metadata.current_schema_id == 0
+    assert len(original_table_metadata.metadata_log) == 0
 
     transaction = table.transaction()
     update = transaction.update_schema()
@@ -725,6 +728,9 @@
     assert new_schema
     assert new_schema == update._apply()
     assert new_schema.find_field("b").field_type == IntegerType()
+    assert len(updated_table_metadata.metadata_log) == 1
+    assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
+    assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms
 
     # Ensure schema is also pushed to Glue
     table_info = _glue.get_table(
@@ -838,6 +844,7 @@ def test_commit_overwrite_table_snapshot_properties(
     assert summary is not None
     assert summary["snapshot_prop_a"] is None
     assert summary["snapshot_prop_b"] == "test_prop_b"
+    assert len(updated_table_metadata.metadata_log) == 2
 
 
 @mock_aws
```

tests/catalog/test_sql.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -1241,6 +1241,8 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_id
     catalog.create_namespace(namespace)
     table = catalog.create_table(table_identifier, table_schema_nested)
     last_updated_ms = table.metadata.last_updated_ms
+    original_table_metadata_location = table.metadata_location
+    original_table_last_updated_ms = table.metadata.last_updated_ms
 
     assert catalog._parse_metadata_version(table.metadata_location) == 0
     assert table.metadata.current_schema_id == 0
@@ -1261,6 +1263,9 @@
     assert new_schema == update._apply()
     assert new_schema.find_field("b").field_type == IntegerType()
     assert updated_table_metadata.last_updated_ms > last_updated_ms
+    assert len(updated_table_metadata.metadata_log) == 1
+    assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
+    assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms
 
 
 @pytest.mark.parametrize(
@@ -1307,6 +1312,7 @@ def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_id
     assert table.metadata.snapshots[0].summary["added-records"] == "1"
     assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
     assert table.metadata.snapshots[0].summary["total-records"] == "1"
+    assert len(table.metadata.metadata_log) == 1
 
     # read back the data
     assert df == table.scan().to_arrow()
```

tests/table/test_init.py

Lines changed: 50 additions & 0 deletions
```diff
@@ -69,6 +69,7 @@
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
 from pyiceberg.table.refs import SnapshotRef
 from pyiceberg.table.snapshots import (
+    MetadataLogEntry,
     Operation,
     Snapshot,
     SnapshotLogEntry,
@@ -1156,3 +1157,52 @@ def test_serialize_commit_table_request() -> None:
 
     deserialized_request = CommitTableRequest.model_validate_json(request.model_dump_json())
     assert request == deserialized_request
+
+
+def test_update_metadata_log(table_v2: Table) -> None:
+    new_snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        sequence_number=200,
+        timestamp_ms=1602638593590,
+        manifest_list="s3:/a/b/c.avro",
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+
+    new_metadata = update_table_metadata(
+        table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),), False, table_v2.metadata_location
+    )
+    assert len(new_metadata.metadata_log) == 2
+
+
+def test_update_metadata_log_overflow(table_v2: Table) -> None:
+    metadata_log = [
+        MetadataLogEntry(
+            timestamp_ms=1602638593590 + i,
+            metadata_file=f"/path/to/metadata/{i}.json",
+        )
+        for i in range(10)
+    ]
+    table_v2.metadata = table_v2.metadata.model_copy(update={"metadata_log": metadata_log, "last_updated_ms": 1602638593600})
+    table_v2.metadata_location = "/path/to/metadata/10.json"
+    assert len(table_v2.metadata.metadata_log) == 10
+
+    base_metadata = table_v2.metadata
+    new_metadata = update_table_metadata(
+        base_metadata,
+        (SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "5"}),),
+        False,
+        table_v2.metadata_location,
+    )
+    assert len(new_metadata.metadata_log) == 5
+    assert new_metadata.metadata_log[-1].metadata_file == "/path/to/metadata/10.json"
+
+    # check invalid value of write.metadata.previous-versions-max
+    new_metadata = update_table_metadata(
+        base_metadata,
+        (SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "0"}),),
+        False,
+        table_v2.metadata_location,
+    )
+    assert len(new_metadata.metadata_log) == 1
```