Skip to content

Support snapshot management operations like creating tags by adding ManageSnapshots API #728

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f129b6
create API for set_ref_snapshot
chinmay-bhat May 12, 2024
ed2e647
remove parent_snapshot_id, add AssertTableUUID
chinmay-bhat May 13, 2024
fb3e25c
wrap `set_ref_snapshot` API in Table
chinmay-bhat May 13, 2024
aee9c31
rename variable to match `SetSnapshotRefUpdate`
chinmay-bhat May 13, 2024
8cd7846
add tests for table and transaction api
chinmay-bhat May 13, 2024
b6f8596
hide set_ref_snapshot from public API
chinmay-bhat May 22, 2024
815cf1f
deprecate add_snapshot()
chinmay-bhat May 22, 2024
1c70f62
add create_tag, create_branch apis
chinmay-bhat May 22, 2024
6c1e78f
small updates
chinmay-bhat May 22, 2024
ed05dc9
update tests
chinmay-bhat May 22, 2024
6447f6c
expose ManageSnapshot to Table
chinmay-bhat May 22, 2024
0349bbe
rename transaction to _transaction
chinmay-bhat May 22, 2024
1343d45
apply suggestions
chinmay-bhat May 25, 2024
6537229
update only if there are updates
chinmay-bhat May 25, 2024
042a1e7
add docstring
chinmay-bhat May 25, 2024
fbfd463
remove print
chinmay-bhat May 25, 2024
8cf9a8c
update to integration tests
chinmay-bhat May 31, 2024
5ba32f1
resolve mypy error: does not explicitly export attribute
chinmay-bhat May 31, 2024
febd36b
improve docstring
chinmay-bhat Jun 3, 2024
5d2b20e
fix failing tests
chinmay-bhat Jun 3, 2024
5f54ff9
move AssertRef to _set_ref_snapshot
chinmay-bhat Jun 3, 2024
8a923a4
remove unused parent_snapshot_id
chinmay-bhat Jun 3, 2024
658c45e
improve docstring for public api manage_snapshots()
chinmay-bhat Jun 3, 2024
97f142a
rename tests
chinmay-bhat Jun 3, 2024
603d5b3
update docs
chinmay-bhat Jun 9, 2024
1eba937
update commit()
chinmay-bhat Jun 10, 2024
fe686fc
update tests
chinmay-bhat Jun 10, 2024
1f32a6f
update docs
chinmay-bhat Jun 10, 2024
9f068a6
remove AssertTableUUID
chinmay-bhat Jun 11, 2024
41f5f16
rename test file to fix mypy error
chinmay-bhat Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,50 @@
(array(), map(), array(struct(1)))
"""
)

spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_table_snapshot_operations (
number integer
)
USING iceberg
TBLPROPERTIES (
'format-version'='2'
);
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (1)
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (2)
"""
)

spark.sql(
f"""
DELETE FROM {catalog_name}.default.test_table_snapshot_operations
WHERE number = 2
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (3)
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_snapshot_operations
VALUES (4)
"""
)
22 changes: 22 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,28 @@ tbl.overwrite(df, snapshot_properties={"abc": "def"})
assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
```

## Snapshot Management

Manage snapshots with operations through the `Table` API:

```python
# To run a specific operation
table.manage_snapshots().create_tag(snapshot_id, "tag123").commit()
# To run multiple operations
table.manage_snapshots()
.create_tag(snapshot_id1, "tag123")
.create_tag(snapshot_id2, "tag456")
.commit()
# Operations are applied on commit.
```

You can also use context managers to make more changes:

```python
with table.manage_snapshots() as ms:
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
Expand Down
176 changes: 176 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
Expand Down Expand Up @@ -351,6 +352,88 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
updates = properties or kwargs
return self._apply((SetPropertiesUpdate(updates=updates),))

@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="Please use one of the functions in ManageSnapshots instead",
)
def add_snapshot(self, snapshot: Snapshot) -> Transaction:
"""Add a new snapshot to the table.

Returns:
The transaction with the add-snapshot staged.
"""
updates = (AddSnapshotUpdate(snapshot=snapshot),)

return self._apply(updates, ())

@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="Please use one of the functions in ManageSnapshots instead",
)
def set_ref_snapshot(
self,
snapshot_id: int,
parent_snapshot_id: Optional[int],
ref_name: str,
type: str,
max_ref_age_ms: Optional[int] = None,
max_snapshot_age_ms: Optional[int] = None,
min_snapshots_to_keep: Optional[int] = None,
) -> Transaction:
"""Update a ref to a snapshot.

Returns:
The transaction with the set-snapshot-ref staged
"""
updates = (
SetSnapshotRefUpdate(
snapshot_id=snapshot_id,
ref_name=ref_name,
type=type,
max_ref_age_ms=max_ref_age_ms,
max_snapshot_age_ms=max_snapshot_age_ms,
min_snapshots_to_keep=min_snapshots_to_keep,
),
)

requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),)
return self._apply(updates, requirements)

def _set_ref_snapshot(
self,
snapshot_id: int,
ref_name: str,
type: str,
max_ref_age_ms: Optional[int] = None,
max_snapshot_age_ms: Optional[int] = None,
min_snapshots_to_keep: Optional[int] = None,
) -> UpdatesAndRequirements:
"""Update a ref to a snapshot.

Returns:
The updates and requirements for the set-snapshot-ref staged
"""
updates = (
SetSnapshotRefUpdate(
snapshot_id=snapshot_id,
ref_name=ref_name,
type=type,
max_ref_age_ms=max_ref_age_ms,
max_snapshot_age_ms=max_snapshot_age_ms,
min_snapshots_to_keep=min_snapshots_to_keep,
),
)
requirements = (
AssertRefSnapshotId(
snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
ref=ref_name,
),
)

return updates, requirements

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.

Expand Down Expand Up @@ -1323,6 +1406,21 @@ def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log

def manage_snapshots(self) -> ManageSnapshots:
"""
Shorthand to run snapshot management operations like create branch, create tag, etc.

Use table.manage_snapshots().<operation>().commit() to run a specific operation.
Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.

Expand Down Expand Up @@ -1835,6 +1933,84 @@ def __enter__(self) -> U:
return self # type: ignore


class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
"""
Run snapshot management operations using APIs.

APIs include create branch, create tag, etc.

Use table.manage_snapshots().<operation>().commit() to run a specific operation.
Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""

_updates: Tuple[TableUpdate, ...] = ()
_requirements: Tuple[TableRequirement, ...] = ()

def _commit(self) -> UpdatesAndRequirements:
"""Apply the pending changes and commit."""
return self._updates, self._requirements

def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
"""
Create a new tag pointing to the given snapshot id.

Args:
snapshot_id (int): snapshot id of the existing snapshot to tag
tag_name (str): name of the tag
max_ref_age_ms (Optional[int]): max ref age in milliseconds

Returns:
This for method chaining
"""
update, requirement = self._transaction._set_ref_snapshot(
snapshot_id=snapshot_id,
ref_name=tag_name,
type="tag",
max_ref_age_ms=max_ref_age_ms,
)
self._updates += update
self._requirements += requirement
return self

def create_branch(
self,
snapshot_id: int,
branch_name: str,
max_ref_age_ms: Optional[int] = None,
max_snapshot_age_ms: Optional[int] = None,
min_snapshots_to_keep: Optional[int] = None,
) -> ManageSnapshots:
"""
Create a new branch pointing to the given snapshot id.

Args:
snapshot_id (int): snapshot id of existing snapshot at which the branch is created.
branch_name (str): name of the new branch
max_ref_age_ms (Optional[int]): max ref age in milliseconds
max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
min_snapshots_to_keep (Optional[int]): min number of snapshots to keep in milliseconds
Returns:
This for method chaining
"""
update, requirement = self._transaction._set_ref_snapshot(
snapshot_id=snapshot_id,
ref_name=branch_name,
type="branch",
max_ref_age_ms=max_ref_age_ms,
max_snapshot_age_ms=max_snapshot_age_ms,
min_snapshots_to_keep=min_snapshots_to_keep,
)
self._updates += update
self._requirements += requirement
return self


class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
_schema: Schema
_last_column_id: itertools.count[int]
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.table.refs import SnapshotRef


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_create_tag(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)
assert len(tbl.history()) > 3
tag_snapshot_id = tbl.history()[-3].snapshot_id
tbl.manage_snapshots().create_tag(snapshot_id=tag_snapshot_id, tag_name="tag123").commit()
assert tbl.metadata.refs["tag123"] == SnapshotRef(snapshot_id=tag_snapshot_id, snapshot_ref_type="tag")


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_create_branch(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)
assert len(tbl.history()) > 2
branch_snapshot_id = tbl.history()[-2].snapshot_id
tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name="branch123").commit()
assert tbl.metadata.refs["branch123"] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch")
24 changes: 24 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,30 @@ def test_update_metadata_add_snapshot(table_v2: Table) -> None:
assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms


def test_update_metadata_set_ref_snapshot(table_v2: Table) -> None:
update, _ = table_v2.transaction()._set_ref_snapshot(
snapshot_id=3051729675574597004,
ref_name="main",
type="branch",
max_ref_age_ms=123123123,
max_snapshot_age_ms=12312312312,
min_snapshots_to_keep=1,
)

new_metadata = update_table_metadata(table_v2.metadata, update)
assert len(new_metadata.snapshot_log) == 3
assert new_metadata.snapshot_log[2].snapshot_id == 3051729675574597004
assert new_metadata.current_snapshot_id == 3051729675574597004
assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms
assert new_metadata.refs["main"] == SnapshotRef(
snapshot_id=3051729675574597004,
snapshot_ref_type="branch",
min_snapshots_to_keep=1,
max_snapshot_age_ms=12312312312,
max_ref_age_ms=123123123,
)


def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None:
update = SetSnapshotRefUpdate(
ref_name="main",
Expand Down