Skip to content

Commit 945d61d

Browse files
author
Yingjian Wu
committed
abort the whole transaction if any update on the chain has failed
1 parent ff3a249 commit 945d61d

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

pyiceberg/table/__init__.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from dataclasses import dataclass
2424
from functools import cached_property
2525
from itertools import chain
26+
from types import TracebackType
2627
from typing import (
2728
TYPE_CHECKING,
2829
Any,
@@ -33,6 +34,7 @@
3334
Optional,
3435
Set,
3536
Tuple,
37+
Type,
3638
TypeVar,
3739
Union,
3840
)
@@ -231,9 +233,12 @@ def __enter__(self) -> Transaction:
231233
"""Start a transaction to update the table."""
232234
return self
233235

234-
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
235-
"""Close and commit the transaction."""
236-
self.commit_transaction()
236+
def __exit__(
237+
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
238+
) -> None:
239+
"""Close and commit the transaction if no exceptions have been raised."""
240+
if exctype is None and excinst is None and exctb is None:
241+
self.commit_transaction()
237242

238243
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
239244
"""Check if the requirements are met, and applies the updates to the metadata."""

tests/integration/test_writes/test_writes.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,3 +1448,26 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) ->
14481448
EqualTo("category", "A"),
14491449
),
14501450
)
1451+
1452+
1453+
@pytest.mark.integration
1454+
@pytest.mark.parametrize("format_version", [1, 2])
1455+
def test_abort_table_transaction_on_exception(
1456+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
1457+
) -> None:
1458+
identifier = "default.table_test_abort_table_transaction_on_exception"
1459+
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
1460+
1461+
# Pre-populate some data
1462+
tbl.append(arrow_table_with_null)
1463+
assert len(tbl.scan().to_pandas()) == 3
1464+
1465+
# try to commit a transaction that raises exception at the middle
1466+
with pytest.raises(ValueError):
1467+
with tbl.transaction() as txn:
1468+
txn.append(arrow_table_with_null)
1469+
raise ValueError
1470+
txn.append(arrow_table_with_null) # type: ignore
1471+
1472+
# Validate the transaction is aborted
1473+
assert len(tbl.scan().to_pandas()) == 3 # type: ignore

0 commit comments

Comments
 (0)