
Commit 7f712fd

Add Data Files from Parquet Files to UnPartitioned Table (#506)
1 parent b447461 commit 7f712fd

4 files changed: +359 -1 lines changed


mkdocs/docs/api.md

Lines changed: 33 additions & 0 deletions
@@ -302,6 +302,39 @@ The nested lists indicate the different Arrow buffers, where the first write res

<!-- prettier-ignore-end -->

### Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.

```python
# Given that these parquet files have schema consistent with the Iceberg table

file_paths = [
    "s3a://warehouse/default/existing-1.parquet",
    "s3a://warehouse/default/existing-2.parquet",
]

# They can be added to the table without rewriting them

tbl.add_files(file_paths=file_paths)

# A new snapshot is committed to the table with manifests pointing to the existing parquet files
```

<!-- prettier-ignore-start -->

!!! note "Name Mapping"
    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg table's schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and it creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

<!-- prettier-ignore-end -->

<!-- prettier-ignore-start -->

!!! warning "Maintenance Operations"
    Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.

<!-- prettier-ignore-end -->

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overridden).

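To make the Name Mapping note concrete, here is a minimal sketch (not part of the committed docs) of the fallback from the caller's side. It assumes the `tbl` and `file_paths` from the example above and a table that was created without a name mapping; `name_mapping()` and the `schema.name-mapping.default` property are the pieces the new `add_files` code touches.

```python
# Before the first add_files call there may be no name mapping on the table.
print(tbl.name_mapping())  # None if the table never had one

tbl.add_files(file_paths=file_paths)

# add_files derived a mapping from the table's current schema and stored it under
# TableProperties.DEFAULT_NAME_MAPPING ("schema.name-mapping.default"), so the
# parquet columns can be resolved to Iceberg field IDs by name.
print(tbl.properties["schema.name-mapping.default"])
```
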
pyiceberg/io/pyarrow.py

Lines changed: 34 additions & 1 deletion
```diff
@@ -124,7 +124,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     return iter([data_file])


+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
+    for task in tasks:
+        input_file = io.new_input(task.file_path)
+        with input_file.open() as input_stream:
+            parquet_metadata = pq.read_metadata(input_stream)
+
+        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
+            raise NotImplementedError(
+                f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
+            )
+
+        schema = table_metadata.schema()
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=task.file_path,
+            file_format=FileFormat.PARQUET,
+            partition=task.partition_field_value,
+            record_count=parquet_metadata.num_rows,
+            file_size_in_bytes=len(input_file),
+            sort_order_id=None,
+            spec_id=table_metadata.default_spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=parquet_metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
+        yield data_file
+
+
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
 PYARROW_UNCOMPRESSED_CODEC = "none"
```

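A rough sketch of how this new helper is driven, mirroring the wiring the commit adds to `pyiceberg/table/__init__.py` below rather than a separate API; it assumes an unpartitioned table `tbl` and the `file_paths` list from the docs example.

```python
from pyiceberg.io.pyarrow import parquet_files_to_data_files
from pyiceberg.table import AddFileTask
from pyiceberg.typedef import Record

# One task per parquet file; an empty Record() is the partition value for an
# unpartitioned spec.
tasks = (AddFileTask(file_path=path, partition_field_value=Record()) for path in file_paths)

# Each yielded DataFile points at the original file and carries the statistics
# filled in from the parquet footer by fill_parquet_file_metadata.
for data_file in parquet_files_to_data_files(io=tbl.io, table_metadata=tbl.metadata, tasks=tasks):
    print(data_file.file_path, data_file.record_count, data_file.file_size_in_bytes)
```
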
pyiceberg/table/__init__.py

Lines changed: 52 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@
     Dict,
     Generic,
     Iterable,
+    Iterator,
     List,
     Literal,
     Optional,
@@ -115,6 +116,7 @@
     Identifier,
     KeyDefaultDict,
     Properties,
+    Record,
 )
 from pyiceberg.types import (
     IcebergType,
@@ -1147,6 +1149,27 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)

+    def add_files(self, file_paths: List[str]) -> None:
+        """
+        Shorthand API for adding files as data files to the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        if len(self.spec().fields) > 0:
+            raise ValueError("Cannot add files to partitioned tables")
+
+        with self.transaction() as tx:
+            if self.name_mapping() is None:
+                tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
+            with tx.update_snapshot().fast_append() as update_snapshot:
+                data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io)
+                for data_file in data_files:
+                    update_snapshot.append_data_file(data_file)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

@@ -2444,6 +2467,12 @@ def generate_data_file_filename(self, extension: str) -> str:
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"


+@dataclass(frozen=True)
+class AddFileTask:
+    file_path: str
+    partition_field_value: Record
+
+
 def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
     return f'{location}/metadata/{commit_uuid}-m{num}.avro'

@@ -2475,6 +2504,29 @@ def _dataframe_to_data_files(
     yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


+def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
+    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
+        raise ValueError("Cannot add files to partitioned tables")
+
+    for file_path in file_paths:
+        yield AddFileTask(
+            file_path=file_path,
+            partition_field_value=Record(),
+        )
+
+
+def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
+    """Convert a list of files into DataFiles.
+
+    Returns:
+        An iterable that supplies DataFiles that describe the parquet files.
+    """
+    from pyiceberg.io.pyarrow import parquet_files_to_data_files
+
+    tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
+    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)
+
+
 class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
     commit_uuid: uuid.UUID
     _operation: Operation
```

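Putting the three files together, a hedged end-to-end sketch of the API this commit introduces; the catalog name and table identifier are placeholders, and it assumes an unpartitioned table whose schema matches the parquet files.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog named "default" is configured
tbl = catalog.load_table("default.existing_data")  # placeholder identifier

# Raises ValueError("Cannot add files to partitioned tables") if the table has a
# partition spec, and NotImplementedError if a file already carries field IDs.
tbl.add_files(
    file_paths=[
        "s3a://warehouse/default/existing-1.parquet",
        "s3a://warehouse/default/existing-2.parquet",
    ]
)

# The fast-append commit created a new snapshot whose manifests reference the
# existing files in place; nothing was rewritten.
print(tbl.current_snapshot())
```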