Skip to content

Commit 19d52a0

Browse files
author
yinzhengsun
committed
feat: check whether table ops conflict when committing
1 parent 1c0e2b0 commit 19d52a0

File tree

1 file changed

+36
-0
lines changed

1 file changed

+36
-0
lines changed

pyiceberg/table/update/snapshot.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from pyiceberg.partitioning import (
5656
PartitionSpec,
5757
)
58+
from pyiceberg.table.metadata import TableMetadata
5859
from pyiceberg.table.snapshots import (
5960
Operation,
6061
Snapshot,
@@ -239,7 +240,21 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
239240
truncate_full_table=self._operation == Operation.OVERWRITE,
240241
)
241242

243+
def refresh(self) -> TableMetadata:
    """Return the table metadata that this commit should be validated against.

    Asks the transaction's underlying table to refresh itself and returns the
    metadata of the table object handed back by ``refresh()``.

    Returns:
        The refreshed metadata, or the metadata already held by the
        transaction's table when the refresh attempt raises.
    """
    try:
        return self._transaction._table.refresh().metadata
    except Exception:
        # Best-effort fallback: keep committing against the metadata we already
        # hold, but leave a trace so a validation that ran against possibly
        # stale metadata can be diagnosed afterwards.
        import logging

        logging.getLogger(__name__).warning(
            "Failed to refresh table metadata; falling back to the locally held metadata.",
            exc_info=True,
        )
        return self._transaction._table.metadata
249+
250+
@abstractmethod
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
    """Check that this operation may be committed on top of ``current_metadata``.

    ``_commit`` invokes this hook before producing manifests, passing the
    metadata returned by ``refresh()`` and the current snapshot of the
    transaction's own metadata.

    Args:
        current_metadata: Table metadata to validate against.
        Snapshot: Snapshot the transaction currently points at, or None when
            it has no current snapshot.
    """
252+
242253
def _commit(self) -> UpdatesAndRequirements:
254+
current_snapshot = self._transaction.table_metadata.current_snapshot()
255+
table_metadata = self.refresh()
256+
self._validate(table_metadata, current_snapshot)
257+
243258
new_manifests = self._manifests()
244259
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
245260

@@ -249,6 +264,7 @@ def _commit(self) -> UpdatesAndRequirements:
249264
attempt=0,
250265
commit_uuid=self.commit_uuid,
251266
)
267+
252268
location_provider = self._transaction._table.location_provider()
253269
manifest_list_file_path = location_provider.new_metadata_location(file_name)
254270
with write_manifest_list(
@@ -445,6 +461,14 @@ def files_affected(self) -> bool:
445461
"""Indicate if any manifest-entries can be dropped."""
446462
return len(self._deleted_entries()) > 0
447463

464+
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
    """Reject the delete when the table's head snapshot differs from ``Snapshot``.

    Args:
        current_metadata: Table metadata to validate against.
        Snapshot: Snapshot the delete was planned on.

    Raises:
        ValueError: If ``Snapshot`` is None, or if its ``snapshot_id`` is not
            the current snapshot id recorded in ``current_metadata``.
    """
    if Snapshot is None:
        raise ValueError("Snapshot cannot be None.")

    # TableMetadata records the head snapshot as `current_snapshot_id`; it has
    # no `snapshot_id` attribute, so reading that would fail with an
    # AttributeError instead of reporting the conflict.
    if Snapshot.snapshot_id != current_metadata.current_snapshot_id:
        raise ValueError("Operation conflicts are not allowed when performing deleting.")
471+
448472

449473
class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]):
450474
def _existing_manifests(self) -> List[ManifestFile]:
@@ -474,6 +498,10 @@ def _deleted_entries(self) -> List[ManifestEntry]:
474498
"""
475499
return []
476500

501+
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
    """Accept the append unconditionally.

    Appends are treated here as unaffected by other operations, so neither
    argument is inspected and nothing is ever raised.
    """
    return None
504+
477505

478506
class _MergeAppendFiles(_FastAppendFiles):
479507
_target_size_bytes: int
@@ -602,6 +630,14 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
602630
else:
603631
return []
604632

633+
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
    """Reject the overwrite when the table's head snapshot differs from ``Snapshot``.

    Args:
        current_metadata: Table metadata to validate against.
        Snapshot: Snapshot the overwrite was planned on.

    Raises:
        ValueError: If ``Snapshot`` is None, or if its ``snapshot_id`` is not
            the current snapshot id recorded in ``current_metadata``.
    """
    if Snapshot is None:
        raise ValueError("Snapshot cannot be None.")

    # TableMetadata records the head snapshot as `current_snapshot_id`; it has
    # no `snapshot_id` attribute, so reading that would fail with an
    # AttributeError instead of reporting the conflict.
    if Snapshot.snapshot_id != current_metadata.current_snapshot_id:
        raise ValueError("Operation conflicts are not allowed when performing overwriting.")
640+
605641

606642
class UpdateSnapshot:
607643
_transaction: Transaction

0 commit comments

Comments
 (0)