Skip to content

Commit fa14cbd

Browse files
authored
Merge branch 'main' into fd-arrow
2 parents 03a64e0 + f948f56 commit fa14cbd

17 files changed

+752
-28
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ help: ## Display this help
2222
install-poetry: ## Install poetry if the user has not done that yet.
2323
@if ! command -v poetry &> /dev/null; then \
2424
echo "Poetry could not be found. Installing..."; \
25-
pip install --user poetry==1.8.5; \
25+
pip install --user poetry==2.0.1; \
2626
else \
2727
echo "Poetry is already installed."; \
2828
fi

dev/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
4242
ENV ICEBERG_VERSION=1.6.0
4343
ENV PYICEBERG_VERSION=0.8.1
4444

45-
RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
45+
RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
4646
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
4747
&& rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz
4848

mkdocs/docs/api.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,29 @@ with table.manage_snapshots() as ms:
12581258
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
12591259
```
12601260

1261+
## Table Statistics Management
1262+
1263+
Manage table statistics with operations through the `Table` API:
1264+
1265+
```python
1266+
# To run a specific operation
1267+
table.update_statistics().set_statistics(snapshot_id=1, statistics_file=statistics_file).commit()
1268+
# To run multiple operations
1269+
(table.update_statistics()
1270+
.set_statistics(snapshot_id1, statistics_file1)
1271+
.remove_statistics(snapshot_id2)
1272+
.commit())
1273+
# Operations are applied on commit.
1274+
```
1275+
1276+
You can also use context managers to make more changes:
1277+
1278+
```python
1279+
with table.update_statistics() as update:
1280+
update.set_statistics(snapshot_id1, statistics_file)
1281+
update.remove_statistics(snapshot_id2)
1282+
```
1283+
12611284
## Query the data
12621285

12631286
To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:

poetry.lock

Lines changed: 20 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/table/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
_FastAppendFiles,
119119
)
120120
from pyiceberg.table.update.spec import UpdateSpec
121+
from pyiceberg.table.update.statistics import UpdateStatistics
121122
from pyiceberg.transforms import IdentityTransform
122123
from pyiceberg.typedef import (
123124
EMPTY_DICT,
@@ -1043,6 +1044,23 @@ def manage_snapshots(self) -> ManageSnapshots:
10431044
"""
10441045
return ManageSnapshots(transaction=Transaction(self, autocommit=True))
10451046

1047+
def update_statistics(self) -> UpdateStatistics:
    """
    Shorthand to run statistics management operations such as set statistics and remove statistics.

    A single operation:
        table.update_statistics().<operation>().commit()

    Several operations chained together:
        table.update_statistics().<operation-one>().<operation-two>().commit()

    Pending changes are applied on commit.

    The returned object can also be used as a context manager. For example:

    with table.update_statistics() as update:
        update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
        update.remove_statistics(snapshot_id=2)
    """
    # The updater is backed by a fresh transaction on this table, created with autocommit=True.
    autocommit_txn = Transaction(self, autocommit=True)
    return UpdateStatistics(transaction=autocommit_txn)
1063+
10461064
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
10471065
"""Create a new UpdateSchema to alter the columns of this table.
10481066

pyiceberg/table/metadata.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
SortOrder,
4545
assign_fresh_sort_order_ids,
4646
)
47+
from pyiceberg.table.statistics import StatisticsFile
4748
from pyiceberg.typedef import (
4849
EMPTY_DICT,
4950
IcebergBaseModel,
@@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
221222
There is always a main branch reference pointing to the
222223
current-snapshot-id even if the refs map is null."""
223224

225+
statistics: List[StatisticsFile] = Field(default_factory=list)
226+
"""An optional list of table statistics files.
227+
Table statistics files are valid Puffin files. Statistics are
228+
informational. A reader can choose to ignore statistics
229+
information. Statistics support is not required to read the
230+
table correctly. A table can contain many statistics files
231+
associated with different table snapshots."""
232+
224233
# validators
225234
@field_validator("properties", mode="before")
226235
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:

pyiceberg/table/statistics.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import Dict, List, Literal, Optional
18+
19+
from pydantic import Field
20+
21+
from pyiceberg.typedef import IcebergBaseModel
22+
23+
24+
class BlobMetadata(IcebergBaseModel):
    # Metadata for one blob listed in StatisticsFile.blob_metadata.
    # Hyphenated aliases are the serialized field names; attributes use snake_case.

    # Blob type; only the two names in the Literal are accepted.
    type: Literal["apache-datasketches-theta-v1", "deletion-vector-v1"]
    snapshot_id: int = Field(alias="snapshot-id")
    sequence_number: int = Field(alias="sequence-number")
    # NOTE(review): presumably the ids of the table fields the blob was computed for - confirm against the spec.
    fields: List[int]
    # Optional free-form string properties; absent by default.
    properties: Optional[Dict[str, str]] = Field(default=None)
30+
31+
32+
class StatisticsFile(IcebergBaseModel):
    # One statistics file entry, associated with a snapshot through snapshot_id.
    # Hyphenated aliases are the serialized field names; attributes use snake_case.

    snapshot_id: int = Field(alias="snapshot-id")
    statistics_path: str = Field(alias="statistics-path")
    file_size_in_bytes: int = Field(alias="file-size-in-bytes")
    file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
    # The only optional field; defaults to None when not supplied.
    key_metadata: Optional[str] = Field(default=None, alias="key-metadata")
    # Per-blob metadata entries for this file.
    blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")
39+
40+
41+
def filter_statistics_by_snapshot_id(
    statistics: List[StatisticsFile],
    reject_snapshot_id: int,
) -> List[StatisticsFile]:
    """Return the statistics files that do not belong to ``reject_snapshot_id``.

    Args:
        statistics: Statistics files to filter. The input list is not modified.
        reject_snapshot_id: Snapshot id whose statistics files are dropped.

    Returns:
        A new list holding, in their original order, the entries whose
        ``snapshot_id`` differs from ``reject_snapshot_id``.
    """
    kept: List[StatisticsFile] = []
    for entry in statistics:
        if entry.snapshot_id == reject_snapshot_id:
            continue
        kept.append(entry)
    return kept

pyiceberg/table/update/__init__.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39+
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
3940
from pyiceberg.typedef import (
4041
IcebergBaseModel,
4142
Properties,
@@ -174,6 +175,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
174175
removals: List[str]
175176

176177

178+
class SetStatisticsUpdate(IcebergBaseModel):
    # Table update that sets the statistics file for a snapshot.
    # `action` is the discriminator value used by the TableUpdate union.
    action: Literal["set-statistics"] = "set-statistics"
    # Snapshot the statistics file is set for; serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
    # The statistics file to record.
    statistics: StatisticsFile
182+
183+
184+
class RemoveStatisticsUpdate(IcebergBaseModel):
    # Table update that removes the statistics recorded for a snapshot.
    # `action` is the discriminator value used by the TableUpdate union.
    action: Literal["remove-statistics"] = "remove-statistics"
    # Snapshot whose statistics are removed; serialized as "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
187+
188+
177189
TableUpdate = Annotated[
178190
Union[
179191
AssignUUIDUpdate,
@@ -191,6 +203,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
191203
SetLocationUpdate,
192204
SetPropertiesUpdate,
193205
RemovePropertiesUpdate,
206+
SetStatisticsUpdate,
207+
RemoveStatisticsUpdate,
194208
],
195209
Field(discriminator="action"),
196210
]
@@ -475,6 +489,28 @@ def _(
475489
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})
476490

477491

492+
@_apply_table_update.register(SetStatisticsUpdate)
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a set-statistics update to the table metadata.

    Any statistics file already recorded for the update's snapshot id is
    dropped and the new one is appended, so the result holds a single entry
    for that snapshot.

    Raises:
        ValueError: If the snapshot id of the update differs from the
            snapshot id of the statistics file it carries.
    """
    incoming = update.statistics
    if incoming.snapshot_id != update.snapshot_id:
        raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")

    retained = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
    context.add_update(update)

    # model_copy returns a new metadata object; base_metadata is left untouched.
    return base_metadata.model_copy(update={"statistics": [*retained, incoming]})
501+
502+
503+
@_apply_table_update.register(RemoveStatisticsUpdate)
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a remove-statistics update to the table metadata.

    Every statistics file recorded for the update's snapshot id is dropped.

    Raises:
        ValueError: If no statistics file exists for the given snapshot id.
    """
    current = base_metadata.statistics
    remaining = filter_statistics_by_snapshot_id(current, update.snapshot_id)

    # Nothing was filtered out, so there were no statistics for this snapshot.
    if len(remaining) == len(current):
        raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

    context.add_update(update)

    # model_copy returns a new metadata object; base_metadata is left untouched.
    return base_metadata.model_copy(update={"statistics": remaining})
512+
513+
478514
def update_table_metadata(
479515
base_metadata: TableMetadata,
480516
updates: Tuple[TableUpdate, ...],

pyiceberg/table/update/statistics.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import TYPE_CHECKING, Tuple
18+
19+
from pyiceberg.table.statistics import StatisticsFile
20+
from pyiceberg.table.update import (
21+
RemoveStatisticsUpdate,
22+
SetStatisticsUpdate,
23+
TableUpdate,
24+
UpdatesAndRequirements,
25+
UpdateTableMetadata,
26+
)
27+
28+
if TYPE_CHECKING:
29+
from pyiceberg.table import Transaction
30+
31+
32+
class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]):
    """
    Run statistics management operations using APIs.

    APIs include the set_statistics and remove_statistics operations.

    Use table.update_statistics().<operation>().commit() to run a specific operation.
    Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example:

    with table.update_statistics() as update:
        update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
        update.remove_statistics(snapshot_id=2)
    """

    # Pending updates in call order. The class-level default is an immutable
    # empty tuple; `+=` rebinds the attribute on the instance, so instances
    # never share pending updates.
    _updates: Tuple[TableUpdate, ...] = ()

    def __init__(self, transaction: "Transaction") -> None:
        super().__init__(transaction)

    def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics":
        """Queue an update that sets the statistics file for a snapshot.

        Args:
            snapshot_id: Snapshot the statistics file is set for.
            statistics_file: The statistics file to record.

        Returns:
            This instance, so that operations can be chained.
        """
        self._updates += (
            SetStatisticsUpdate(
                snapshot_id=snapshot_id,
                statistics=statistics_file,
            ),
        )

        return self

    def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics":
        """Queue an update that removes the statistics of a snapshot.

        Args:
            snapshot_id: Snapshot whose statistics are removed.

        Returns:
            This instance, so that operations can be chained.
        """
        # Append rather than assign: assigning would discard every update
        # queued earlier in the same chain or `with` block.
        self._updates += (
            RemoveStatisticsUpdate(
                snapshot_id=snapshot_id,
            ),
        )

        return self

    def _commit(self) -> UpdatesAndRequirements:
        """Return the queued updates together with an empty tuple of requirements."""
        return self._updates, ()

0 commit comments

Comments
 (0)