Skip to content

Commit 36a505f

Browse files
kevinjqliu and Fokko authored
Improve the InMemory Catalog Implementation (#289)
* extract InMemoryCatalog out of test
* generalize InMemoryCatalog
* make write work
* write to temporary location
* can override table location
* memory.py -> in_memory.py
* fix test_commit_table
* rebase from main
* revert fs changes
* fix tests
* add docs and comments
* comma
* comment
* order
* fix test
* add license
* `create_table` write metadata file
* move InMemoryCatalog back to test_base
* remove unused references
* Update mkdocs/docs/configuration.md
  Co-authored-by: Fokko Driesprong <[email protected]>
* Update mkdocs/docs/configuration.md
  Co-authored-by: Fokko Driesprong <[email protected]>
* Update tests/catalog/test_base.py
  Co-authored-by: Fokko Driesprong <[email protected]>
* remove schema_id

---------

Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 1fd85c8 commit 36a505f

File tree

3 files changed: +101 additions, -73 deletions

pyiceberg/cli/output.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def describe_properties(self, properties: Properties) -> None:
158158
Console().print(output_table)
159159

160160
def text(self, response: str) -> None:
161-
Console().print(response)
161+
Console(soft_wrap=True).print(response)
162162

163163
def schema(self, schema: Schema) -> None:
164164
output_table = self._table

tests/catalog/test_base.py

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
# under the License.
1717
# pylint:disable=redefined-outer-name
1818

19+
20+
import uuid
21+
from pathlib import PosixPath
1922
from typing import (
2023
Dict,
2124
List,
@@ -40,7 +43,7 @@
4043
NoSuchTableError,
4144
TableAlreadyExistsError,
4245
)
43-
from pyiceberg.io import load_file_io
46+
from pyiceberg.io import WAREHOUSE, load_file_io
4447
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
4548
from pyiceberg.schema import Schema
4649
from pyiceberg.table import (
@@ -53,15 +56,21 @@
5356
TableIdentifier,
5457
update_table_metadata,
5558
)
56-
from pyiceberg.table.metadata import TableMetadataV1
59+
from pyiceberg.table.metadata import new_table_metadata
5760
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5861
from pyiceberg.transforms import IdentityTransform
5962
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
6063
from pyiceberg.types import IntegerType, LongType, NestedField
6164

65+
DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
66+
6267

6368
class InMemoryCatalog(Catalog):
64-
"""An in-memory catalog implementation for testing purposes."""
69+
"""
70+
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
71+
72+
This is useful for test, demo, and playground but not in production as data is not persisted.
73+
"""
6574

6675
__tables: Dict[Identifier, Table]
6776
__namespaces: Dict[Identifier, Properties]
@@ -70,6 +79,7 @@ def __init__(self, name: str, **properties: str) -> None:
7079
super().__init__(name, **properties)
7180
self.__tables = {}
7281
self.__namespaces = {}
82+
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION)
7383

7484
def create_table(
7585
self,
@@ -79,6 +89,7 @@ def create_table(
7989
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
8090
sort_order: SortOrder = UNSORTED_SORT_ORDER,
8191
properties: Properties = EMPTY_DICT,
92+
table_uuid: Optional[uuid.UUID] = None,
8293
) -> Table:
8394
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
8495

@@ -91,24 +102,26 @@ def create_table(
91102
if namespace not in self.__namespaces:
92103
self.__namespaces[namespace] = {}
93104

94-
new_location = location or f's3://warehouse/{"/".join(identifier)}/data'
95-
metadata = TableMetadataV1(**{
96-
"format-version": 1,
97-
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
98-
"location": new_location,
99-
"last-updated-ms": 1602638573874,
100-
"last-column-id": schema.highest_field_id,
101-
"schema": schema.model_dump(),
102-
"partition-spec": partition_spec.model_dump()["fields"],
103-
"properties": properties,
104-
"current-snapshot-id": -1,
105-
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}],
106-
})
105+
if not location:
106+
location = f'{self._warehouse_location}/{"/".join(identifier)}'
107+
108+
metadata_location = self._get_metadata_location(location=location)
109+
metadata = new_table_metadata(
110+
schema=schema,
111+
partition_spec=partition_spec,
112+
sort_order=sort_order,
113+
location=location,
114+
properties=properties,
115+
table_uuid=table_uuid,
116+
)
117+
io = load_file_io({**self.properties, **properties}, location=location)
118+
self._write_metadata(metadata, io, metadata_location)
119+
107120
table = Table(
108121
identifier=identifier,
109122
metadata=metadata,
110-
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
111-
io=load_file_io(),
123+
metadata_location=metadata_location,
124+
io=io,
112125
catalog=self,
113126
)
114127
self.__tables[identifier] = table
@@ -118,14 +131,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
118131
raise NotImplementedError
119132

120133
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
121-
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
122-
table = self.__tables[identifier]
123-
table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates)
124-
125-
return CommitTableResponse(
126-
metadata=table.metadata.model_dump(),
127-
metadata_location=table.location(),
134+
identifier_tuple = self.identifier_to_tuple_without_catalog(
135+
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
128136
)
137+
current_table = self.load_table(identifier_tuple)
138+
base_metadata = current_table.metadata
139+
140+
for requirement in table_request.requirements:
141+
requirement.validate(base_metadata)
142+
143+
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
144+
if updated_metadata == base_metadata:
145+
# no changes, do nothing
146+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
147+
148+
# write new metadata
149+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
150+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
151+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
152+
153+
# update table state
154+
current_table.metadata = updated_metadata
155+
156+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
129157

130158
def load_table(self, identifier: Union[str, Identifier]) -> Table:
131159
identifier = self.identifier_to_tuple_without_catalog(identifier)
@@ -160,7 +188,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
160188
identifier=to_identifier,
161189
metadata=table.metadata,
162190
metadata_location=table.metadata_location,
163-
io=load_file_io(),
191+
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
164192
catalog=self,
165193
)
166194
return self.__tables[to_identifier]
@@ -232,8 +260,8 @@ def update_namespace_properties(
232260

233261

234262
@pytest.fixture
235-
def catalog() -> InMemoryCatalog:
236-
return InMemoryCatalog("test.in.memory.catalog", **{"test.key": "test.value"})
263+
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
264+
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})
237265

238266

239267
TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
@@ -244,7 +272,6 @@ def catalog() -> InMemoryCatalog:
244272
NestedField(2, "y", LongType(), doc="comment"),
245273
NestedField(3, "z", LongType()),
246274
)
247-
TEST_TABLE_LOCATION = "protocol://some/location"
248275
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
249276
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
250277
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
@@ -261,7 +288,6 @@ def given_catalog_has_a_table(
261288
return catalog.create_table(
262289
identifier=TEST_TABLE_IDENTIFIER,
263290
schema=TEST_TABLE_SCHEMA,
264-
location=TEST_TABLE_LOCATION,
265291
partition_spec=TEST_TABLE_PARTITION_SPEC,
266292
properties=properties or TEST_TABLE_PROPERTIES,
267293
)
@@ -307,13 +333,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
307333
table = catalog.create_table(
308334
identifier=TEST_TABLE_IDENTIFIER,
309335
schema=TEST_TABLE_SCHEMA,
310-
location=TEST_TABLE_LOCATION,
311336
partition_spec=TEST_TABLE_PARTITION_SPEC,
312337
properties=TEST_TABLE_PROPERTIES,
313338
)
314339
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
315340

316341

342+
def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
343+
new_location = f"{catalog._warehouse_location}/new_location"
344+
table = catalog.create_table(
345+
identifier=TEST_TABLE_IDENTIFIER,
346+
schema=TEST_TABLE_SCHEMA,
347+
location=new_location,
348+
partition_spec=TEST_TABLE_PARTITION_SPEC,
349+
properties=TEST_TABLE_PROPERTIES,
350+
)
351+
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
352+
assert table.location() == new_location
353+
354+
317355
@pytest.mark.parametrize(
318356
"schema,expected",
319357
[
@@ -335,8 +373,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
335373
table = catalog.create_table(
336374
identifier=TEST_TABLE_IDENTIFIER,
337375
schema=pyarrow_schema_simple_without_ids,
338-
location=TEST_TABLE_LOCATION,
339-
partition_spec=TEST_TABLE_PARTITION_SPEC,
340376
properties=TEST_TABLE_PROPERTIES,
341377
)
342378
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
@@ -662,7 +698,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
662698

663699
def test_catalog_repr(catalog: InMemoryCatalog) -> None:
664700
s = repr(catalog)
665-
assert s == "test.in.memory.catalog (<class 'test_base.InMemoryCatalog'>)"
701+
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"
666702

667703

668704
def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:

0 commit comments

Comments
 (0)