58 changes: 58 additions & 0 deletions mkdocs/docs/api.md
@@ -261,6 +261,8 @@ schema = Schema(
tbl = catalog.create_table("default.cities", schema=schema)
```

### Append and Overwrite

Now write the data to the table:

<!-- prettier-ignore-start -->
@@ -333,6 +335,49 @@ df = pa.Table.from_pylist(
table.append(df)
```

### Write Parquet Files

PyIceberg provides a low-level API for writing Parquet files in an Iceberg-compatible format without committing them to the table metadata. This is useful when you need more control over the commit process:

```python
file_paths = tbl.write_parquet(df)
```

The `write_parquet()` method takes a PyArrow table, writes it out as Parquet files that follow the table's schema and partition spec, and returns the paths of the written files:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
{"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
{"city": "Drachten", "lat": 53.11254, "long": 6.0989},
{"city": "Paris", "lat": 48.864716, "long": 2.349014},
],
)

# Write files but don't commit them
file_paths = tbl.write_parquet(df)
print(file_paths)
# ['s3a://warehouse/default/cities/data/00000-0-8e056d57-7ffa-4c22-9f99-52a0e5ea4b19.parquet']

# The files exist in storage but are not committed yet, so they won't appear in queries
```

To make these files visible when querying the table, you need to commit them using the [`add_files`](#add-files) API:


```python
# Commit the files to the table metadata
tbl.add_files(file_paths=file_paths)

# Now the data is visible when querying the table
```
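
For example, a quick scan over the table (using the regular table scan API) confirms that the rows from the committed files are now readable:

```python
# The rows from the committed Parquet files now show up in reads
print(tbl.scan().to_arrow())
```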

### Delete


You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.

```python
@@ -1055,6 +1100,19 @@ tbl.add_files(file_paths=file_paths)
# A new snapshot is committed to the table with manifests pointing to the existing parquet files
```

The `write_parquet()` method provides an easy way to write files in an Iceberg-compatible format that can then be committed with `add_files`:

```python
# Write data to parquet files without committing
file_paths = tbl.write_parquet(df)

# Commit the files to make them visible in queries
tbl.add_files(file_paths=file_paths)
```

This is particularly useful for decoupling the commit step from the write step when ingesting data into an Iceberg table with high concurrency, for example from serverless functions. Because the writers only produce files, only the commit step needs to be serialized through a queue or orchestration system, and that step is typically much faster than writing the data; see the sketch below.
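
The sketch below illustrates this split. It is a minimal, in-process example: `tbl` is the table from the examples above, and the standard-library `queue.Queue` stands in for whatever queue or orchestration service (SQS, Pub/Sub, a workflow engine, ...) you would use to serialize commits in practice:

```python
import queue

import pyarrow as pa

# In-process stand-in for an external queue or orchestration service
commit_queue: queue.Queue = queue.Queue()


def write_worker(df: pa.Table) -> None:
    """Runs with high concurrency, e.g. one invocation per incoming batch."""
    file_paths = tbl.write_parquet(df)  # writes data files only: no commit, no table lock
    commit_queue.put(file_paths)        # hand the paths over to the single committer


def commit_worker() -> None:
    """Runs serially, so commits to the table metadata never conflict."""
    while not commit_queue.empty():
        file_paths = commit_queue.get()
        tbl.add_files(file_paths=file_paths)  # fast, metadata-only commit
```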


<!-- prettier-ignore-start -->

!!! note "Name Mapping"
23 changes: 14 additions & 9 deletions pyiceberg/io/pyarrow.py
@@ -2205,24 +2205,25 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
)

lower_value = partition_record_value(
source_field = schema.find_field(partition_field.source_id)
transform = partition_field.transform.transform(source_field.field_type)

lower_value = transform(partition_record_value(
partition_field=partition_field,
value=self.column_aggregates[partition_field.source_id].current_min,
schema=schema,
)
upper_value = partition_record_value(
))
upper_value = transform(partition_record_value(
partition_field=partition_field,
value=self.column_aggregates[partition_field.source_id].current_max,
schema=schema,
)
))
if lower_value != upper_value:
raise ValueError(
f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
)

source_field = schema.find_field(partition_field.source_id)
transform = partition_field.transform.transform(source_field.field_type)
return transform(lower_value)
return lower_value

def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
@@ -2353,7 +2354,8 @@ data_file_statistics_from_parquet_metadata(
     )
 
 
-def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask], include_field_ids: bool = True
+) -> Iterator[DataFile]:
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
 
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
@@ -2380,7 +2382,7 @@ def write_parquet(task: WriteTask) -> DataFile:
                 file_schema=task.schema,
                 batch=batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-                include_field_ids=True,
+                include_field_ids=include_field_ids,
             )
             for batch in task.record_batches
         ]
@@ -2549,6 +2551,7 @@ def _dataframe_to_data_files(
io: FileIO,
write_uuid: Optional[uuid.UUID] = None,
counter: Optional[itertools.count[int]] = None,
include_field_ids: bool = True
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.

@@ -2578,6 +2581,7 @@
for batches in bin_pack_arrow_table(df, target_file_size)
]
),
include_field_ids=include_field_ids
)
else:
partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
@@ -2597,6 +2601,7 @@
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
]
),
include_field_ids=include_field_ids
)


43 changes: 43 additions & 0 deletions pyiceberg/table/__init__.py
@@ -1226,6 +1226,49 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
with self.transaction() as tx:
tx.append(df=df, snapshot_properties=snapshot_properties)

def write_parquet(self, df: pa.Table) -> List[str]:
"""
Shorthand API for writing a PyArrow table as Parquet files for the table.
Writes data files but does not commit them to the table.

Args:
df: The Arrow table that will be written as Parquet files

Returns:
List of file paths to the written Parquet files
"""
try:
import pyarrow as pa
except ModuleNotFoundError as e:
raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")

if unsupported_partitions := [
field for field in self.metadata.spec().fields if not field.transform.supports_pyarrow_transform
]:
raise ValueError(
f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
)
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
self.metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

        # Guard against an empty dataframe so the return below never references an undefined name
        data_files: List[DataFile] = []
        if df.shape[0] > 0:
            data_files = list(
                _dataframe_to_data_files(
                    table_metadata=self.metadata, write_uuid=uuid.uuid4(), df=df, io=self.io, include_field_ids=False
                )
            )

        return [data_file.file_path for data_file in data_files]


def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""Shorthand for dynamic overwriting the table with a PyArrow table.
