Skip to content

Commit 384e229

Browse files
committed
Add table statistics update
1 parent 24a0175 commit 384e229

File tree

7 files changed

+324
-2
lines changed

7 files changed

+324
-2
lines changed

pyiceberg/table/__init__.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
SnapshotLogEntry,
8585
)
8686
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
87+
from pyiceberg.table.statistics import StatisticsFile
8788
from pyiceberg.table.update import (
8889
AddPartitionSpecUpdate,
8990
AddSchemaUpdate,
@@ -94,12 +95,14 @@
9495
AssertTableUUID,
9596
AssignUUIDUpdate,
9697
RemovePropertiesUpdate,
98+
RemoveStatisticsUpdate,
9799
SetCurrentSchemaUpdate,
98100
SetDefaultSortOrderUpdate,
99101
SetDefaultSpecUpdate,
100102
SetLocationUpdate,
101103
SetPropertiesUpdate,
102104
SetSnapshotRefUpdate,
105+
SetStatisticsUpdate,
103106
TableRequirement,
104107
TableUpdate,
105108
UpdatesAndRequirements,
@@ -663,6 +666,42 @@ def update_location(self, location: str) -> Transaction:
663666
"""
664667
raise NotImplementedError("Not yet implemented")
665668

669+
def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> Transaction:
    """Stage a ``set-statistics`` update for one snapshot.

    Args:
        snapshot_id: The snapshot ID the statistics file is attached to.
        statistics_file: The statistics file to record for that snapshot.

    Returns:
        Whatever ``self._apply`` returns for the staged update (annotated as a Transaction).
    """
    set_update = SetStatisticsUpdate(snapshot_id=snapshot_id, statistics=statistics_file)

    # A single update, and no table requirements, are handed to _apply.
    return self._apply((set_update,), ())
687+
688+
def remove_statistics(self, snapshot_id: int) -> Transaction:
    """Stage a ``remove-statistics`` update for one snapshot.

    Args:
        snapshot_id: The snapshot ID whose statistics should be removed.

    Returns:
        Whatever ``self._apply`` returns for the staged update (annotated as a Transaction).
    """
    remove_update = RemoveStatisticsUpdate(snapshot_id=snapshot_id)

    # A single update, and no table requirements, are handed to _apply.
    return self._apply((remove_update,), ())
704+
666705
def commit_transaction(self) -> Table:
667706
"""Commit the changes to the catalog.
668707

pyiceberg/table/metadata.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
SortOrder,
4545
assign_fresh_sort_order_ids,
4646
)
47+
from pyiceberg.table.statistics import StatisticsFile
4748
from pyiceberg.typedef import (
4849
EMPTY_DICT,
4950
IcebergBaseModel,
@@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
221222
There is always a main branch reference pointing to the
222223
current-snapshot-id even if the refs map is null."""
223224

225+
statistics: List[StatisticsFile] = Field(default_factory=list)
226+
"""An optional list of table statistics files.
227+
Table statistics files are valid Puffin files. Statistics are
228+
informational. A reader can choose to ignore statistics
229+
information. Statistics support is not required to read the
230+
table correctly. A table can contain many statistics files
231+
associated with different table snapshots."""
232+
224233
# validators
225234
@field_validator("properties", mode="before")
226235
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:

pyiceberg/table/statistics.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import (
18+
Dict,
19+
List,
20+
Optional,
21+
)
22+
23+
from pydantic import Field
24+
25+
from pyiceberg.typedef import IcebergBaseModel
26+
27+
28+
class BlobMetadata(IcebergBaseModel):
    """Metadata for one blob listed under a statistics file's ``blob-metadata``.

    Hyphenated names in the serialized form are mapped onto the snake_case
    attributes through field aliases.
    """

    # Blob type identifier, kept as a free-form string.
    type: str
    # Serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
    # Serialized as "sequence-number".
    sequence_number: int = Field(alias="sequence-number")
    # List of integer field ids; presumably the columns the blob covers - confirm against the spec.
    fields: List[int]
    # Optional string-to-string properties; defaults to None when absent.
    properties: Optional[Dict[str, str]] = None
34+
35+
36+
class StatisticsFile(IcebergBaseModel):
    """Description of a single statistics file attached to a table snapshot.

    Every field is required and is read from its hyphenated alias in the
    serialized metadata.
    """

    # Serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
    # Serialized as "statistics-path".
    statistics_path: str = Field(alias="statistics-path")
    # Serialized as "file-size-in-bytes".
    file_size_in_bytes: int = Field(alias="file-size-in-bytes")
    # Serialized as "file-footer-size-in-bytes".
    file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
    # Serialized as "blob-metadata"; one BlobMetadata entry per listed blob.
    blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")

pyiceberg/table/update/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
SnapshotLogEntry,
3838
)
3939
from pyiceberg.table.sorting import SortOrder
40+
from pyiceberg.table.statistics import StatisticsFile
4041
from pyiceberg.typedef import (
4142
IcebergBaseModel,
4243
Properties,
@@ -172,6 +173,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
172173
removals: List[str]
173174

174175

176+
class SetStatisticsUpdate(IcebergBaseModel):
    """Table update carrying a statistics file to record for a snapshot."""

    # Discriminator value used by the TableUpdate union.
    action: Literal["set-statistics"] = Field(default="set-statistics")
    # Serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
    # The statistics file to set for that snapshot.
    statistics: StatisticsFile
180+
181+
182+
class RemoveStatisticsUpdate(IcebergBaseModel):
    """Table update requesting removal of the statistics recorded for a snapshot."""

    # Discriminator value used by the TableUpdate union.
    action: Literal["remove-statistics"] = Field(default="remove-statistics")
    # Serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
185+
186+
175187
TableUpdate = Annotated[
176188
Union[
177189
AssignUUIDUpdate,
@@ -189,6 +201,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
189201
SetLocationUpdate,
190202
SetPropertiesUpdate,
191203
RemovePropertiesUpdate,
204+
SetStatisticsUpdate,
205+
RemoveStatisticsUpdate,
192206
],
193207
Field(discriminator="action"),
194208
]
@@ -477,6 +491,29 @@ def _(
477491
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})
478492

479493

494+
@_apply_table_update.register(SetStatisticsUpdate)
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a ``set-statistics`` update to the table metadata.

    Any statistics entry already present for the update's snapshot id is
    dropped and the update's statistics file is appended in its place.

    Raises:
        ValueError: If the update's snapshot id differs from the snapshot id
            carried by the statistics file itself.
    """
    target_snapshot_id = update.snapshot_id
    incoming = update.statistics
    if target_snapshot_id != incoming.snapshot_id:
        raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")

    # Keep every entry that belongs to another snapshot, then add the new file last.
    updated_statistics = [entry for entry in base_metadata.statistics if entry.snapshot_id != target_snapshot_id]
    updated_statistics.append(incoming)

    context.add_update(update)
    return base_metadata.model_copy(update={"statistics": updated_statistics})
503+
504+
505+
@_apply_table_update.register(RemoveStatisticsUpdate)
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a ``remove-statistics`` update to the table metadata.

    Every statistics entry whose snapshot id equals the update's snapshot id
    is removed from the metadata copy that is returned.

    Raises:
        ValueError: If no statistics entry exists for the update's snapshot id.
    """
    current = base_metadata.statistics
    remaining = [entry for entry in current if entry.snapshot_id != update.snapshot_id]

    # Nothing was filtered out, so no entry matched the requested snapshot.
    if len(remaining) == len(current):
        raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

    context.add_update(update)
    return base_metadata.model_copy(update={"statistics": remaining})
515+
516+
480517
def update_table_metadata(
481518
base_metadata: TableMetadata,
482519
updates: Tuple[TableUpdate, ...],

tests/conftest.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,87 @@ def generate_snapshot(
918918
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
919919
}
920920

921+
# Raw format-version 2 table metadata with two snapshots and one statistics
# entry per snapshot. Both statistics entries are identical apart from the
# snapshot id, so they are generated from the two snapshot ids in order.
TABLE_METADATA_V2_WITH_STATISTICS = {
    "format-version": 2,
    "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
    "location": "s3://bucket/test/location",
    "last-sequence-number": 34,
    "last-updated-ms": 1602638573590,
    "last-column-id": 3,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}],
        }
    ],
    "default-spec-id": 0,
    "partition-specs": [{"spec-id": 0, "fields": []}],
    "last-partition-id": 1000,
    "default-sort-order-id": 0,
    "sort-orders": [{"order-id": 0, "fields": []}],
    "properties": {},
    "current-snapshot-id": 3055729675574597004,
    "snapshots": [
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "sequence-number": 0,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/1.avro",
        },
        {
            "snapshot-id": 3055729675574597004,
            "parent-snapshot-id": 3051729675574597004,
            "timestamp-ms": 1555100955770,
            "sequence-number": 1,
            "summary": {"operation": "append"},
            "manifest-list": "s3://a/b/2.avro",
            "schema-id": 1,
        },
    ],
    "statistics": [
        {
            "snapshot-id": stats_snapshot_id,
            "statistics-path": "s3://a/b/stats.puffin",
            "file-size-in-bytes": 413,
            "file-footer-size-in-bytes": 42,
            "blob-metadata": [
                {
                    "type": "ndv",
                    "snapshot-id": stats_snapshot_id,
                    "sequence-number": 1,
                    "fields": [1],
                }
            ],
        }
        for stats_snapshot_id in (3051729675574597004, 3055729675574597004)
    ],
    "snapshot-log": [],
    "metadata-log": [],
}
1001+
9211002

9221003
@pytest.fixture
9231004
def example_table_metadata_v2() -> Dict[str, Any]:
@@ -929,6 +1010,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]:
9291010
return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES
9301011

9311012

1013+
@pytest.fixture
def table_metadata_v2_with_statistics() -> Dict[str, Any]:
    """Provide the module-level V2 table-metadata dict that carries statistics entries."""
    metadata: Dict[str, Any] = TABLE_METADATA_V2_WITH_STATISTICS
    return metadata
1016+
1017+
9321018
@pytest.fixture(scope="session")
9331019
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
9341020
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -2170,6 +2256,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s
21702256
)
21712257

21722258

2259+
@pytest.fixture
def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table:
    """Build a Table from the statistics-bearing V2 metadata fixture.

    The raw dict is parsed into ``TableMetadataV2`` and wrapped in a ``Table``
    that uses a ``NoopCatalog`` and the file IO returned by ``load_file_io()``.
    """
    parsed_metadata = TableMetadataV2(**table_metadata_v2_with_statistics)
    # The metadata file location is derived from the table location.
    metadata_file = f"{parsed_metadata.location}/uuid.metadata.json"
    return Table(
        identifier=("database", "table"),
        metadata=parsed_metadata,
        metadata_location=metadata_file,
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )
2269+
2270+
21732271
@pytest.fixture
21742272
def bound_reference_str() -> BoundReference[str]:
21752273
return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None))

0 commit comments

Comments
 (0)