Skip to content

Commit e7ea131

Browse files
committed
fix code format
1 parent dac2adc commit e7ea131

File tree

2 files changed

+83
-24
lines changed

2 files changed

+83
-24
lines changed

pyiceberg/table/update/snapshot.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from sortedcontainers import SortedList
2929

30+
from pyiceberg.exceptions import CommitFailedException
3031
from pyiceberg.expressions import (
3132
AlwaysFalse,
3233
BooleanExpression,
@@ -83,7 +84,7 @@
8384
from pyiceberg.utils.properties import property_as_bool, property_as_int
8485

8586
if TYPE_CHECKING:
86-
from pyiceberg.table import Transaction
87+
from pyiceberg.table import Transaction, Table
8788

8889

8990
def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
@@ -240,20 +241,20 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
240241
truncate_full_table=self._operation == Operation.OVERWRITE,
241242
)
242243

243-
def refresh(self) -> TableMetadata:
244-
try:
245-
table = self._transaction._table.refresh()
246-
return table.metadata
247-
except Exception:
248-
return self._transaction._table.metadata
244+
def refresh(self) -> Table:
245+
table = self._transaction._table.refresh()
246+
return table
249247

250248
@abstractmethod
251-
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None: ...
249+
def _validate(self, current_metadata: TableMetadata, snapshot: Optional[Snapshot]) -> None: ...
252250

253251
def _commit(self) -> UpdatesAndRequirements:
254252
current_snapshot = self._transaction.table_metadata.current_snapshot()
255-
table_metadata = self.refresh()
256-
self._validate(table_metadata, current_snapshot)
253+
if current_snapshot is not None:
254+
table = self.refresh()
255+
if table is None:
256+
raise CommitFailedException("Table is none.")
257+
self._validate(table.metadata, current_snapshot)
257258

258259
new_manifests = self._manifests()
259260
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
@@ -461,12 +462,12 @@ def files_affected(self) -> bool:
461462
"""Indicate if any manifest-entries can be dropped."""
462463
return len(self._deleted_entries()) > 0
463464

464-
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
465-
if Snapshot is None:
466-
raise ValueError("Snapshot cannot be None.")
467-
468-
if Snapshot.snapshot_id != current_metadata.snapshot_id:
469-
raise ValueError("Operation conflicts are not allowed when performing deleting.")
465+
def _validate(self, current_metadata: TableMetadata, snapshot: Optional[Snapshot]) -> None:
466+
if snapshot is None:
467+
raise CommitFailedException("Snapshot cannot be None.")
468+
current_snapshot_id = current_metadata.current_snapshot_id
469+
if current_snapshot_id != None and snapshot.snapshot_id != current_snapshot_id:
470+
raise CommitFailedException("Operation conflicts are not allowed when performing deleting.")
470471
return
471472

472473

@@ -498,7 +499,7 @@ def _deleted_entries(self) -> List[ManifestEntry]:
498499
"""
499500
return []
500501

501-
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
502+
def _validate(self, current_metadata: TableMetadata, snapshot: Optional[Snapshot]) -> None:
502503
"""Other operations don't affect the appending operation, and we can just append."""
503504
return
504505

@@ -630,12 +631,12 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
630631
else:
631632
return []
632633

633-
def _validate(self, current_metadata: TableMetadata, Snapshot: Optional[Snapshot]) -> None:
634-
if Snapshot is None:
635-
raise ValueError("Snapshot cannot be None.")
636-
637-
if Snapshot.snapshot_id != current_metadata.snapshot_id:
638-
raise ValueError("Operation conflicts are not allowed when performing overwriting.")
634+
def _validate(self, current_metadata: TableMetadata, snapshot: Optional[Snapshot]) -> None:
635+
if snapshot is None:
636+
raise CommitFailedException("Snapshot cannot be None.")
637+
current_snapshot_id = current_metadata.current_snapshot_id
638+
if current_snapshot_id != None and snapshot.snapshot_id != current_snapshot_id:
639+
raise CommitFailedException("Operation conflicts are not allowed when performing overwriting.")
639640
return
640641

641642

tests/integration/test_add_files.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from pytest_mock.plugin import MockerFixture
3232

3333
from pyiceberg.catalog import Catalog
34-
from pyiceberg.exceptions import NoSuchTableError
34+
from pyiceberg.exceptions import NoSuchTableError, CommitFailedException
3535
from pyiceberg.io import FileIO
3636
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
3737
from pyiceberg.manifest import DataFile
@@ -903,3 +903,61 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
903903
with pytest.raises(ValueError) as exc_info:
904904
tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
905905
assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
906+
907+
@pytest.mark.integration
908+
@pytest.mark.parametrize("format_version", [1, 2])
909+
def test_conflict_delete_delete(
910+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
911+
) -> None:
912+
identifier = "default.test_conflict"
913+
tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
914+
tbl2 = session_catalog.load_table(identifier)
915+
916+
tbl1.delete("string == 'z'")
917+
918+
with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
919+
# tbl2 isn't aware of the commit by tbl1
920+
tbl2.delete("string == 'z'")
921+
922+
923+
@pytest.mark.integration
924+
@pytest.mark.parametrize("format_version", [1, 2])
925+
def test_conflict_delete_append(
926+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
927+
) -> None:
928+
identifier = "default.test_conflict"
929+
tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
930+
tbl2 = session_catalog.load_table(identifier)
931+
932+
# This is allowed
933+
tbl1.delete("string == 'z'")
934+
tbl2.append(arrow_table_with_null)
935+
936+
937+
@pytest.mark.integration
938+
@pytest.mark.parametrize("format_version", [1, 2])
939+
def test_conflict_append_delete(
940+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
941+
) -> None:
942+
identifier = "default.test_conflict"
943+
tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
944+
tbl2 = session_catalog.load_table(identifier)
945+
946+
tbl1.delete("string == 'z'")
947+
948+
with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
949+
# tbl2 isn't aware of the commit by tbl1
950+
tbl2.delete("string == 'z'")
951+
952+
953+
@pytest.mark.integration
954+
@pytest.mark.parametrize("format_version", [1, 2])
955+
def test_conflict_append_append(
956+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
957+
) -> None:
958+
identifier = "default.test_conflict"
959+
tbl1 = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
960+
tbl2 = session_catalog.load_table(identifier)
961+
962+
tbl1.append(arrow_table_with_null)
963+
tbl2.append(arrow_table_with_null)

0 commit comments

Comments
 (0)