Skip to content

Commit ce56282

Browse files
committed
Make commit_table public
1 parent 35423c4 commit ce56282

File tree

10 files changed

+109
-96
lines changed

10 files changed

+109
-96
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@
5151
from pyiceberg.serializers import ToOutputFile
5252
from pyiceberg.table import (
5353
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
54-
CommitTableRequest,
5554
CommitTableResponse,
5655
CreateTableTransaction,
5756
StagedTable,
5857
Table,
58+
TableRequirement,
59+
TableUpdate,
5960
update_table_metadata,
6061
)
6162
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
@@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
502503
"""
503504

504505
@abstractmethod
505-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
506-
"""Update one or more tables.
506+
def commit_table(
507+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
508+
) -> CommitTableResponse:
509+
"""Commit updates to a table.
507510
508511
Args:
509-
table_request (CommitTableRequest): The table requests to be carried out.
512+
table (Table): The table to be updated.
513+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
514+
updates: (Tuple[TableUpdate, ...]): Table updates.
510515
511516
Returns:
512517
CommitTableResponse: The updated metadata.
@@ -854,13 +859,19 @@ def _create_staged_table(
854859
catalog=self,
855860
)
856861

857-
def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
858-
for requirement in table_request.requirements:
862+
def _update_and_stage_table(
863+
self,
864+
current_table: Optional[Table],
865+
table_identifier: Identifier,
866+
requirements: Tuple[TableRequirement, ...],
867+
updates: Tuple[TableUpdate, ...],
868+
) -> StagedTable:
869+
for requirement in requirements:
859870
requirement.validate(current_table.metadata if current_table else None)
860871

861872
updated_metadata = update_table_metadata(
862873
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
863-
updates=table_request.updates,
874+
updates=updates,
864875
enforce_validation=current_table is None,
865876
metadata_location=current_table.metadata_location if current_table else None,
866877
)
@@ -869,7 +880,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
869880
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
870881

871882
return StagedTable(
872-
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
883+
identifier=table_identifier,
873884
metadata=updated_metadata,
874885
metadata_location=new_metadata_location,
875886
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),

pyiceberg/catalog/dynamodb.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
List,
2424
Optional,
2525
Set,
26+
Tuple,
2627
Union,
2728
)
2829

@@ -57,7 +58,7 @@
5758
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
5859
from pyiceberg.schema import Schema
5960
from pyiceberg.serializers import FromInputFile
60-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
61+
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
6162
from pyiceberg.table.metadata import new_table_metadata
6263
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6364
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
215216
"""
216217
raise NotImplementedError
217218

218-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
219-
"""Update the table.
219+
def commit_table(
220+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
221+
) -> CommitTableResponse:
222+
"""Commit updates to a table.
220223
221224
Args:
222-
table_request (CommitTableRequest): The table requests to be carried out.
225+
table (Table): The table to be updated.
226+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
227+
updates: (Tuple[TableUpdate, ...]): Table updates.
223228
224229
Returns:
225230
CommitTableResponse: The updated metadata.

pyiceberg/catalog/glue.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
List,
2424
Optional,
2525
Set,
26+
Tuple,
2627
Union,
2728
cast,
2829
)
@@ -69,9 +70,10 @@
6970
from pyiceberg.schema import Schema, SchemaVisitor, visit
7071
from pyiceberg.serializers import FromInputFile
7172
from pyiceberg.table import (
72-
CommitTableRequest,
7373
CommitTableResponse,
7474
Table,
75+
TableRequirement,
76+
TableUpdate,
7577
)
7678
from pyiceberg.table.metadata import TableMetadata
7779
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
449451
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
450452
return self.load_table(identifier=identifier)
451453

452-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
453-
"""Update the table.
454+
def commit_table(
455+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
456+
) -> CommitTableResponse:
457+
"""Commit updates to a table.
454458
455459
Args:
456-
table_request (CommitTableRequest): The table requests to be carried out.
460+
table (Table): The table to be updated.
461+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
462+
updates: (Tuple[TableUpdate, ...]): Table updates.
457463
458464
Returns:
459465
CommitTableResponse: The updated metadata.
@@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
462468
NoSuchTableError: If a table with the given identifier does not exist.
463469
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
464470
"""
465-
identifier_tuple = self._identifier_to_tuple_without_catalog(
466-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
467-
)
468-
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
471+
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
472+
database_name, table_name = table_identifier
469473

470474
current_glue_table: Optional[TableTypeDef]
471475
glue_table_version_id: Optional[str]
@@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
479483
glue_table_version_id = None
480484
current_table = None
481485

482-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
486+
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
483487
if current_table and updated_staged_table.metadata == current_table.metadata:
484488
# no changes, do nothing
485489
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

pyiceberg/catalog/hive.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
List,
2727
Optional,
2828
Set,
29+
Tuple,
2930
Type,
3031
Union,
3132
)
@@ -79,11 +80,12 @@
7980
from pyiceberg.schema import Schema, SchemaVisitor, visit
8081
from pyiceberg.serializers import FromInputFile
8182
from pyiceberg.table import (
82-
CommitTableRequest,
8383
CommitTableResponse,
8484
StagedTable,
8585
Table,
8686
TableProperties,
87+
TableRequirement,
88+
TableUpdate,
8789
)
8890
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8991
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -418,11 +420,15 @@ def _do_wait_for_lock() -> LockResponse:
418420

419421
return _do_wait_for_lock()
420422

421-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
422-
"""Update the table.
423+
def commit_table(
424+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
425+
) -> CommitTableResponse:
426+
"""Commit updates to a table.
423427
424428
Args:
425-
table_request (CommitTableRequest): The table requests to be carried out.
429+
table (Table): The table to be updated.
430+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
431+
updates: (Tuple[TableUpdate, ...]): Table updates.
426432
427433
Returns:
428434
CommitTableResponse: The updated metadata.
@@ -431,10 +437,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
431437
NoSuchTableError: If a table with the given identifier does not exist.
432438
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
433439
"""
434-
identifier_tuple = self._identifier_to_tuple_without_catalog(
435-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
436-
)
437-
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
440+
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
441+
database_name, table_name = table_identifier
438442
# commit to hive
439443
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
440444
with self._client as open_client:
@@ -445,7 +449,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
445449
if lock.state == LockState.WAITING:
446450
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
447451
else:
448-
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
452+
raise CommitFailedException(
453+
f"Failed to acquire lock for {table_identifier.identifier}, state: {lock.state}"
454+
)
449455

450456
hive_table: Optional[HiveTable]
451457
current_table: Optional[Table]
@@ -456,7 +462,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
456462
hive_table = None
457463
current_table = None
458464

459-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
465+
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
460466
if current_table and updated_staged_table.metadata == current_table.metadata:
461467
# no changes, do nothing
462468
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
@@ -486,7 +492,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
486492
)
487493
self._create_hive_table(open_client, hive_table)
488494
except WaitingForLockException as e:
489-
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
495+
raise CommitFailedException(f"Failed to acquire lock for {table.identifier}, state: {lock.state}") from e
490496
finally:
491497
open_client.unlock(UnlockRequest(lockid=lock.lockid))
492498

pyiceberg/catalog/noop.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919
List,
2020
Optional,
2121
Set,
22+
Tuple,
2223
Union,
2324
)
2425

2526
from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
2627
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
2728
from pyiceberg.schema import Schema
2829
from pyiceberg.table import (
29-
CommitTableRequest,
3030
CommitTableResponse,
3131
CreateTableTransaction,
3232
Table,
33+
TableRequirement,
34+
TableUpdate,
3335
)
3436
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
3537
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
9193
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
9294
raise NotImplementedError
9395

94-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
96+
def commit_table(
97+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
98+
) -> CommitTableResponse:
9599
raise NotImplementedError
96100

97101
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:

pyiceberg/catalog/rest.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
StagedTable,
6767
Table,
6868
TableIdentifier,
69+
TableRequirement,
70+
TableUpdate,
6971
)
7072
from pyiceberg.table.metadata import TableMetadata
7173
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -719,12 +721,15 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
719721
)
720722
return table_request
721723

722-
@retry(**_RETRY_ARGS)
723-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
724-
"""Update the table.
724+
def commit_table(
725+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
726+
) -> CommitTableResponse:
727+
"""Commit updates to a table.
725728
726729
Args:
727-
table_request (CommitTableRequest): The table requests to be carried out.
730+
table (Table): The table to be updated.
731+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
732+
updates: (Tuple[TableUpdate, ...]): Table updates.
728733
729734
Returns:
730735
CommitTableResponse: The updated metadata.
@@ -734,9 +739,11 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
734739
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
735740
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
736741
"""
742+
identifier = TableIdentifier(namespace=table.identifier[1:-1], name=table.identifier[-1])
743+
table_request = CommitTableRequest(identifier=identifier, requirements=requirements, updates=updates)
737744
response = self._session.post(
738745
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
739-
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
746+
data=table_request.model_dump_json().encode(UTF8),
740747
)
741748
try:
742749
response.raise_for_status()

pyiceberg/catalog/sql.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
List,
2121
Optional,
2222
Set,
23+
Tuple,
2324
Union,
2425
)
2526

@@ -60,7 +61,7 @@
6061
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6162
from pyiceberg.schema import Schema
6263
from pyiceberg.serializers import FromInputFile
63-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
64+
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
6465
from pyiceberg.table.metadata import new_table_metadata
6566
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6667
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
394395
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
395396
return self.load_table(to_identifier)
396397

397-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
398-
"""Update one or more tables.
398+
def commit_table(
399+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
400+
) -> CommitTableResponse:
401+
"""Commit updates to a table.
399402
400403
Args:
401-
table_request (CommitTableRequest): The table requests to be carried out.
404+
table (Table): The table to be updated.
405+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
406+
updates: (Tuple[TableUpdate, ...]): Table updates.
402407
403408
Returns:
404409
CommitTableResponse: The updated metadata.
@@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
407412
NoSuchTableError: If a table with the given identifier does not exist.
408413
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
409414
"""
410-
identifier_tuple = self._identifier_to_tuple_without_catalog(
411-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
412-
)
413-
namespace_tuple = Catalog.namespace_from(identifier_tuple)
415+
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
416+
namespace_tuple = Catalog.namespace_from(table_identifier)
414417
namespace = Catalog.namespace_to_string(namespace_tuple)
415-
table_name = Catalog.table_name_from(identifier_tuple)
418+
table_name = Catalog.table_name_from(table_identifier)
416419

417420
current_table: Optional[Table]
418421
try:
419-
current_table = self.load_table(identifier_tuple)
422+
current_table = self.load_table(table_identifier)
420423
except NoSuchTableError:
421424
current_table = None
422425

423-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
426+
updated_staged_table = self._update_and_stage_table(current_table, table.identifier[1:], requirements, updates)
424427
if current_table and updated_staged_table.metadata == current_table.metadata:
425428
# no changes, do nothing
426429
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

pyiceberg/table/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1673,12 +1673,8 @@ def refs(self) -> Dict[str, SnapshotRef]:
16731673
return self.metadata.refs
16741674

16751675
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
1676-
response = self.catalog._commit_table( # pylint: disable=W0212
1677-
CommitTableRequest(
1678-
identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]),
1679-
updates=updates,
1680-
requirements=requirements,
1681-
)
1676+
response = self.catalog.commit_table( # pylint: disable=W0212
1677+
self, requirements, updates
16821678
) # pylint: disable=W0212
16831679
self.metadata = response.metadata
16841680
self.metadata_location = response.metadata_location

0 commit comments

Comments
 (0)