Skip to content

Commit cdfeaf3

Browse files
Fokko authored and sungwy committed
Make commit_table public (apache#1112)
* Make `commit_table` public
* Comments
* Thanks Kevin!
* Update tests
1 parent 6c046e8 commit cdfeaf3

File tree

10 files changed

+121
-82
lines changed

10 files changed

+121
-82
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@
5151
from pyiceberg.serializers import ToOutputFile
5252
from pyiceberg.table import (
5353
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
54-
CommitTableRequest,
5554
CommitTableResponse,
5655
CreateTableTransaction,
5756
StagedTable,
5857
Table,
58+
TableRequirement,
59+
TableUpdate,
5960
update_table_metadata,
6061
)
6162
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
@@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
502503
"""
503504

504505
@abstractmethod
505-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
506-
"""Update one or more tables.
506+
def commit_table(
507+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
508+
) -> CommitTableResponse:
509+
"""Commit updates to a table.
507510
508511
Args:
509-
table_request (CommitTableRequest): The table requests to be carried out.
512+
table (Table): The table to be updated.
513+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
514+
updates: (Tuple[TableUpdate, ...]): Table updates.
510515
511516
Returns:
512517
CommitTableResponse: The updated metadata.
@@ -881,13 +886,19 @@ def _create_staged_table(
881886
catalog=self,
882887
)
883888

884-
def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
885-
for requirement in table_request.requirements:
889+
def _update_and_stage_table(
890+
self,
891+
current_table: Optional[Table],
892+
table_identifier: Identifier,
893+
requirements: Tuple[TableRequirement, ...],
894+
updates: Tuple[TableUpdate, ...],
895+
) -> StagedTable:
896+
for requirement in requirements:
886897
requirement.validate(current_table.metadata if current_table else None)
887898

888899
updated_metadata = update_table_metadata(
889900
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
890-
updates=table_request.updates,
901+
updates=updates,
891902
enforce_validation=current_table is None,
892903
metadata_location=current_table.metadata_location if current_table else None,
893904
)
@@ -896,7 +907,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
896907
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
897908

898909
return StagedTable(
899-
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
910+
identifier=table_identifier,
900911
metadata=updated_metadata,
901912
metadata_location=new_metadata_location,
902913
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),

pyiceberg/catalog/dynamodb.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
List,
2424
Optional,
2525
Set,
26+
Tuple,
2627
Union,
2728
)
2829

@@ -57,7 +58,7 @@
5758
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
5859
from pyiceberg.schema import Schema
5960
from pyiceberg.serializers import FromInputFile
60-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
61+
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
6162
from pyiceberg.table.metadata import new_table_metadata
6263
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6364
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
215216
"""
216217
raise NotImplementedError
217218

218-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
219-
"""Update the table.
219+
def commit_table(
220+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
221+
) -> CommitTableResponse:
222+
"""Commit updates to a table.
220223
221224
Args:
222-
table_request (CommitTableRequest): The table requests to be carried out.
225+
table (Table): The table to be updated.
226+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
227+
updates: (Tuple[TableUpdate, ...]): Table updates.
223228
224229
Returns:
225230
CommitTableResponse: The updated metadata.

pyiceberg/catalog/glue.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
List,
2424
Optional,
2525
Set,
26+
Tuple,
2627
Union,
2728
cast,
2829
)
@@ -69,9 +70,10 @@
6970
from pyiceberg.schema import Schema, SchemaVisitor, visit
7071
from pyiceberg.serializers import FromInputFile
7172
from pyiceberg.table import (
72-
CommitTableRequest,
7373
CommitTableResponse,
7474
Table,
75+
TableRequirement,
76+
TableUpdate,
7577
)
7678
from pyiceberg.table.metadata import TableMetadata
7779
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
449451
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
450452
return self.load_table(identifier=identifier)
451453

452-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
453-
"""Update the table.
454+
def commit_table(
455+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
456+
) -> CommitTableResponse:
457+
"""Commit updates to a table.
454458
455459
Args:
456-
table_request (CommitTableRequest): The table requests to be carried out.
460+
table (Table): The table to be updated.
461+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
462+
updates: (Tuple[TableUpdate, ...]): Table updates.
457463
458464
Returns:
459465
CommitTableResponse: The updated metadata.
@@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
462468
NoSuchTableError: If a table with the given identifier does not exist.
463469
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
464470
"""
465-
identifier_tuple = self._identifier_to_tuple_without_catalog(
466-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
467-
)
468-
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
471+
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
472+
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
469473

470474
current_glue_table: Optional[TableTypeDef]
471475
glue_table_version_id: Optional[str]
@@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
479483
glue_table_version_id = None
480484
current_table = None
481485

482-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
486+
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
483487
if current_table and updated_staged_table.metadata == current_table.metadata:
484488
# no changes, do nothing
485489
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

pyiceberg/catalog/hive.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
List,
2727
Optional,
2828
Set,
29+
Tuple,
2930
Type,
3031
Union,
3132
)
@@ -79,11 +80,12 @@
7980
from pyiceberg.schema import Schema, SchemaVisitor, visit
8081
from pyiceberg.serializers import FromInputFile
8182
from pyiceberg.table import (
82-
CommitTableRequest,
8383
CommitTableResponse,
8484
StagedTable,
8585
Table,
8686
TableProperties,
87+
TableRequirement,
88+
TableUpdate,
8789
)
8890
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8991
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -421,11 +423,15 @@ def _do_wait_for_lock() -> LockResponse:
421423

422424
return _do_wait_for_lock()
423425

424-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
425-
"""Update the table.
426+
def commit_table(
427+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
428+
) -> CommitTableResponse:
429+
"""Commit updates to a table.
426430
427431
Args:
428-
table_request (CommitTableRequest): The table requests to be carried out.
432+
table (Table): The table to be updated.
433+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
434+
updates: (Tuple[TableUpdate, ...]): Table updates.
429435
430436
Returns:
431437
CommitTableResponse: The updated metadata.
@@ -434,10 +440,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
434440
NoSuchTableError: If a table with the given identifier does not exist.
435441
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
436442
"""
437-
identifier_tuple = self._identifier_to_tuple_without_catalog(
438-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
439-
)
440-
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
443+
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
444+
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
441445
# commit to hive
442446
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
443447
with self._client as open_client:
@@ -448,7 +452,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
448452
if lock.state == LockState.WAITING:
449453
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
450454
else:
451-
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
455+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
452456

453457
hive_table: Optional[HiveTable]
454458
current_table: Optional[Table]
@@ -459,7 +463,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
459463
hive_table = None
460464
current_table = None
461465

462-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
466+
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
463467
if current_table and updated_staged_table.metadata == current_table.metadata:
464468
# no changes, do nothing
465469
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
@@ -489,7 +493,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
489493
)
490494
self._create_hive_table(open_client, hive_table)
491495
except WaitingForLockException as e:
492-
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
496+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
493497
finally:
494498
open_client.unlock(UnlockRequest(lockid=lock.lockid))
495499

pyiceberg/catalog/noop.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919
List,
2020
Optional,
2121
Set,
22+
Tuple,
2223
Union,
2324
)
2425

2526
from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
2627
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
2728
from pyiceberg.schema import Schema
2829
from pyiceberg.table import (
29-
CommitTableRequest,
3030
CommitTableResponse,
3131
CreateTableTransaction,
3232
Table,
33+
TableRequirement,
34+
TableUpdate,
3335
)
3436
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
3537
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
9193
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
9294
raise NotImplementedError
9395

94-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
96+
def commit_table(
97+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
98+
) -> CommitTableResponse:
9599
raise NotImplementedError
96100

97101
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:

pyiceberg/catalog/rest.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
StagedTable,
7070
Table,
7171
TableIdentifier,
72+
TableRequirement,
73+
TableUpdate,
7274
)
7375
from pyiceberg.table.metadata import TableMetadata
7476
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -750,11 +752,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
750752
return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers]
751753

752754
@retry(**_RETRY_ARGS)
753-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
754-
"""Update the table.
755+
def commit_table(
756+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
757+
) -> CommitTableResponse:
758+
"""Commit updates to a table.
755759
756760
Args:
757-
table_request (CommitTableRequest): The table requests to be carried out.
761+
table (Table): The table to be updated.
762+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
763+
updates: (Tuple[TableUpdate, ...]): Table updates.
758764
759765
Returns:
760766
CommitTableResponse: The updated metadata.
@@ -764,9 +770,12 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
764770
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
765771
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
766772
"""
773+
identifier = self._identifier_to_tuple_without_catalog(table.identifier)
774+
table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
775+
table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)
767776
response = self._session.post(
768777
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
769-
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
778+
data=table_request.model_dump_json().encode(UTF8),
770779
)
771780
try:
772781
response.raise_for_status()

pyiceberg/catalog/sql.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
List,
2121
Optional,
2222
Set,
23+
Tuple,
2324
Union,
2425
)
2526

@@ -60,7 +61,7 @@
6061
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6162
from pyiceberg.schema import Schema
6263
from pyiceberg.serializers import FromInputFile
63-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
64+
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
6465
from pyiceberg.table.metadata import new_table_metadata
6566
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6667
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -393,11 +394,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
393394
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
394395
return self.load_table(to_identifier)
395396

396-
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
397-
"""Update one or more tables.
397+
def commit_table(
398+
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
399+
) -> CommitTableResponse:
400+
"""Commit updates to a table.
398401
399402
Args:
400-
table_request (CommitTableRequest): The table requests to be carried out.
403+
table (Table): The table to be updated.
404+
requirements: (Tuple[TableRequirement, ...]): Table requirements.
405+
updates: (Tuple[TableUpdate, ...]): Table updates.
401406
402407
Returns:
403408
CommitTableResponse: The updated metadata.
@@ -406,20 +411,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
406411
NoSuchTableError: If a table with the given identifier does not exist.
407412
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
408413
"""
409-
identifier_tuple = self._identifier_to_tuple_without_catalog(
410-
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
411-
)
412-
namespace_tuple = Catalog.namespace_from(identifier_tuple)
414+
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
415+
namespace_tuple = Catalog.namespace_from(table_identifier)
413416
namespace = Catalog.namespace_to_string(namespace_tuple)
414-
table_name = Catalog.table_name_from(identifier_tuple)
417+
table_name = Catalog.table_name_from(table_identifier)
415418

416419
current_table: Optional[Table]
417420
try:
418-
current_table = self.load_table(identifier_tuple)
421+
current_table = self.load_table(table_identifier)
419422
except NoSuchTableError:
420423
current_table = None
421424

422-
updated_staged_table = self._update_and_stage_table(current_table, table_request)
425+
updated_staged_table = self._update_and_stage_table(current_table, table.identifier, requirements, updates)
423426
if current_table and updated_staged_table.metadata == current_table.metadata:
424427
# no changes, do nothing
425428
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

pyiceberg/table/__init__.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1673,13 +1673,7 @@ def refs(self) -> Dict[str, SnapshotRef]:
16731673
return self.metadata.refs
16741674

16751675
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
1676-
response = self.catalog._commit_table( # pylint: disable=W0212
1677-
CommitTableRequest(
1678-
identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]),
1679-
updates=updates,
1680-
requirements=requirements,
1681-
)
1682-
) # pylint: disable=W0212
1676+
response = self.catalog.commit_table(self, requirements, updates)
16831677
self.metadata = response.metadata
16841678
self.metadata_location = response.metadata_location
16851679

0 commit comments

Comments
 (0)