Skip to content

Commit d1207f8

Browse files
committed
move to snapshot
1 parent 3517327 commit d1207f8

File tree

3 files changed

+65
-76
lines changed

3 files changed

+65
-76
lines changed

pyiceberg/manifest.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
550550

551551
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the ManifestFile class."""
    # Manifest files are identified solely by their path; any non-ManifestFile
    # comparand is unequal by definition.
    return isinstance(other, ManifestFile) and self.manifest_path == other.manifest_path
557554

558555
def __hash__(self) -> int:
559556
"""Return the hash of manifest_path."""

pyiceberg/table/__init__.py

Lines changed: 17 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@
112112
SnapshotLogEntry,
113113
SnapshotSummaryCollector,
114114
Summary,
115+
ancestors_between,
116+
is_parent_ancestor_of,
115117
update_snapshot_summaries,
116118
)
117119
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1679,10 +1681,6 @@ def snapshot(self) -> Optional[Snapshot]:
16791681
return self.table_metadata.snapshot_by_id(self.snapshot_id)
16801682
return self.table_metadata.current_snapshot()
16811683

1682-
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
1683-
spec = self.table_metadata.specs()[spec_id]
1684-
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
1685-
16861684
def projection(self) -> Schema:
16871685
current_schema = self.table_metadata.schema()
16881686
if self.snapshot_id is not None:
@@ -1703,41 +1701,6 @@ def projection(self) -> Schema:
17031701

17041702
return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
17051703

1706-
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
1707-
spec = self.table_metadata.specs()[spec_id]
1708-
partition_type = spec.partition_type(self.table_metadata.schema())
1709-
partition_schema = Schema(*partition_type.fields)
1710-
partition_expr = self.partition_filters[spec_id]
1711-
1712-
# The lambda created here is run in multiple threads.
1713-
# So we avoid creating _EvaluatorExpression methods bound to a single
1714-
# shared instance across multiple threads.
1715-
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
1716-
1717-
def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
1718-
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
1719-
1720-
Args:
1721-
min_data_sequence_number (int): The minimal sequence number.
1722-
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
1723-
1724-
Returns:
1725-
Boolean indicating if it is either a data file, or a relevant delete file.
1726-
"""
1727-
return manifest.content == ManifestContent.DATA or (
1728-
# Not interested in deletes that are older than the data
1729-
manifest.content == ManifestContent.DELETES
1730-
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
1731-
)
1732-
1733-
def use_ref(self: S, name: str) -> S:
1734-
if self.snapshot_id: # type: ignore
1735-
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore
1736-
if snapshot := self.table_metadata.snapshot_by_name(name):
1737-
return self.update(snapshot_id=snapshot.snapshot_id)
1738-
1739-
raise ValueError(f"Cannot scan unknown ref={name}")
1740-
17411704
def plan_files(self) -> Iterable[FileScanTask]:
17421705
"""Plans the relevant files by filtering on the PartitionSpecs.
17431706
@@ -1825,6 +1788,14 @@ def to_arrow(self) -> pa.Table:
18251788
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
18261789
return self.to_arrow().to_pandas(**kwargs)
18271790

1791+
def use_ref(self: S, name: str) -> S:
    """Return a scan pinned to the snapshot that the named ref points at.

    Raises:
        ValueError: If a snapshot id is already set, or the ref is unknown.
    """
    if self.snapshot_id:  # type: ignore
        raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
    snapshot = self.table_metadata.snapshot_by_name(name)
    if not snapshot:
        raise ValueError(f"Cannot scan unknown ref={name}")
    return self.update(snapshot_id=snapshot.snapshot_id)
1798+
18281799
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
18291800
import duckdb
18301801

@@ -1840,6 +1811,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:
18401811

18411812

18421813
class BaseIncrementalScan(TableScan):
1814+
"""Base class for incremental scans.
1815+
1816+
Args:
1817+
to_snapshot_id: The end snapshot ID (inclusive).
1818+
from_snapshot_id_exclusive: The start snapshot ID (exclusive).
1819+
"""
1820+
18431821
to_snapshot_id: Optional[int]
18441822
from_snapshot_id_exclusive: Optional[int]
18451823

@@ -3913,35 +3891,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
39133891
table_partitions: list[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
39143892

39153893
return table_partitions
3916-
3917-
3918-
def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
3919-
if from_snapshot is not None:
3920-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata): # type: ignore
3921-
if snapshot.snapshot_id == from_snapshot:
3922-
break
3923-
yield snapshot
3924-
else:
3925-
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata) # type: ignore
3926-
3927-
3928-
def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
3929-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
3930-
if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
3931-
return True
3932-
return False
3933-
3934-
3935-
def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
3936-
last_snapshot = None
3937-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
3938-
last_snapshot = snapshot.snapshot_id
3939-
return last_snapshot
3940-
3941-
3942-
def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
3943-
if latest_snapshot:
3944-
yield latest_snapshot
3945-
if latest_snapshot.parent_snapshot_id:
3946-
if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
3947-
yield from ancestors_of(parent, table_metadata)

pyiceberg/table/snapshots.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,21 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from __future__ import annotations
18+
1719
import time
1820
from collections import defaultdict
1921
from enum import Enum
20-
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
22+
from typing import (
23+
TYPE_CHECKING,
24+
Any,
25+
DefaultDict,
26+
Dict,
27+
Iterable,
28+
List,
29+
Mapping,
30+
Optional,
31+
)
2132

2233
from pydantic import Field, PrivateAttr, model_serializer
2334

@@ -27,6 +38,9 @@
2738
from pyiceberg.schema import Schema
2839
from pyiceberg.typedef import IcebergBaseModel
2940

41+
if TYPE_CHECKING:
42+
from pyiceberg.table.metadata import TableMetadata
43+
3044
ADDED_DATA_FILES = 'added-data-files'
3145
ADDED_DELETE_FILES = 'added-delete-files'
3246
ADDED_EQUALITY_DELETES = 'added-equality-deletes'
@@ -412,3 +426,35 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
412426
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
413427
if num > 0:
414428
properties[property_name] = str(num)
429+
430+
431+
def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
    """Yield the ancestors of *to_snapshot*, newest first, stopping before *from_snapshot*.

    When *from_snapshot* is None, the entire ancestor chain is yielded.
    """
    for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
        # Stop (exclusively) once the lower bound of the range is reached.
        if from_snapshot is not None and snapshot.snapshot_id == from_snapshot:
            return
        yield snapshot
439+
440+
441+
def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
    """Return True if any ancestor of *snapshot_id* has *ancestor_parent_snapshot_id* as its parent."""
    return any(
        ancestor.parent_snapshot_id == ancestor_parent_snapshot_id
        for ancestor in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata)  # type: ignore
        if ancestor.parent_snapshot_id
    )
446+
447+
448+
def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
    """Return the id of the oldest ancestor of *snapshot_id*, or None when it has no ancestors."""
    oldest_id: Optional[int] = None
    ancestors = ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata)  # type: ignore
    # Ancestors are yielded newest-first, so the last one seen is the oldest.
    for ancestor in ancestors:
        oldest_id = ancestor.snapshot_id
    return oldest_id
453+
454+
455+
def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
    """Yield *latest_snapshot* and each transitively reachable parent, newest first.

    Traversal ends when a snapshot has no parent id, or when the parent id
    cannot be resolved in the table metadata.
    """
    current = latest_snapshot
    while current:
        yield current
        if not current.parent_snapshot_id:
            break
        # A dangling parent id (snapshot_by_id -> None) terminates the walk.
        current = table_metadata.snapshot_by_id(current.parent_snapshot_id)

0 commit comments

Comments
 (0)