From d4ca6535a3cd978cd3b129ab5fcd1b6bc5a99999 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 10:46:37 -0800 Subject: [PATCH 01/21] s/"main"/MAIN_BRANCH (cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4) --- pyiceberg/table/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b43dc3206b..08f554bfaa 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -397,7 +397,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -3205,10 +3205,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), ) @property From 0b7aaaf928f57941da4f28694d46fdf98884245a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:17:28 -0800 Subject: [PATCH 02/21] replace string literals (cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0) --- pyiceberg/cli/console.py | 6 +++--- pyiceberg/table/__init__.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index d1833df081..213883bc70 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -32,7 +32,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000 @@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: refs = table.refs() if type: type = type.lower() - if type not in {"branch", "tag"}: + if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}: raise ValueError(f"Type must be either branch or tag, got: {type}") relevant_refs = [ @@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: retention_properties = {} - if ref.snapshot_ref_type == "branch": + if ref.snapshot_ref_type == SnapshotRefType.BRANCH: default_min_snapshots_to_keep = table_properties.get( "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 08f554bfaa..395b8fa688 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ NameMapping, update_mapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -813,7 +813,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] @@ -3205,7 +3205,7 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), From 23f04ec039f223c3995ac27dd75e72e8f21b178f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:20:48 -0800 Subject: [PATCH 03/21] default writes to main branch (cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e) --- pyiceberg/table/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 395b8fa688..5aac334764 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -397,7 +397,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -1540,7 +1540,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -3044,14 +3044,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -3059,6 +3061,7 @@ def __init__( self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -3205,10 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property From af6ff9aebad07cde3fdb746cfb1d3c73ddcedc83 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 18 Jul 2024 22:33:17 +0530 Subject: [PATCH 04/21] Added some more methods for branches --- pyiceberg/table/__init__.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5aac334764..1cd0a2c04c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1549,13 +1549,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties) + tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. @@ -1573,7 +1574,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT @@ -3053,14 +3054,13 @@ def __init__( io: FileIO, branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None @@ -3208,7 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), @@ -3260,10 +3263,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__( + operation=operation, transaction=transaction, io=io, branch=branch, commit_uuid=commit_uuid, snapshot_properties=snapshot_properties + ) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -3419,10 +3425,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = PropertyUtil.property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -3538,21 +3545,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> FastAppendFiles: return FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def merge_append(self) -> MergeAppendFiles: return MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: @@ -3564,6 +3573,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) def delete(self) -> DeleteFiles: @@ -3572,6 +3582,7 @@ def delete(self) -> DeleteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) From 6fbf3f18c994b4c2f8d3484ee5c43cf4c3d9991f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 10:46:37 -0800 Subject: [PATCH 05/21] s/"main"/MAIN_BRANCH (cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4) --- pyiceberg/table/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 34e9d2c53b..cb6e4efde4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -402,7 +402,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -3227,10 +3227,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), ) @property From 8ce1509c6ca5e6aa38d615c99bcdb25051bcb4ad Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:17:28 -0800 Subject: [PATCH 06/21] replace string literals (cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0) --- pyiceberg/cli/console.py | 6 +++--- pyiceberg/table/__init__.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index d1833df081..213883bc70 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -32,7 +32,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000 @@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: refs = table.refs() if type: type = type.lower() - if type not in {"branch", "tag"}: + if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}: raise ValueError(f"Type must be either branch or tag, got: {type}") relevant_refs = [ @@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: retention_properties = {} - if ref.snapshot_ref_type == "branch": + if ref.snapshot_ref_type == SnapshotRefType.BRANCH: default_min_snapshots_to_keep = table_properties.get( "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cb6e4efde4..b8362338f4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -115,7 +115,7 @@ NameMapping, update_mapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -828,7 +828,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] @@ -3227,7 +3227,7 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), From 6daf29e4459c9e64403e56eab07f07457bd392a9 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:20:48 -0800 Subject: [PATCH 07/21] default writes to main branch (cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e) --- pyiceberg/table/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b8362338f4..595adb701e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -402,7 +402,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -1562,7 +1562,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -3066,14 +3066,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -3081,6 +3083,7 @@ def __init__( self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -3227,10 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property From 09321cdbe3ee36bff32cc389c5679308a2e6e83e Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 18 Jul 2024 22:33:17 +0530 Subject: [PATCH 08/21] Added some more methods for branches --- pyiceberg/table/__init__.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 595adb701e..d2a94b2e35 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1571,13 +1571,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties) + tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. @@ -1595,7 +1596,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT @@ -3075,14 +3076,13 @@ def __init__( io: FileIO, branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None @@ -3230,7 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), @@ -3282,10 +3285,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__( + operation=operation, transaction=transaction, io=io, branch=branch, commit_uuid=commit_uuid, snapshot_properties=snapshot_properties + ) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -3441,10 +3447,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = PropertyUtil.property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -3560,21 +3567,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> FastAppendFiles: return FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def merge_append(self) -> MergeAppendFiles: return MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: @@ -3586,6 +3595,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) def delete(self) -> DeleteFiles: @@ -3594,6 +3604,7 @@ def delete(self) -> DeleteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) From 45b01a68e1d4572494082a09b49883ecdf00b329 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Sun, 13 Oct 2024 04:50:17 +0530 Subject: [PATCH 09/21] Updated antries for branches --- pyiceberg/table/__init__.py | 32 +++++++++++++++---------- pyiceberg/table/update/__init__.py | 4 ++-- pyiceberg/table/update/snapshot.py | 38 +++++++++++++++++++++++------- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 52dca937f8..5f4f24f40a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -78,7 +78,7 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, SnapshotLogEntry, @@ -402,21 +402,22 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.table_metadata.name_mapping(), ) - def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: + def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. Returns: A new UpdateSnapshot """ - return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) + return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties) - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the overwrite operation """ try: import pyarrow as pa @@ -444,7 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) - update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties) + update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties) append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append with append_method() as append_files: @@ -461,6 +462,7 @@ def overwrite( df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for adding a table overwrite with a PyArrow table to the transaction. @@ -476,6 +478,7 @@ def overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the overwrite operation """ try: import pyarrow as pa @@ -500,7 +503,7 @@ def overwrite( self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) - with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -509,7 +512,12 @@ def overwrite( for data_file in data_files: update_snapshot.append_data_file(data_file) - def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def delete( + self, + delete_filter: Union[str, BooleanExpression], + snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, + ) -> None: """ Shorthand for deleting record from a table. @@ -537,7 +545,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) - with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot: delete_snapshot.delete_by_predicate(delete_filter) # Check if there are any files that require an actual rewrite of a data file @@ -585,7 +593,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti )) if len(replaced_files) > 0: - with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite( + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite( commit_uuid=commit_uuid ) as overwrite_snapshot: for original_data_file, replaced_data_files in replaced_files: @@ -1003,7 +1011,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -1012,7 +1020,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) + tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) def overwrite( self, @@ -1037,7 +1045,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 6e14046f9a..d02fdc39b6 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -30,7 +30,7 @@ from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Snapshot, @@ -136,7 +136,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 47e5fc55e3..3af3ba9a23 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -55,6 +55,7 @@ from pyiceberg.partitioning import ( PartitionSpec, ) +from pyiceberg.table.refs import SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -103,12 +104,14 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: @@ -117,7 +120,7 @@ def __init__( self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -272,10 +275,13 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property @@ -324,10 +330,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -482,12 +489,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: from pyiceberg.table import TableProperties - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -603,21 +611,33 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__( + self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, + transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, ) def merge_append(self) -> _MergeAppendFiles: return _MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, + transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: @@ -628,6 +648,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: else Operation.APPEND, transaction=self._transaction, io=self._io, + branch=self._branch, snapshot_properties=self._snapshot_properties, ) @@ -636,6 +657,7 @@ def delete(self) -> _DeleteFiles: operation=Operation.DELETE, transaction=self._transaction, io=self._io, + branch=self._branch, snapshot_properties=self._snapshot_properties, ) From 917b044b2a3e244a7a25919325cc619fd8992500 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 15 Oct 2024 05:23:32 +0530 Subject: [PATCH 10/21] Fixed some bugs --- pyiceberg/table/__init__.py | 3 ++- pyiceberg/table/update/__init__.py | 2 ++ pyiceberg/table/update/snapshot.py | 9 ++++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5f4f24f40a..2260cd150b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -417,7 +417,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the overwrite operation + branch: Branch Reference to run the append operation """ try: import pyarrow as pa @@ -529,6 +529,7 @@ def delete( Args: delete_filter: A boolean expression to delete rows from a table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ from pyiceberg.io.pyarrow import ( ArrowScan, diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index d02fdc39b6..03b9855307 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -597,6 +597,8 @@ class AssertRefSnapshotId(ValidatableTableRequirement): def validate(self, base_metadata: Optional[TableMetadata]) -> None: if base_metadata is None: raise CommitFailedException("Requirement failed: current table metadata is missing") + elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH: + raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}") elif snapshot_ref := base_metadata.refs.get(self.ref): ref_type = snapshot_ref.snapshot_ref_type if self.snapshot_id is None: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3af3ba9a23..902141fa5c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -281,7 +281,14 @@ def _commit(self) -> UpdatesAndRequirements: type=SnapshotRefType.BRANCH, ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), + ( + AssertRefSnapshotId( + snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id + if self._branch in self._transaction.table_metadata.refs + else self._transaction.table_metadata.current_snapshot_id, + ref=self._branch, + ), + ), ) @property From 398f6c0ee33cee4ba8564d62ede143de52c1243f Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 05:29:23 +0530 Subject: [PATCH 11/21] Fixed bugs in delete and overwrite --- pyiceberg/table/__init__.py | 14 ++++++++++---- pyiceberg/table/update/snapshot.py | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2260cd150b..662a9795d2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -501,7 +501,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) + self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty @@ -554,7 +554,7 @@ def delete( bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) - files = self._scan(row_filter=delete_filter).plan_files() + files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files() commit_uuid = uuid.uuid4() counter = itertools.count(0) @@ -1019,6 +1019,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) @@ -1044,12 +1045,16 @@ def overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) def delete( - self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for deleting rows from the table. @@ -1057,9 +1062,10 @@ def delete( Args: delete_filter: The predicate that used to remove rows snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: - tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) def add_files( self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 902141fa5c..135e4f5293 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -122,7 +122,7 @@ def __init__( self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() self._branch = branch self._parent_snapshot_id = ( - snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None + snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None ) self._added_data_files = [] self._deleted_data_files = set() @@ -548,7 +548,7 @@ def _existing_manifests(self) -> List[ManifestFile]: """Determine if there are any existing manifest files.""" existing_files = [] - if snapshot := self._transaction.table_metadata.current_snapshot(): + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch): for manifest_file in snapshot.manifests(io=self._io): entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] From b7b8ba0b945f592d771aca936d1ceafb78a478c5 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 12:23:35 +0530 Subject: [PATCH 12/21] Added tests and some refactoring --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/update/snapshot.py | 4 +- tests/integration/test_writes/test_writes.py | 80 +++++++++++++++++++- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 662a9795d2..d34f875b29 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -501,7 +501,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) + self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 135e4f5293..b3ef552c55 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -55,7 +55,7 @@ from pyiceberg.partitioning import ( PartitionSpec, ) -from pyiceberg.table.refs import SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -622,7 +622,7 @@ class UpdateSnapshot: _snapshot_properties: Dict[str, str] def __init__( - self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH ) -> None: self._transaction = transaction self._io = io diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index fc2746c614..39e7758461 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -39,7 +39,7 @@ from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -1015,7 +1015,8 @@ def test_table_write_schema_with_valid_nullability_diff( NestedField(field_id=1, name="long", field_type=LongType(), required=False), ) other_schema = pa.schema(( - pa.field("long", pa.int64(), nullable=False), # can support writing required pyarrow field to optional Iceberg field + pa.field("long", pa.int64(), nullable=False), + # can support writing required pyarrow field to optional Iceberg field )) arrow_table = pa.Table.from_pydict( { @@ -1062,7 +1063,8 @@ def test_table_write_schema_with_valid_upcast( pa.field("list", pa.large_list(pa.int64()), nullable=False), pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False), pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double - pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16 + pa.field("uuid", pa.binary(length=16), nullable=True), + # can UUID is read as fixed length binary of length 16 )) ) lhs = spark.table(f"{identifier}").toPandas() @@ -1448,3 +1450,75 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> EqualTo("category", "A"), ), ) + + +@pytest.mark.integration +def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_non_existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) + with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"): + tbl.append(arrow_table_with_null, branch="non_existing_branch") + + +@pytest.mark.integration +def test_append_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_existing_branch_append" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.append(arrow_table_with_null, branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 6 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + + +@pytest.mark.integration +def test_delete_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_existing_branch_delete" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.delete(delete_filter="int = 9", branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 2 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + + +@pytest.mark.integration +def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_existing_branch_overwrite" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.overwrite(arrow_table_with_null, branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 3 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None and branch_snapshot.parent_snapshot_id is not None + delete_snapshot = tbl.metadata.snapshot_by_id(branch_snapshot.parent_snapshot_id) + assert delete_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert ( + delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + ) # Currently overwrite is a delete followed by an append operation From ee591b476becc77193b8bb28b67d3d237da81131 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 13:44:38 +0530 Subject: [PATCH 13/21] Added another integration test --- tests/integration/test_writes/test_writes.py | 25 ++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 39e7758461..c6b4b5257b 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1522,3 +1522,28 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with assert ( delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id ) # Currently overwrite is a delete followed by an append operation + + +@pytest.mark.integration +def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_concurrent_branch_operations" + branch1 = "existing_branch_1" + branch2 = "existing_branch_2" + + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch1).commit() + + tbl.delete("int = 9", branch=branch1) + + tbl.append(arrow_table_with_null) + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch2).commit() + + tbl.overwrite(arrow_table_with_null, branch=branch2) + + assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2 + assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3 + assert len(tbl.scan().to_arrow()) == 6 From e81907d296f8fc2f2f9cc8917e67933120506b11 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 17 Oct 2024 01:59:16 +0530 Subject: [PATCH 14/21] Fixed bug: concurrent same name branch and tag writes --- pyiceberg/table/__init__.py | 3 ++- pyiceberg/table/update/__init__.py | 3 +++ pyiceberg/table/update/snapshot.py | 1 + tests/table/test_init.py | 29 ++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d34f875b29..3c14c565d7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -349,7 +349,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name, ref_type=type),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -380,6 +380,7 @@ def _set_ref_snapshot( AssertRefSnapshotId( snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None, ref=ref_name, + ref_type=type, ), ) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 03b9855307..8c511b9763 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -592,6 +592,7 @@ class AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) + ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: @@ -607,6 +608,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) + elif ref_type != self.ref_type: + raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index b3ef552c55..638f3d4d7b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -287,6 +287,7 @@ def _commit(self) -> UpdatesAndRequirements: if self._branch in self._transaction.table_metadata.refs else self._transaction.table_metadata.current_snapshot_id, ref=self._branch, + ref_type=SnapshotRefType.BRANCH, ), ), ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 1c4029a292..d0ae5e8da4 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,28 +982,43 @@ def test_assert_table_uuid(table_v2: Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate( + base_metadata + ) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) + AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None) with pytest.raises( CommitFailedException, match="Requirement failed: branch main was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + + with pytest.raises( + CommitFailedException, + match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", + ): + AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + + with pytest.raises( + CommitFailedException, + match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", + ): + AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata) + # existing Tag in metadata: test with pytest.raises( CommitFailedException, - match="Requirement failed: branch or tag not_exist is missing, expected 1", + match="Requirement failed: tag test can't be changed to type branch", ): - AssertRefSnapshotId(ref="not_exist", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From bc6fb6883c1da195d40acded2c9d1684d2444975 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 14 Nov 2024 06:28:13 +0530 Subject: [PATCH 15/21] Added integration tests with spark --- tests/integration/test_writes/test_writes.py | 60 +++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index bdfc5f124f..35e2ccf668 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1648,7 +1648,7 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with @pytest.mark.integration def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - identifier = "default.test_concurrent_branch_operations" + identifier = "default.test_intertwined_branch_operations" branch1 = "existing_branch_1" branch2 = "existing_branch_2" @@ -1669,3 +1669,61 @@ def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_nu assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2 assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3 assert len(tbl.scan().to_arrow()) == 6 + + +@pytest.mark.integration +def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: + # Intialize table with branch + identifier = "default.test_branch_spark_write_py_read" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + branch = "existing_spark_branch" + + # Create branch in Spark + spark.sql(f"ALTER TABLE {identifier} CREATE BRANCH {branch}") + + # Spark Write + spark.sql( + f""" + DELETE FROM {identifier}.branch_{branch} + WHERE int = 9 + """ + ) + + # Refresh table to get new refs + tbl.refresh() + + # Python Read + assert len(tbl.scan().to_arrow()) == 3 + assert len(tbl.scan().use_ref(branch).to_arrow()) == 2 + + +@pytest.mark.integration +def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: + # Intialize table with branch + identifier = "default.test_branch_py_write_spark_read" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + branch = "existing_py_branch" + + assert tbl.metadata.current_snapshot_id is not None + + # Create branch + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + # Python Write + tbl.delete("int = 9", branch=branch) + + # Spark Read + main_df = spark.sql( + f""" + SELECT * + FROM {identifier} + """ + ) + branch_df = spark.sql( + f""" + SELECT * + FROM {identifier}.branch_{branch} + """ + ) + assert main_df.count() == 3 + assert branch_df.count() == 2 From 82e65e155aeda7f49395d381546512032e3f6a35 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 01:47:55 +0530 Subject: [PATCH 16/21] Fixed comments for AssertSnapshotRef --- pyiceberg/table/__init__.py | 1 - pyiceberg/table/update/__init__.py | 10 ++++++---- pyiceberg/table/update/snapshot.py | 1 - tests/integration/test_writes/test_writes.py | 4 ++-- tests/table/test_init.py | 20 +++++++++----------- 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4913ac0baf..a0f420c2eb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -341,7 +341,6 @@ def _set_ref_snapshot( AssertRefSnapshotId( snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None, ref=ref_name, - ref_type=type, ), ) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index de54eded3e..9dfe28acb1 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -609,14 +609,16 @@ class AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) - ref_type: SnapshotRefType = Field(...) + # ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: if base_metadata is None: raise CommitFailedException("Requirement failed: current table metadata is missing") elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH: - raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}") + raise CommitFailedException( + f"Requirement failed: Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH." + ) elif snapshot_ref := base_metadata.refs.get(self.ref): ref_type = snapshot_ref.snapshot_ref_type if self.snapshot_id is None: @@ -625,8 +627,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) - elif ref_type != self.ref_type: - raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}") + elif ref_type == SnapshotRefType.TAG: + raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 638f3d4d7b..b3ef552c55 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -287,7 +287,6 @@ def _commit(self) -> UpdatesAndRequirements: if self._branch in self._transaction.table_metadata.refs else self._transaction.table_metadata.current_snapshot_id, ref=self._branch, - ref_type=SnapshotRefType.BRANCH, ), ), ) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 35e2ccf668..2624fcd6bb 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,7 +44,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties +from pyiceberg.table import TableProperties, MAIN_BRANCH from pyiceberg.table.sorting import SortDirection, SortField, SortOrder from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform from pyiceberg.types import ( @@ -1578,7 +1578,7 @@ def test_abort_table_transaction_on_exception( def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.test_non_existing_branch" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) - with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"): + with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."): tbl.append(arrow_table_with_null, branch="non_existing_branch") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index d0ae5e8da4..78b3d31aac 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,43 +982,41 @@ def test_assert_table_uuid(table_v2: Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate( - base_metadata - ) + AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None) + AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) with pytest.raises( CommitFailedException, match="Requirement failed: branch main was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata) + AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test with pytest.raises( CommitFailedException, - match="Requirement failed: tag test can't be changed to type branch", + match="Requirement failed: TAG test can't be updated once created", ): - AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From 82e5b906206ab466775cbd44e5c2a7be66c922d1 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:08:22 +0530 Subject: [PATCH 17/21] Fixed comments and linter issues --- tests/integration/test_writes/test_writes.py | 7 +++++-- tests/table/test_init.py | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2624fcd6bb..0af5b1e8a4 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,7 +44,8 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties, MAIN_BRANCH +from pyiceberg.table import TableProperties +from pyiceberg.table.refs import MAIN_BRANCH from pyiceberg.table.sorting import SortDirection, SortField, SortOrder from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform from pyiceberg.types import ( @@ -1578,7 +1579,9 @@ def test_abort_table_transaction_on_exception( def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.test_non_existing_branch" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) - with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."): + with pytest.raises( + CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH." + ): tbl.append(arrow_table_with_null, branch="non_existing_branch") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 78b3d31aac..7a416ac83c 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -1012,6 +1012,10 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test + ref_tag = table_v2.refs().get("test") + assert ref_tag is not None + assert ref_tag.snapshot_ref_type == SnapshotRefType.TAG, "TAG test should be present in table to be tested" + with pytest.raises( CommitFailedException, match="Requirement failed: TAG test can't be updated once created", From 84d0971a100e66a6fd8118806498ab21e397dc6c Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:11:55 +0530 Subject: [PATCH 18/21] Fixed comments --- pyiceberg/table/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a0f420c2eb..b90936c532 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -988,7 +988,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the delete operation + branch: Branch Reference to run the append operation """ with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) @@ -1014,7 +1014,7 @@ def overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the delete operation + branch: Branch Reference to run the overwrite operation """ with self.transaction() as tx: tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) From 3efe53cc35fe761c2e329599823a8b3accd4b327 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:22:28 +0530 Subject: [PATCH 19/21] Fixed comments --- tests/table/test_init.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7a416ac83c..42c066ef01 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,34 +982,31 @@ def test_assert_table_uuid(table_v2: Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(None) with pytest.raises( CommitFailedException, - match="Requirement failed: branch main was created concurrently", + match=f"Requirement failed: branch {MAIN_BRANCH} was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=None).validate(base_metadata) with pytest.raises( CommitFailedException, - match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", + match=f"Requirement failed: branch {MAIN_BRANCH} has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(base_metadata) - with pytest.raises( - CommitFailedException, - match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", - ): - AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata) + non_existing_ref = "not_exist_branch_or_tag" + assert table_v2.refs().get("not_exist_branch_or_tag") is None with pytest.raises( CommitFailedException, - match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", + match=f"Requirement failed: branch or tag {non_existing_ref} is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref=non_existing_ref, snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test ref_tag = table_v2.refs().get("test") From dfedc63c04328d89fb61b0dcb5799ad39a94c4ff Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 05:42:32 +0530 Subject: [PATCH 20/21] Fixed a bug in tests --- pyiceberg/table/update/__init__.py | 5 ++--- tests/table/test_init.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 9dfe28acb1..671b3b9e94 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -609,7 +609,6 @@ class AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) - # ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: @@ -624,11 +623,11 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: if self.snapshot_id is None: raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently") elif self.snapshot_id != snapshot_ref.snapshot_id: + if ref_type == SnapshotRefType.TAG: + raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) - elif ref_type == SnapshotRefType.TAG: - raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 42c066ef01..89e0577ae8 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1017,7 +1017,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: CommitFailedException, match="Requirement failed: TAG test can't be updated once created", ): - AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From 076a6d5c040ca19cdc58949e7c533418211dc027 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 05:55:59 +0530 Subject: [PATCH 21/21] Fixed some more tests --- pyiceberg/table/update/__init__.py | 2 -- tests/table/test_init.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 671b3b9e94..249de747de 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -623,8 +623,6 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: if self.snapshot_id is None: raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently") elif self.snapshot_id != snapshot_ref.snapshot_id: - if ref_type == SnapshotRefType.TAG: - raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 89e0577ae8..15f27aba61 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1015,7 +1015,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: with pytest.raises( CommitFailedException, - match="Requirement failed: TAG test can't be updated once created", + match="Requirement failed: tag test has changed: expected id 3055729675574597004, found 3051729675574597004", ): AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata)