Skip to content

Commit a892309

Browse files
authored
Add CreateTableTransaction API and implement it in Glue and Rest (#498)
1 parent 7837d52 commit a892309

File tree

14 files changed

+723
-195
lines changed

14 files changed

+723
-195
lines changed

mkdocs/docs/api.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,25 @@ catalog.create_table(
165165
)
166166
```
167167

168+
To create a table with some subsequent changes atomically in a transaction:
169+
170+
```python
171+
with catalog.create_table_transaction(
172+
identifier="docs_example.bids",
173+
schema=schema,
174+
location="s3://pyiceberg",
175+
partition_spec=partition_spec,
176+
sort_order=sort_order,
177+
) as txn:
178+
with txn.update_schema() as update_schema:
179+
update_schema.add_column(path="new_column", field_type=StringType())
180+
181+
with txn.update_spec() as update_spec:
182+
update_spec.add_identity("symbol")
183+
184+
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
185+
```
186+
168187
## Load a table
169188

170189
### Catalog table

pyiceberg/catalog/__init__.py

Lines changed: 203 additions & 95 deletions
Large diffs are not rendered by default.

pyiceberg/catalog/dynamodb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
METADATA_LOCATION,
3434
PREVIOUS_METADATA_LOCATION,
3535
TABLE_TYPE,
36-
Catalog,
36+
MetastoreCatalog,
3737
PropertiesUpdateSummary,
3838
)
3939
from pyiceberg.exceptions import (
@@ -79,7 +79,7 @@
7979
ITEM = "Item"
8080

8181

82-
class DynamoDbCatalog(Catalog):
82+
class DynamoDbCatalog(MetastoreCatalog):
8383
def __init__(self, name: str, **properties: str):
8484
super().__init__(name, **properties)
8585
session = boto3.Session(

pyiceberg/catalog/glue.py

Lines changed: 82 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
METADATA_LOCATION,
4646
PREVIOUS_METADATA_LOCATION,
4747
TABLE_TYPE,
48-
Catalog,
48+
MetastoreCatalog,
4949
PropertiesUpdateSummary,
5050
)
5151
from pyiceberg.exceptions import (
@@ -62,8 +62,13 @@
6262
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6363
from pyiceberg.schema import Schema, SchemaVisitor, visit
6464
from pyiceberg.serializers import FromInputFile
65-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
66-
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
65+
from pyiceberg.table import (
66+
CommitTableRequest,
67+
CommitTableResponse,
68+
Table,
69+
update_table_metadata,
70+
)
71+
from pyiceberg.table.metadata import TableMetadata
6772
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6873
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
6974
from pyiceberg.types import (
@@ -273,7 +278,7 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
273278
event_system.register("provide-client-params.glue", add_glue_catalog_id)
274279

275280

276-
class GlueCatalog(Catalog):
281+
class GlueCatalog(MetastoreCatalog):
277282
def __init__(self, name: str, **properties: Any):
278283
super().__init__(name, **properties)
279284

@@ -384,20 +389,18 @@ def create_table(
384389
ValueError: If the identifier is invalid, or no path is given to store metadata.
385390
386391
"""
387-
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
388-
389-
database_name, table_name = self.identifier_to_database_and_table(identifier)
390-
391-
location = self._resolve_table_location(location, database_name, table_name)
392-
metadata_location = self._get_metadata_location(location=location)
393-
metadata = new_table_metadata(
394-
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
392+
staged_table = self._create_staged_table(
393+
identifier=identifier,
394+
schema=schema,
395+
location=location,
396+
partition_spec=partition_spec,
397+
sort_order=sort_order,
398+
properties=properties,
395399
)
396-
io = load_file_io(properties=self.properties, location=metadata_location)
397-
self._write_metadata(metadata, io, metadata_location)
398-
399-
table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
400400
database_name, table_name = self.identifier_to_database_and_table(identifier)
401+
402+
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
403+
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
401404
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
402405

403406
return self.load_table(identifier=identifier)
@@ -435,46 +438,71 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
435438
)
436439
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
437440

438-
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
439-
glue_table_version_id = current_glue_table.get("VersionId")
440-
if not glue_table_version_id:
441-
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
442-
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
443-
base_metadata = current_table.metadata
444-
445-
# Validate the update requirements
446-
for requirement in table_request.requirements:
447-
requirement.validate(base_metadata)
448-
449-
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
450-
if updated_metadata == base_metadata:
451-
# no changes, do nothing
452-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
453-
454-
# write new metadata
455-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
456-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
457-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
458-
459-
update_table_input = _construct_table_input(
460-
table_name=table_name,
461-
metadata_location=new_metadata_location,
462-
properties=current_table.properties,
463-
metadata=updated_metadata,
464-
glue_table=current_glue_table,
465-
prev_metadata_location=current_table.metadata_location,
466-
)
441+
try:
442+
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
443+
# Update the table
444+
glue_table_version_id = current_glue_table.get("VersionId")
445+
if not glue_table_version_id:
446+
raise CommitFailedException(
447+
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
448+
)
449+
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
450+
base_metadata = current_table.metadata
451+
452+
# Validate the update requirements
453+
for requirement in table_request.requirements:
454+
requirement.validate(base_metadata)
455+
456+
updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
457+
if updated_metadata == base_metadata:
458+
# no changes, do nothing
459+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
460+
461+
# write new metadata
462+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
463+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
464+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
465+
466+
update_table_input = _construct_table_input(
467+
table_name=table_name,
468+
metadata_location=new_metadata_location,
469+
properties=current_table.properties,
470+
metadata=updated_metadata,
471+
glue_table=current_glue_table,
472+
prev_metadata_location=current_table.metadata_location,
473+
)
467474

468-
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
469-
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
470-
self._update_glue_table(
471-
database_name=database_name,
472-
table_name=table_name,
473-
table_input=update_table_input,
474-
version_id=glue_table_version_id,
475-
)
475+
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
476+
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
477+
self._update_glue_table(
478+
database_name=database_name,
479+
table_name=table_name,
480+
table_input=update_table_input,
481+
version_id=glue_table_version_id,
482+
)
483+
484+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
485+
except NoSuchTableError:
486+
# Create the table
487+
updated_metadata = update_table_metadata(
488+
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
489+
)
490+
new_metadata_version = 0
491+
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
492+
self._write_metadata(
493+
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
494+
)
495+
496+
create_table_input = _construct_table_input(
497+
table_name=table_name,
498+
metadata_location=new_metadata_location,
499+
properties=updated_metadata.properties,
500+
metadata=updated_metadata,
501+
)
502+
503+
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)
476504

477-
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
505+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
478506

479507
def load_table(self, identifier: Union[str, Identifier]) -> Table:
480508
"""Load the table's metadata and returns the table instance.

pyiceberg/catalog/hive.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
LOCATION,
5959
METADATA_LOCATION,
6060
TABLE_TYPE,
61-
Catalog,
61+
MetastoreCatalog,
6262
PropertiesUpdateSummary,
6363
)
6464
from pyiceberg.exceptions import (
@@ -230,7 +230,7 @@ def primitive(self, primitive: PrimitiveType) -> str:
230230
return HIVE_PRIMITIVE_TYPES[type(primitive)]
231231

232232

233-
class HiveCatalog(Catalog):
233+
class HiveCatalog(MetastoreCatalog):
234234
_client: _HiveClient
235235

236236
def __init__(self, name: str, **properties: str):

pyiceberg/catalog/noop.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pyiceberg.table import (
2929
CommitTableRequest,
3030
CommitTableResponse,
31+
CreateTableTransaction,
3132
Table,
3233
)
3334
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -49,9 +50,23 @@ def create_table(
4950
) -> Table:
5051
raise NotImplementedError
5152

53+
def create_table_transaction(
54+
self,
55+
identifier: Union[str, Identifier],
56+
schema: Union[Schema, "pa.Schema"],
57+
location: Optional[str] = None,
58+
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
59+
sort_order: SortOrder = UNSORTED_SORT_ORDER,
60+
properties: Properties = EMPTY_DICT,
61+
) -> CreateTableTransaction:
62+
raise NotImplementedError
63+
5264
def load_table(self, identifier: Union[str, Identifier]) -> Table:
5365
raise NotImplementedError
5466

67+
def table_exists(self, identifier: Union[str, Identifier]) -> bool:
68+
raise NotImplementedError
69+
5570
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
5671
"""Register a new table using existing metadata.
5772
@@ -70,6 +85,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
7085
def drop_table(self, identifier: Union[str, Identifier]) -> None:
7186
raise NotImplementedError
7287

88+
def purge_table(self, identifier: Union[str, Identifier]) -> None:
89+
raise NotImplementedError
90+
7391
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
7492
raise NotImplementedError
7593

0 commit comments

Comments
 (0)