From 9ade07348011e454c6fe0864222a8916782aab21 Mon Sep 17 00:00:00 2001
From: Liwei Li
Date: Tue, 19 Mar 2024 17:51:23 +0800
Subject: [PATCH 1/4] Refactor scan inheritance to support incremental append
 scan
---
 dev/provision.py                |  42 ++++
 pyiceberg/manifest.py           |  11 +
 pyiceberg/table/__init__.py     | 376 ++++++++++++++++++++++++++++----
 tests/integration/test_reads.py |  52 +++++
 4 files changed, 443 insertions(+), 38 deletions(-)

diff --git a/dev/provision.py b/dev/provision.py
index 6c8fe366d7..6addac987e 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -389,3 +389,45 @@
         VALUES (4)
         """
     )
+
+    spark.sql(
+        f"""
+        CREATE OR REPLACE TABLE {catalog_name}.default.test_table_read_from_snapshots (
+            dt date,
+            number integer,
+            letter string
+        )
+        USING iceberg
+        TBLPROPERTIES (
+            'format-version'='2'
+        );
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
+        VALUES (CAST('2022-03-01' AS date), 1, 'a')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
+        VALUES (CAST('2022-03-01' AS date), 2, 'b')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
+        VALUES (CAST('2022-03-02' AS date), 3, 'c')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
+        VALUES (CAST('2022-03-02' AS date), 4, 'd')
+        """
+    )
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index bf5749ce9b..26d4313652 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -551,6 +551,17 @@ class ManifestFile(Record):
     def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data})
 
+    def __eq__(self, other: Any) -> bool:
+        """Return the equality of two instances of the ManifestFile class."""
+        if not isinstance(other, ManifestFile):
+            return False
+        else:
+            return self.manifest_path == other.manifest_path
+
+    def __hash__(self) -> int:
+        """Return the hash of manifest_path."""
+        return hash(self.manifest_path)
+
     def has_added_files(self) -> bool:
         return self.added_files_count is None or self.added_files_count > 0
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 8c1493974b..937d74514f 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1321,7 +1321,24 @@ def scan(
             row_filter=row_filter,
             selected_fields=selected_fields,
             case_sensitive=case_sensitive,
-            snapshot_id=snapshot_id,
+            options=options,
+            limit=limit,
+        )
+
+    def incremental_append_scan(
+        self,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ) -> IncrementalAppendScan:
+        return IncrementalAppendScan(
+            table_metadata=self.metadata,
+            io=self.io,
+            row_filter=row_filter,
+            selected_fields=selected_fields,
+            case_sensitive=case_sensitive,
             options=options,
             limit=limit,
         )
@@ -1599,7 +1616,6 @@ class TableScan(ABC):
     row_filter: BooleanExpression
     selected_fields: Tuple[str, ...]
     case_sensitive: bool
-    snapshot_id: Optional[int]
     options: Properties
     limit: Optional[int]
 
@@ -1610,7 +1626,6 @@ def __init__(
         self,
         table_metadata: TableMetadata,
         io: FileIO,
         row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
         selected_fields: Tuple[str, ...] = ("*",),
         case_sensitive: bool = True,
-        snapshot_id: Optional[int] = None,
         options: Properties = EMPTY_DICT,
         limit: Optional[int] = None,
     ):
@@ -1619,34 +1634,11 @@ def __init__(
         self.row_filter = _parse_row_filter(row_filter)
         self.selected_fields = selected_fields
         self.case_sensitive = case_sensitive
-        self.snapshot_id = snapshot_id
         self.options = options
         self.limit = limit
 
-    def snapshot(self) -> Optional[Snapshot]:
-        if self.snapshot_id:
-            return self.table_metadata.snapshot_by_id(self.snapshot_id)
-        return self.table_metadata.current_snapshot()
-
-    def projection(self) -> Schema:
-        current_schema = self.table_metadata.schema()
-        if self.snapshot_id is not None:
-            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
-            if snapshot is not None:
-                if snapshot.schema_id is not None:
-                    try:
-                        current_schema = next(
-                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
-                        )
-                    except StopIteration:
-                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
-            else:
-                raise ValueError(f"Snapshot not found: {self.snapshot_id}")
-
-        if "*" in self.selected_fields:
-            return current_schema
-
-        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+    @abstractmethod
+    def projection(self) -> Schema: ...
 
     @abstractmethod
     def plan_files(self) -> Iterable[ScanTask]: ...
@@ -1661,13 +1653,8 @@ def update(self: S, **overrides: Any) -> S:
         """Create a copy of this table scan with updated fields."""
         return type(self)(**{**self.__dict__, **overrides})
 
-    def use_ref(self: S, name: str) -> S:
-        if self.snapshot_id:
-            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
-        if snapshot := self.table_metadata.snapshot_by_name(name):
-            return self.update(snapshot_id=snapshot.snapshot_id)
-
-        raise ValueError(f"Cannot scan unknown ref={name}")
+    @abstractmethod
+    def use_ref(self: S, name: str) -> S: ...
 
     def select(self: S, *field_names: str) -> S:
         if "*" in self.selected_fields:
@@ -1680,6 +1667,45 @@ def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
     def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
         return self.update(case_sensitive=case_sensitive)
 
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(self.table_metadata.schema())
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
+
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
+
+    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
+        """Ensure that no manifests are loaded that contain deletes that are older than the data.
+
+        Args:
+            min_data_sequence_number (int): The minimal sequence number.
+            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
+
+        Returns:
+            Boolean indicating if it is either a data file, or a relevant delete file.
+        """
+        return manifest.content == ManifestContent.DATA or (
+            # Not interested in deletes that are older than the data
+            manifest.content == ManifestContent.DELETES
+            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
+        )
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
+        return project(self.row_filter)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
 
 class ScanTask(ABC):
     pass
@@ -1756,18 +1782,55 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
 
 
 class DataScan(TableScan):
+    snapshot_id: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        snapshot_id: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit)
+        self.snapshot_id = snapshot_id
+
     def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
         project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
         return project(self.row_filter)
 
-    @cached_property
-    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+    def snapshot(self) -> Optional[Snapshot]:
+        if self.snapshot_id:
+            return self.table_metadata.snapshot_by_id(self.snapshot_id)
+        return self.table_metadata.current_snapshot()
 
     def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
         spec = self.table_metadata.specs()[spec_id]
         return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+        if self.snapshot_id is not None:
+            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
+            if snapshot is not None:
+                if snapshot.schema_id is not None:
+                    try:
+                        current_schema = next(
+                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
+                        )
+                    except StopIteration:
+                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
+            else:
+                raise ValueError(f"Snapshot not found: {self.snapshot_id}")
+
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
     def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
         spec = self.table_metadata.specs()[spec_id]
         partition_type = spec.partition_type(self.table_metadata.schema())
@@ -1795,6 +1858,14 @@ def _check_sequence_number(self, min_data_sequence_number: int, manifest: Manife
             and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
         )
 
+    def use_ref(self: S, name: str) -> S:
+        if self.snapshot_id:  # type: ignore
+            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
+        if snapshot := self.table_metadata.snapshot_by_name(name):
+            return self.update(snapshot_id=snapshot.snapshot_id)
+
+        raise ValueError(f"Cannot scan unknown ref={name}")
+
     def plan_files(self) -> Iterable[FileScanTask]:
         """Plans the relevant files by filtering on the PartitionSpecs.
 
@@ -1914,6 +1985,203 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         return ray.data.from_arrow(self.to_arrow())
 
 
+class BaseIncrementalScan(TableScan):
+    to_snapshot_id: Optional[int]
+    from_snapshot_id_exclusive: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+        to_snapshot_id: Optional[int] = None,
+        from_snapshot_id_exclusive: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit)
+        self.to_snapshot_id = to_snapshot_id
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+
+    def to_snapshot(self: S, to_snapshot_id: int) -> S:
+        """Instructs this scan to look for changes up to a particular snapshot (inclusive).
+
+        If the end snapshot is not configured, it defaults to the current table snapshot (inclusive).
+
+        Args:
+            to_snapshot_id: the end snapshot ID (inclusive)
+
+        Returns:
+            this for method chaining
+        """
+        return self.update(to_snapshot_id=to_snapshot_id)
+
+    def from_snapshot_exclusive(self: S, from_snapshot_id: int) -> S:
+        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+
+        If the start snapshot is not configured, it defaults to the oldest ancestor of the end snapshot (inclusive).
+
+        Args:
+            from_snapshot_id: the start snapshot ID (exclusive)
+
+        Returns:
+            this for method chaining
+        """
+        return self.update(from_snapshot_id_exclusive=from_snapshot_id)
+
+    def use_ref(self: S, name: str) -> S:
+        raise NotImplementedError("Not implemented for IncrementalScan yet.")
+
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+    @abstractmethod
+    def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[FileScanTask]: ...
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        if (
+            self.from_snapshot_id_exclusive is None
+            and self.to_snapshot_id is None
+            and self.table_metadata.current_snapshot() is None
+        ):
+            return iter([])
+
+        if self.to_snapshot_id is None:
+            current_snapshot = self.table_metadata.current_snapshot()
+            if current_snapshot is None:
+                raise ValueError("End snapshot is not set and table has no current snapshot")
+            self.to_snapshot_id = current_snapshot.snapshot_id
+
+        if self.table_metadata.snapshot_by_id(self.to_snapshot_id) is None:
+            raise ValueError(f"End snapshot not found: {self.to_snapshot_id}")
+
+        if self.from_snapshot_id_exclusive is not None:
+            if not is_parent_ancestor_of(self.to_snapshot_id, self.from_snapshot_id_exclusive, self.table_metadata):
+                raise ValueError(
+                    f"Starting snapshot (exclusive) {self.from_snapshot_id_exclusive} is not a parent ancestor of end snapshot {self.to_snapshot_id}"
+                )
+        return self._do_plan_files(self.from_snapshot_id_exclusive, self.to_snapshot_id)  # type: ignore
+
+    def to_arrow(self) -> pa.Table:
+        from pyiceberg.io.pyarrow import project_table
+
+        return project_table(
+            self.plan_files(),
+            self.table_metadata,
+            self.io,
+            self.row_filter,
+            self.projection(),
+            case_sensitive=self.case_sensitive,
+            limit=self.limit,
+        )
+
+    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
+        return self.to_arrow().to_pandas(**kwargs)
+
+    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        import duckdb
+
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow())
+
+        return con
+
+    def to_ray(self) -> ray.data.dataset.Dataset:
+        import ray
+
+        return ray.data.from_arrow(self.to_arrow())
+
+
+class IncrementalAppendScan(BaseIncrementalScan):
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+        to_snapshot_id: Optional[int] = None,
+        from_snapshot_id_exclusive: Optional[int] = None,
+    ):
+        super().__init__(
+            table_metadata,
+            io,
+            row_filter,
+            selected_fields,
+            case_sensitive,
+            options,
+            limit,
+            to_snapshot_id,
+            from_snapshot_id_exclusive,
+        )
+
+    def use_ref(self: S, name: str) -> S:
+        raise NotImplementedError("Not implemented for IncrementalAppendScan yet.")
+
+    def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[FileScanTask]:
+        snapshots_between = [
+            snapshot
+            for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata)
+            if snapshot.summary.operation == Operation.APPEND  # type: ignore
+        ]
+
+        snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between}
+
+        # step 1: filter manifests using partition summaries
+        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
+
+        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
+
+        manifests = {
+            manifest_file
+            for snapshot_id in snapshot_ids
+            for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io)  # type: ignore
+            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+            if manifest_file.added_snapshot_id in snapshot_ids
+        }
+
+        # step 2: filter the data files in each manifest
+        # this filter depends on the partition spec used to write the manifest file
+
+        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
+        metrics_evaluator = _InclusiveMetricsEvaluator(
+            self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
+        ).eval
+
+        min_data_sequence_number = _min_data_file_sequence_number(manifests)  # type: ignore
+
+        added_data_entries: List[ManifestEntry] = []
+
+        executor = ExecutorFactory.get_or_create()
+        for manifest_entry in chain(
+            *executor.map(
+                lambda args: _open_manifest(*args),
+                [
+                    (
+                        self.io,
+                        manifest,
+                        partition_evaluators[manifest.partition_spec_id],
+                        metrics_evaluator,
+                    )
+                    for manifest in manifests
+                    if self._check_sequence_number(min_data_sequence_number, manifest)
+                ],
+            )
+        ):
+            if manifest_entry.status == ManifestEntryStatus.ADDED and manifest_entry.snapshot_id in snapshot_ids:
+                added_data_entries.append(manifest_entry)
+
+        return [FileScanTask(data_entry.data_file) for data_entry in added_data_entries]
+
+
 class MoveOperation(Enum):
     First = 1
     Before = 2
@@ -4005,3 +4273,35 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
 
     table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
     return table_partitions
+
+
+def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    if from_snapshot is not None:
+        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
+            if snapshot.snapshot_id == from_snapshot:
+                break
+            yield snapshot
+    else:
+        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
+
+
+def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
+    last_snapshot = None
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        last_snapshot = snapshot.snapshot_id
+    return last_snapshot
+
+
+def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    if latest_snapshot:
+        yield latest_snapshot
+        if latest_snapshot.parent_snapshot_id:
+            if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
+                yield from ancestors_of(parent, table_metadata)
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 078abf406a..85a967e6ad 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -549,6 +549,58 @@ def test_scan_branch(catalog: Catalog) -> None:
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
+def test_incremental_append_scan(catalog: Catalog) -> None:
+    print(catalog.list_tables("default"))
+    test_table = catalog.load_table("default.test_table_read_from_snapshots")
+
+    scan = test_table.incremental_append_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id)
+    assert len(list(scan.plan_files())) == 3
+
+    scan = test_table.incremental_append_scan().to_snapshot(test_table.history()[1].snapshot_id)
+    assert len(list(scan.plan_files())) == 2
+
+    scan = (
+        test_table.incremental_append_scan()
+        .from_snapshot_exclusive(test_table.history()[0].snapshot_id)
+        .to_snapshot(test_table.history()[2].snapshot_id)
+    )
+    assert len(list(scan.plan_files())) == 2
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
+def test_incremental_append_scan_from_snapshot_is_not_parent_ancestor_of_to_snapshot_should_fail(catalog: Catalog) -> None:
+    print(catalog.list_tables("default"))
+    test_table = catalog.load_table("default.test_table_read_from_snapshots")
+
+    with pytest.raises(ValueError) as e:
+        (
+            test_table.incremental_append_scan()
+            .from_snapshot_exclusive(test_table.history()[2].snapshot_id)
+            .to_snapshot(test_table.history()[1].snapshot_id)
+            .plan_files()
+        )
+    assert "is not a parent ancestor of end snapshot" in str(e.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
+def test_incremental_append_scan_invalid_to_snapshot_should_fail(catalog: Catalog) -> None:
+    print(catalog.list_tables("default"))
+    test_table = catalog.load_table("default.test_table_read_from_snapshots")
+
+    with pytest.raises(ValueError) as e:
+        (
+            test_table.incremental_append_scan()
+            .from_snapshot_exclusive(test_table.history()[2].snapshot_id)
+            .to_snapshot(123)
+            .plan_files()
+        )
+    assert "End snapshot not found: 123" in str(e.value)
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_filter_on_new_column(catalog: Catalog) -> None:
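[Editor's usage sketch] Taken together, the first patch exposes the new scan through Table.incremental_append_scan(). A minimal sketch of the resulting API, mirroring the integration tests above; the catalog and table names are illustrative placeholders, not part of the patch:

    from pyiceberg.catalog import load_catalog

    # Illustrative names; any Iceberg table with a few append snapshots works.
    catalog = load_catalog("default")
    table = catalog.load_table("default.test_table_read_from_snapshots")
    history = table.history()

    # Read only the rows appended after the first snapshot,
    # up to and including the third one.
    scan = (
        table.incremental_append_scan()
        .from_snapshot_exclusive(history[0].snapshot_id)
        .to_snapshot(history[2].snapshot_id)
    )
    arrow_table = scan.to_arrow()
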
From 0066b3b37f7a2f82a103a13251db90101f5334b6 Mon Sep 17 00:00:00 2001
From: Liwei Li
Date: Tue, 30 Apr 2024 18:20:19 +0800
Subject: [PATCH 2/4] move snapshot ancestry helpers to snapshots.py
---
 pyiceberg/manifest.py        |  5 +-
 pyiceberg/table/__init__.py  | 88 +++++++-----------------------------
 pyiceberg/table/snapshots.py | 25 ++++++++++
 3 files changed, 43 insertions(+), 75 deletions(-)

diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 26d4313652..d3db42c559 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -553,10 +553,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
 
     def __eq__(self, other: Any) -> bool:
         """Return the equality of two instances of the ManifestFile class."""
-        if not isinstance(other, ManifestFile):
-            return False
-        else:
-            return self.manifest_path == other.manifest_path
+        return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False
 
     def __hash__(self) -> int:
         """Return the hash of manifest_path."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 937d74514f..08a220a1c1 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -113,7 +113,9 @@
     SnapshotLogEntry,
     SnapshotSummaryCollector,
     Summary,
+    ancestors_between,
     ancestors_of,
+    is_parent_ancestor_of,
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1807,10 +1809,6 @@ def snapshot(self) -> Optional[Snapshot]:
             return self.table_metadata.snapshot_by_id(self.snapshot_id)
         return self.table_metadata.current_snapshot()
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
-
     def projection(self) -> Schema:
         current_schema = self.table_metadata.schema()
         if self.snapshot_id is not None:
@@ -1831,41 +1829,6 @@ def projection(self) -> Schema:
 
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-
-    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
-
-        Args:
-            min_data_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
-
-        Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
-        """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
-        )
-
-    def use_ref(self: S, name: str) -> S:
-        if self.snapshot_id:  # type: ignore
-            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
-        if snapshot := self.table_metadata.snapshot_by_name(name):
-            return self.update(snapshot_id=snapshot.snapshot_id)
-
-        raise ValueError(f"Cannot scan unknown ref={name}")
-
     def plan_files(self) -> Iterable[FileScanTask]:
         """Plans the relevant files by filtering on the PartitionSpecs.
 
@@ -1971,6 +1934,14 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)
 
+    def use_ref(self: S, name: str) -> S:
+        if self.snapshot_id:  # type: ignore
+            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
+        if snapshot := self.table_metadata.snapshot_by_name(name):
+            return self.update(snapshot_id=snapshot.snapshot_id)
+
+        raise ValueError(f"Cannot scan unknown ref={name}")
+
     def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
         import duckdb
 
@@ -1986,6 +1957,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
 
 class BaseIncrementalScan(TableScan):
+    """Base class for incremental scans.
+
+    Args:
+        to_snapshot_id: The end snapshot ID (inclusive).
+        from_snapshot_id_exclusive: The start snapshot ID (exclusive).
+    """
+
     to_snapshot_id: Optional[int]
     from_snapshot_id_exclusive: Optional[int]
 
@@ -4273,35 +4251,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
 
     table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
     return table_partitions
-
-
-def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if from_snapshot is not None:
-        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
-            if snapshot.snapshot_id == from_snapshot:
-                break
-            yield snapshot
-    else:
-        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
-
-
-def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
-            return True
-    return False
-
-
-def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
-    last_snapshot = None
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        last_snapshot = snapshot.snapshot_id
-    return last_snapshot
-
-
-def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if latest_snapshot:
-        yield latest_snapshot
-        if latest_snapshot.parent_snapshot_id:
-            if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
-                yield from ancestors_of(parent, table_metadata)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 842d42522a..b7bec79d5c 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -429,3 +429,28 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta
         if snapshot.parent_snapshot_id is None:
             break
         snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
+
+
+def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    if from_snapshot is not None:
+        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
+            if snapshot.snapshot_id == from_snapshot:
+                break
+            yield snapshot
+    else:
+        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
+
+
+def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
+    last_snapshot = None
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        last_snapshot = snapshot.snapshot_id
+    return last_snapshot
+
From 38d8b1eef83e5af7c085523a4307cee09b85513e Mon Sep 17 00:00:00 2001
From: Liwei Li
Date: Tue, 2 Jul 2024 15:53:58 +0800
Subject: [PATCH 3/4] handle to_snapshot_id is None
---
 pyiceberg/table/__init__.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 08a220a1c1..3d6bcfc493 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1994,6 +1994,8 @@ def to_snapshot(self: S, to_snapshot_id: int) -> S:
         Returns:
             this for method chaining
         """
+        if to_snapshot_id is None:
+            return self
         return self.update(to_snapshot_id=to_snapshot_id)
 
     def from_snapshot_exclusive(self: S, from_snapshot_id: int) -> S:

From 3b5644bb5b0f927895f8e7a733addccd9d096adf Mon Sep 17 00:00:00 2001
From: Liwei Li
Date: Wed, 5 Jun 2024 10:34:20 +0800
Subject: [PATCH 4/4] change FileScanTask to ScanTask
---
 pyiceberg/table/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 3d6bcfc493..620a331e5e 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -2022,9 +2022,9 @@ def projection(self) -> Schema:
 
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
 
     @abstractmethod
-    def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[FileScanTask]: ...
+    def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ScanTask]: ...
 
-    def plan_files(self) -> Iterable[FileScanTask]:
+    def plan_files(self) -> Iterable[ScanTask]:
         if (
             self.from_snapshot_id_exclusive is None
             and self.to_snapshot_id is None
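[Editor's sketch] The None guard added in PATCH 3 lets callers thread an optional end snapshot straight through without branching, and PATCH 4 widens the planning signatures to the ScanTask base type. A sketch of the calling pattern the guard enables; the helper function and its name are hypothetical:

    from typing import Optional

    import pyarrow as pa

    from pyiceberg.table import Table

    def incremental_read(table: Table, to_snapshot_id: Optional[int] = None) -> pa.Table:
        # With the guard, to_snapshot(None) is a no-op and plan_files()
        # falls back to the table's current snapshot as the end of the range.
        return table.incremental_append_scan().to_snapshot(to_snapshot_id).to_arrow()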