Skip to content

Make commit_table public #1112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableRequirement,
TableUpdate,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
Expand Down Expand Up @@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
"""

@abstractmethod
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand Down Expand Up @@ -881,13 +886,19 @@ def _create_staged_table(
catalog=self,
)

def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
for requirement in table_request.requirements:
def _update_and_stage_table(
self,
current_table: Optional[Table],
table_identifier: Identifier,
requirements: Tuple[TableRequirement, ...],
updates: Tuple[TableUpdate, ...],
) -> StagedTable:
for requirement in requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
updates=updates,
enforce_validation=current_table is None,
metadata_location=current_table.metadata_location if current_table else None,
)
Expand All @@ -896,7 +907,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)

return StagedTable(
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
identifier=table_identifier,
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
Expand Down
13 changes: 9 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -57,7 +58,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand Down
22 changes: 13 additions & 9 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -69,9 +70,10 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
return self.load_table(identifier=identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

current_glue_table: Optional[TableTypeDef]
glue_table_version_id: Optional[str]
Expand All @@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
glue_table_version_id = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
26 changes: 15 additions & 11 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
List,
Optional,
Set,
Tuple,
Type,
Union,
)
Expand Down Expand Up @@ -79,11 +80,12 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
TableProperties,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -421,11 +423,15 @@ def _do_wait_for_lock() -> LockResponse:

return _do_wait_for_lock()

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -434,10 +440,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
Expand All @@ -448,7 +452,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

hive_table: Optional[HiveTable]
current_table: Optional[Table]
Expand All @@ -459,7 +463,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
hive_table = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down Expand Up @@ -489,7 +493,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
self._create_hive_table(open_client, hive_table)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand Down
8 changes: 6 additions & 2 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
List,
Optional,
Set,
Tuple,
Union,
)

from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
raise NotImplementedError

def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
Expand Down
17 changes: 13 additions & 4 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
StagedTable,
Table,
TableIdentifier,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
Expand Down Expand Up @@ -753,11 +755,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers]

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -767,9 +773,12 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""
identifier = self._identifier_to_tuple_without_catalog(table.identifier)
table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
data=table_request.model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
25 changes: 14 additions & 11 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -60,7 +61,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements: (Tuple[TableRequirement, ...]): Table requirements.
updates: (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
table_identifier = self._identifier_to_tuple_without_catalog(table.identifier)
namespace_tuple = Catalog.namespace_from(table_identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(table_identifier)

current_table: Optional[Table]
try:
current_table = self.load_table(identifier_tuple)
current_table = self.load_table(table_identifier)
except NoSuchTableError:
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table.identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
8 changes: 1 addition & 7 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,13 +1673,7 @@ def refs(self) -> Dict[str, SnapshotRef]:
return self.metadata.refs

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]),
updates=updates,
requirements=requirements,
)
) # pylint: disable=W0212
response = self.catalog.commit_table(self, requirements, updates)
self.metadata = response.metadata
self.metadata_location = response.metadata_location

Expand Down
Loading