From 9b47befd0f39c68df1e8e473eddfca68e99d5c6b Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Mon, 18 Mar 2024 18:31:27 +0000 Subject: [PATCH 1/5] add_files support partitioned tables --- pyiceberg/io/pyarrow.py | 140 +++++++++++++++++++--------- pyiceberg/manifest.py | 4 + pyiceberg/partitioning.py | 25 ++++- pyiceberg/table/__init__.py | 18 +--- tests/integration/test_add_files.py | 71 +++++++++++++- tests/io/test_pyarrow_stats.py | 132 +++++++++++++------------- 6 files changed, 256 insertions(+), 134 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 31d846f6f0..46870e69d0 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -111,6 +111,7 @@ DataFileContent, FileFormat, ) +from pyiceberg.partitioning import PartitionField, PartitionSpec, partition_record_value from pyiceberg.schema import ( PartnerAccessor, PreOrderSchemaVisitor, @@ -124,7 +125,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask +from pyiceberg.table import PropertyUtil, TableProperties, WriteTask from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform @@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping( return result -def fill_parquet_file_metadata( - data_file: DataFile, +@dataclass +class DataFileStatistics: + record_count: int + column_sizes: Dict[int, int] + value_counts: Dict[int, int] + null_value_counts: Dict[int, int] + nan_value_counts: Dict[int, int] + column_aggregates: Dict[int, StatsAggregator] + split_offsets: Optional[List[int]] = None + + def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any: + if partition_field.source_id not in self.column_aggregates: + return None + + if not partition_field.transform.preserves_order: + raise ValueError( + f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. 
{partition_field}" + ) + + lower_value = partition_record_value( + partition_field=partition_field, + value=self.column_aggregates[partition_field.source_id].current_min, + schema=schema, + ) + upper_value = partition_record_value( + partition_field=partition_field, + value=self.column_aggregates[partition_field.source_id].current_max, + schema=schema, + ) + if lower_value != upper_value: + raise ValueError( + f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}" + ) + return lower_value + + def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: + return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) + + def to_serialized_dict(self) -> Dict[str, Any]: + lower_bounds = {} + upper_bounds = {} + + for k, agg in self.column_aggregates.items(): + _min = agg.min_as_bytes() + if _min is not None: + lower_bounds[k] = _min + _max = agg.max_as_bytes() + if _max is not None: + upper_bounds[k] = _max + return { + "record_count": self.record_count, + "column_sizes": self.column_sizes, + "value_counts": self.value_counts, + "null_value_counts": self.null_value_counts, + "nan_value_counts": self.nan_value_counts, + "lower_bounds": lower_bounds, + "upper_bounds": upper_bounds, + "split_offsets": self.split_offsets, + } + + +def data_file_statistics_from_parquet_metadata( parquet_metadata: pq.FileMetaData, stats_columns: Dict[int, StatisticsCollector], parquet_column_mapping: Dict[str, int], -) -> None: +) -> DataFileStatistics: """ - Compute and fill the following fields of the DataFile object. + Compute and return DataFileStatistics that includes the following. - - file_format + - record_count - column_sizes - value_counts - null_value_counts - nan_value_counts - - lower_bounds - - upper_bounds + - column_aggregates - split_offsets Args: - data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled. parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object. stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. 
It is required to set the mode for column metrics collection + parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID """ if parquet_metadata.num_columns != len(stats_columns): raise ValueError( @@ -1695,30 +1755,19 @@ def fill_parquet_file_metadata( split_offsets.sort() - lower_bounds = {} - upper_bounds = {} - - for k, agg in col_aggs.items(): - _min = agg.min_as_bytes() - if _min is not None: - lower_bounds[k] = _min - _max = agg.max_as_bytes() - if _max is not None: - upper_bounds[k] = _max - for field_id in invalidate_col: - del lower_bounds[field_id] - del upper_bounds[field_id] + del col_aggs[field_id] del null_value_counts[field_id] - data_file.record_count = parquet_metadata.num_rows - data_file.column_sizes = column_sizes - data_file.value_counts = value_counts - data_file.null_value_counts = null_value_counts - data_file.nan_value_counts = nan_value_counts - data_file.lower_bounds = lower_bounds - data_file.upper_bounds = upper_bounds - data_file.split_offsets = split_offsets + return DataFileStatistics( + record_count=parquet_metadata.num_rows, + column_sizes=column_sizes, + value_counts=value_counts, + null_value_counts=null_value_counts, + nan_value_counts=nan_value_counts, + column_aggregates=col_aggs, + split_offsets=split_offsets, + ) def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: @@ -1762,33 +1811,36 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT equality_ids=None, key_metadata=None, ) - - fill_parquet_file_metadata( - data_file=data_file, + statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=writer.writer.metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + data_file.update(statistics.to_serialized_dict()) return iter([data_file]) -def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]: - for task in tasks: - input_file = io.new_input(task.file_path) +def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: + for file_path in file_paths: + input_file = io.new_input(file_path) with input_file.open() as input_stream: parquet_metadata = pq.read_metadata(input_stream) if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()): raise NotImplementedError( - f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids" + f"Cannot add file {file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids" ) - schema = table_metadata.schema() + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) data_file = DataFile( content=DataFileContent.DATA, - file_path=task.file_path, + file_path=file_path, file_format=FileFormat.PARQUET, - partition=task.partition_field_value, + partition=statistics.partition(table_metadata.spec(), table_metadata.schema()), record_count=parquet_metadata.num_rows, file_size_in_bytes=len(input_file), sort_order_id=None, @@ -1796,12 +1848,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks equality_ids=None, key_metadata=None, ) - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=parquet_metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), - parquet_column_mapping=parquet_path_to_id_mapping(schema), - ) + + data_file.update(statistics.to_serialized_dict()) yield data_file diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 03dc3199bf..8722914d07 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -389,6 +389,10 @@ def __eq__(self, other: Any) -> bool: """ return self.file_path == other.file_path if isinstance(other, DataFile) else False + def update(self, other: Dict[str, Any]) -> None: + for k, v in other.items(): + self.__setattr__(k, v) + MANIFEST_ENTRY_SCHEMAS = { 1: Schema( diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index a6692b325e..4512a8aea9 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -388,16 +388,33 @@ def partition(self) -> Record: # partition key transformed with iceberg interna if len(partition_fields) != 1: raise ValueError("partition_fields must contain exactly one field.") partition_field = partition_fields[0] - iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type - iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value) - transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) - iceberg_typed_key_values[partition_field.name] = transformed_value + iceberg_typed_key_values[partition_field.name] = partition_record_value( + partition_field=partition_field, + value=raw_partition_field_value.value, + schema=self.schema, + ) return Record(**iceberg_typed_key_values) def to_path(self) -> str: return self.partition_spec.partition_to_path(self.partition, self.schema) +def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any: + """ + Return the Partition Record representation of the value. + + The value is first converted to internal partition representation. + For example, UUID is converted to str, DateType to epoch-days, etc. + + Then the corresponding PartitionField's transform is applied to return + the final partition record value. 
+ """ + iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type + iceberg_typed_value = _to_partition_representation(iceberg_type, value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) + return transformed_value + + @singledispatch def _to_partition_representation(type: IcebergType, value: Any) -> Any: return TypeError(f"Unsupported partition field type: {type}") diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 517e6c86df..c0db85f854 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -33,7 +33,6 @@ Dict, Generic, Iterable, - Iterator, List, Literal, Optional, @@ -1170,9 +1169,6 @@ def add_files(self, file_paths: List[str]) -> None: Raises: FileNotFoundError: If the file does not exist. """ - if len(self.spec().fields) > 0: - raise ValueError("Cannot add files to partitioned tables") - with self.transaction() as tx: if self.name_mapping() is None: tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()}) @@ -2515,17 +2511,6 @@ def _dataframe_to_data_files( yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)])) -def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]: - if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0: - raise ValueError("Cannot add files to partitioned tables") - - for file_path in file_paths: - yield AddFileTask( - file_path=file_path, - partition_field_value=Record(), - ) - - def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. 
@@ -2534,8 +2519,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List """ from pyiceberg.io.pyarrow import parquet_files_to_data_files - tasks = add_file_tasks_from_file_paths(file_paths, table_metadata) - yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks) + yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths)) class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 2066e178cd..b7ce435bdd 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -26,9 +26,10 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.partitioning import PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table +from pyiceberg.transforms import IdentityTransform, MonthTransform from pyiceberg.types import ( BooleanType, DateType, @@ -238,3 +239,71 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio for col in df.columns: value_count = 1 if col == "quux" else 6 assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" + + +@pytest.mark.integration +def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.partitioned_table" + + partition_spec = PartitionSpec( + PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), + PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"), + spec_id=0, + ) + + tbl = _create_table(session_catalog, identifier, partition_spec) + + date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)]) + + file_paths = [f"s3://warehouse/default/partitioned/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table( + pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": next(date_iter), + } + ], + schema=ARROW_SCHEMA, + ) + ) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + # NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5] + assert [row.existing_data_files_count for row in rows] == [0] + assert [row.deleted_data_files_count for row in rows] == [0] + + df = spark.table(identifier) + assert df.count() == 5, "Expected 5 rows" + for col in df.columns: + assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" + + partition_rows = spark.sql( + f""" + SELECT partition, record_count, file_count + FROM {identifier}.partitions + """ + ).collect() + + assert [row.record_count for row in partition_rows] == [5] + assert [row.file_count for row in partition_rows] == [5] + assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == 
[(123, 650)] diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index 01b844a43e..b9b7b3b306 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -52,7 +52,7 @@ MetricsMode, PyArrowStatisticsCollector, compute_statistics_plan, - fill_parquet_file_metadata, + data_file_statistics_from_parquet_metadata, match_metrics_mode, parquet_path_to_id_mapping, schema_to_pyarrow, @@ -186,12 +186,12 @@ def test_record_count() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert datafile.record_count == 4 @@ -200,12 +200,12 @@ def test_value_counts() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert datafile.value_counts[1] == 4 @@ -222,12 +222,12 @@ def test_column_sizes() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.column_sizes) == 7 # these values are an artifact of how the write_table encodes the columns @@ -243,12 +243,12 @@ def test_null_and_nan_counts() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.null_value_counts) == 7 assert datafile.null_value_counts[1] == 1 @@ -271,12 +271,12 @@ def test_bounds() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.lower_bounds) == 2 assert 
datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaa" @@ -316,12 +316,12 @@ def test_metrics_mode_none() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "none" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 0 assert len(datafile.null_value_counts) == 0 @@ -336,12 +336,12 @@ def test_metrics_mode_counts() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "counts" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -356,12 +356,12 @@ def test_metrics_mode_full() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "full" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -382,12 +382,12 @@ def test_metrics_mode_non_default_trunc() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -409,12 +409,12 @@ def test_column_metrics_mode() -> None: datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" table_metadata.properties["write.metadata.metrics.column.strings"] = "none" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + 
datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 6 assert len(datafile.null_value_counts) == 6 @@ -510,12 +510,12 @@ def test_metrics_primitive_types() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 12 assert len(datafile.null_value_counts) == 12 @@ -609,12 +609,12 @@ def test_metrics_invalid_upper_bound() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert len(datafile.value_counts) == 4 assert len(datafile.null_value_counts) == 4 @@ -636,12 +636,12 @@ def test_offsets() -> None: schema = get_current_schema(table_metadata) datafile = DataFile() - fill_parquet_file_metadata( - datafile, - metadata, - compute_statistics_plan(schema, table_metadata.properties), - parquet_path_to_id_mapping(schema), + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), ) + datafile.update(statistics.to_serialized_dict()) assert datafile.split_offsets is not None assert len(datafile.split_offsets) == 1 From 1b6288573360093e2f07aef866a98a21c0e3e1a2 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 19 Mar 2024 00:43:56 +0000 Subject: [PATCH 2/5] docs --- mkdocs/docs/api.md | 7 +++++ tests/integration/test_add_files.py | 42 +++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 5897881fcc..06f7ff41db 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -330,6 +330,13 @@ tbl.add_files(file_paths=file_paths) +!!! note "Partitions" + `add_files` only requires the client to read the existing parquet files' metadata footer in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like MonthTransform, and TruncateTransform which preserve the order of the values after the transformation (Any Transform that has `preserves_order` property set to True is supported). + + + + + !!! warning "Maintenance Operations" Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them. 
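A usage sketch of the partitioned `add_files` flow documented in the note above (illustrative only: the catalog name, table identifier, and file paths are hypothetical, and each parquet file is assumed to hold a single value per partition source column):

from pyiceberg.catalog import load_catalog

# Load a table whose partition spec uses only order-preserving transforms
# (e.g. IdentityTransform or MonthTransform), as this change requires.
catalog = load_catalog("default")
tbl = catalog.load_table("default.partitioned_table")

# Only the parquet footers are read; each file's partition value is inferred
# from its per-column min/max statistics (min must equal max per source column).
tbl.add_files(
    file_paths=[
        "s3://warehouse/default/partitioned/test-0.parquet",
        "s3://warehouse/default/partitioned/test-1.parquet",
    ]
)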
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index b7ce435bdd..a340f30839 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -29,7 +29,7 @@ from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table -from pyiceberg.transforms import IdentityTransform, MonthTransform +from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform from pyiceberg.types import ( BooleanType, DateType, @@ -255,7 +255,7 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)]) - file_paths = [f"s3://warehouse/default/partitioned/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -307,3 +307,41 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca assert [row.record_count for row in partition_rows] == [5] assert [row.file_count for row in partition_rows] == [5] assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)] + + +@pytest.mark.integration +def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.partitioned_table_2" + + partition_spec = PartitionSpec( + PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"), + spec_id=0, + ) + + tbl = _create_table(session_catalog, identifier, partition_spec) + + int_iter = iter(range(5)) + + file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table( + pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": next(int_iter), + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA, + ) + ) + + # add the parquet files as data files + with pytest.raises(ValueError): + tbl.add_files(file_paths=file_paths) From ae3619aa8db6071537a498f28f719792ecea7c6d Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:31:10 +0000 Subject: [PATCH 3/5] more --- Makefile | 2 +- mkdocs/docs/api.md | 10 +------ pyiceberg/io/pyarrow.py | 22 +++++++------- pyiceberg/manifest.py | 4 --- pyiceberg/partitioning.py | 2 +- tests/integration/test_add_files.py | 46 ++++++++++++++++++++++++++++- tests/io/test_pyarrow_stats.py | 39 ++++++++---------------- 7 files changed, 72 insertions(+), 53 deletions(-) diff --git a/Makefile b/Makefile index c3e816ebd5..133a13983b 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ test-integration: docker-compose -f dev/docker-compose-integration.yml up -d sleep 10 docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py - poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} + poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} test-integration-rebuild: docker-compose -f dev/docker-compose-integration.yml kill diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 
06f7ff41db..66c1a5b635 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -326,16 +326,8 @@ tbl.add_files(file_paths=file_paths) !!! note "Name Mapping" Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one. - - - - !!! note "Partitions" - `add_files` only requires the client to read the existing parquet files' metadata footer in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like MonthTransform, and TruncateTransform which preserve the order of the values after the transformation (Any Transform that has `preserves_order` property set to True is supported). - - - - + `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). !!! warning "Maintenance Operations" Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them. diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 46870e69d0..3affd08cbb 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1595,7 +1595,7 @@ def parquet_path_to_id_mapping( return result -@dataclass +@dataclass(frozen=True) class DataFileStatistics: record_count: int column_sizes: Dict[int, int] @@ -1611,7 +1611,7 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A if not partition_field.transform.preserves_order: raise ValueError( - f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}" + f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}" ) lower_value = partition_record_value( @@ -1626,7 +1626,7 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A ) if lower_value != upper_value: raise ValueError( - f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}" + f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. 
{lower_value=}, {upper_value=}" ) return lower_value @@ -1796,6 +1796,11 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer: writer.write_table(task.df, row_group_size=row_group_size) + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) data_file = DataFile( content=DataFileContent.DATA, file_path=file_path, @@ -1810,13 +1815,9 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT spec_id=table_metadata.default_spec_id, equality_ids=None, key_metadata=None, + **statistics.to_serialized_dict(), ) - statistics = data_file_statistics_from_parquet_metadata( - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), - parquet_column_mapping=parquet_path_to_id_mapping(schema), - ) - data_file.update(statistics.to_serialized_dict()) + return iter([data_file]) @@ -1841,15 +1842,14 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_ file_path=file_path, file_format=FileFormat.PARQUET, partition=statistics.partition(table_metadata.spec(), table_metadata.schema()), - record_count=parquet_metadata.num_rows, file_size_in_bytes=len(input_file), sort_order_id=None, spec_id=table_metadata.default_spec_id, equality_ids=None, key_metadata=None, + **statistics.to_serialized_dict(), ) - data_file.update(statistics.to_serialized_dict()) yield data_file diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 8722914d07..03dc3199bf 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -389,10 +389,6 @@ def __eq__(self, other: Any) -> bool: """ return self.file_path == other.file_path if isinstance(other, DataFile) else False - def update(self, other: Dict[str, Any]) -> None: - for k, v in other.items(): - self.__setattr__(k, v) - MANIFEST_ENTRY_SCHEMAS = { 1: Schema( diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 4512a8aea9..16f158828d 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -404,7 +404,7 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema: Return the Partition Record representation of the value. The value is first converted to internal partition representation. - For example, UUID is converted to str, DateType to epoch-days, etc. + For example, UUID is converted to bytes[16], DateType to days since epoch, etc. Then the corresponding PartitionField's transform is applied to return the final partition record value. 
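A worked instance of the conversion this docstring describes (an illustration, not part of the patch): a DateType value is first converted to days since the Unix epoch, and MonthTransform then maps that to months since 1970-01, which is the qux_month=650 value asserted in the integration test for March 2024.

from datetime import date

from pyiceberg.transforms import MonthTransform
from pyiceberg.types import DateType

# Internal partition representation of a DateType value: days since epoch.
days_since_epoch = (date(2024, 3, 7) - date(1970, 1, 1)).days

# Apply the partition field's transform to the internal representation,
# mirroring partition_record_value above.
month_partition_value = MonthTransform().transform(DateType())(days_since_epoch)
assert month_partition_value == (2024 - 1970) * 12 + (3 - 1)  # 650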
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index a340f30839..f725e50b9e 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -343,5 +343,49 @@ def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, sessio ) # add the parquet files as data files - with pytest.raises(ValueError): + with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=file_paths) + assert "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]" in str(exc_info.value) + + +@pytest.mark.integration +def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.partitioned_table_3" + + partition_spec = PartitionSpec( + PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), + spec_id=0, + ) + + tbl = _create_table(session_catalog, identifier, partition_spec) + + file_paths = [f"s3://warehouse/default/partitioned_3/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table( + pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + }, + { + "foo": True, + "bar": "bar_string", + "baz": 124, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA, + ) + ) + + # add the parquet files as data files + with pytest.raises(ValueError) as exc_info: + tbl.add_files(file_paths=file_paths) + assert "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. 
lower_value=123, upper_value=124" in str(exc_info.value) \ No newline at end of file diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index b9b7b3b306..41f1432dbf 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -185,13 +185,12 @@ def test_record_count() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert datafile.record_count == 4 @@ -199,13 +198,12 @@ def test_value_counts() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert datafile.value_counts[1] == 4 @@ -221,13 +219,12 @@ def test_column_sizes() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.column_sizes) == 7 # these values are an artifact of how the write_table encodes the columns @@ -242,13 +239,12 @@ def test_null_and_nan_counts() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.null_value_counts) == 7 assert datafile.null_value_counts[1] == 1 @@ -270,13 +266,12 @@ def test_bounds() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.lower_bounds) == 2 assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaa" @@ -314,14 +309,13 @@ def test_metrics_mode_none() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "none" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, 
stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 0 assert len(datafile.null_value_counts) == 0 @@ -334,14 +328,13 @@ def test_metrics_mode_counts() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "counts" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -354,14 +347,13 @@ def test_metrics_mode_full() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "full" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -380,14 +372,13 @@ def test_metrics_mode_non_default_trunc() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -406,7 +397,6 @@ def test_column_metrics_mode() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" table_metadata.properties["write.metadata.metrics.column.strings"] = "none" statistics = data_file_statistics_from_parquet_metadata( @@ -414,7 +404,7 @@ def test_column_metrics_mode() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 6 assert len(datafile.null_value_counts) == 6 @@ -508,14 +498,13 @@ def test_metrics_primitive_types() -> None: metadata, table_metadata = construct_test_table_primitive_types() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), 
parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 12 assert len(datafile.null_value_counts) == 12 @@ -607,14 +596,13 @@ def test_metrics_invalid_upper_bound() -> None: metadata, table_metadata = construct_test_table_invalid_upper_bound() schema = get_current_schema(table_metadata) - datafile = DataFile() table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)" statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 4 assert len(datafile.null_value_counts) == 4 @@ -635,13 +623,12 @@ def test_offsets() -> None: metadata, table_metadata = construct_test_table() schema = get_current_schema(table_metadata) - datafile = DataFile() statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile.update(statistics.to_serialized_dict()) + datafile = DataFile(**statistics.to_serialized_dict()) assert datafile.split_offsets is not None assert len(datafile.split_offsets) == 1 From a8213a7f23b84ccb4942e06b0bf9d02633c57118 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 19 Mar 2024 15:00:25 +0000 Subject: [PATCH 4/5] adopt review feedback --- Makefile | 2 +- mkdocs/docs/api.md | 2 +- tests/integration/test_add_files.py | 100 ++++++++++++++++++---------- 3 files changed, 65 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 133a13983b..c3e816ebd5 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ test-integration: docker-compose -f dev/docker-compose-integration.yml up -d sleep 10 docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py - poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} + poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} test-integration-rebuild: docker-compose -f dev/docker-compose-integration.yml kill diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 66c1a5b635..5eec487c67 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -327,7 +327,7 @@ tbl.add_files(file_paths=file_paths) Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one. !!! note "Partitions" - `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. 
This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). + `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`. !!! warning "Maintenance Operations" Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them. diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index f725e50b9e..7c17618280 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -104,25 +104,31 @@ ) -def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table: +def _create_table( + session_catalog: Catalog, identifier: str, format_version: int, partition_spec: Optional[PartitionSpec] = None +) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass tbl = session_catalog.create_table( - identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec() + identifier=identifier, + schema=TABLE_SCHEMA, + properties={"format-version": str(format_version)}, + partition_spec=partition_spec if partition_spec else PartitionSpec(), ) return tbl @pytest.mark.integration -def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.unpartitioned_table" - tbl = _create_table(session_catalog, identifier) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.unpartitioned_table_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) - file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -154,11 +160,14 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: @pytest.mark.integration -def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.unpartitioned_raises_not_found" - tbl = _create_table(session_catalog, identifier) - - file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)] +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_unpartitioned_table_raises_file_not_found( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.unpartitioned_raises_not_found_v{format_version}" + 
tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -172,11 +181,14 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSess @pytest.mark.integration -def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.unpartitioned_raises_field_ids" - tbl = _create_table(session_catalog, identifier) - - file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)] +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_unpartitioned_table_raises_has_field_ids( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.unpartitioned_raises_field_ids_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -190,11 +202,14 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSessi @pytest.mark.integration -def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.unpartitioned_table_2" - tbl = _create_table(session_catalog, identifier) - - file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)] +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_unpartitioned_table_with_schema_updates( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.unpartitioned_table_schema_updates_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -212,7 +227,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio update.add_column("quux", IntegerType()) update.delete_column("bar") - file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet" + file_path = f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-6.parquet" # write parquet files fo = tbl.io.new_output(file_path) with fo.create(overwrite=True) as fos: @@ -242,8 +257,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio @pytest.mark.integration -def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.partitioned_table" +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.partitioned_table_v{format_version}" partition_spec = PartitionSpec( PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), @@ -251,11 +267,11 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca spec_id=0, ) - tbl = _create_table(session_catalog, identifier, partition_spec) + tbl = 
_create_table(session_catalog, identifier, format_version, partition_spec) date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)]) - file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/partitioned/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -310,19 +326,20 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca @pytest.mark.integration -def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.partitioned_table_2" +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.partitioned_table_bucket_fails_v{format_version}" partition_spec = PartitionSpec( PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"), spec_id=0, ) - tbl = _create_table(session_catalog, identifier, partition_spec) + tbl = _create_table(session_catalog, identifier, format_version, partition_spec) int_iter = iter(range(5)) - file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/partitioned_table_bucket_fails/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -345,21 +362,27 @@ def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, sessio # add the parquet files as data files with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=file_paths) - assert "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]" in str(exc_info.value) + assert ( + "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]" + in str(exc_info.value) + ) @pytest.mark.integration -def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.partitioned_table_3" +@pytest.mark.parametrize("format_version", [1, 2]) +def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.partitioned_table_mismatch_fails_v{format_version}" partition_spec = PartitionSpec( PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), spec_id=0, ) - tbl = _create_table(session_catalog, identifier, partition_spec) + tbl = _create_table(session_catalog, identifier, format_version, partition_spec) - file_paths = [f"s3://warehouse/default/partitioned_3/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/partitioned_table_mismatch_fails/v{format_version}/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -379,7 +402,7 @@ def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spar "bar": "bar_string", "baz": 124, "qux": date(2024, 3, 7), - } + }, ], schema=ARROW_SCHEMA, ) @@ -388,4 +411,7 @@ def 
test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spar # add the parquet files as data files with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=file_paths) - assert "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124" in str(exc_info.value) \ No newline at end of file + assert ( + "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124" + in str(exc_info.value) + ) From 8f6b66037fcf1de664166c62a7d1f90aeb07d540 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 19 Mar 2024 15:16:14 +0000 Subject: [PATCH 5/5] split-offsets required --- pyiceberg/io/pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3affd08cbb..72de14880a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1603,7 +1603,7 @@ class DataFileStatistics: null_value_counts: Dict[int, int] nan_value_counts: Dict[int, int] column_aggregates: Dict[int, StatsAggregator] - split_offsets: Optional[List[int]] = None + split_offsets: List[int] def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any: if partition_field.source_id not in self.column_aggregates:
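Taken end to end, the series derives a DataFileStatistics from a parquet footer and builds the DataFile, including its partition Record, from it. A minimal sketch of that inference path, assuming a loaded Table `tbl` whose FileIO can reach `file_path`; the helper name below is hypothetical and the imports follow the ones used by the tests in this series:

import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import (
    compute_statistics_plan,
    data_file_statistics_from_parquet_metadata,
    parquet_path_to_id_mapping,
)
from pyiceberg.table import Table
from pyiceberg.typedef import Record


def infer_partition_record(tbl: Table, file_path: str) -> Record:
    """Return the partition Record that add_files would infer for one parquet file."""
    schema = tbl.metadata.schema()
    with tbl.io.new_input(file_path).open() as input_stream:
        parquet_metadata = pq.read_metadata(input_stream)

    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=compute_statistics_plan(schema, tbl.metadata.properties),
        parquet_column_mapping=parquet_path_to_id_mapping(schema),
    )
    # Raises ValueError for a non order-preserving transform, or when a file
    # holds more than one partition value for a source column.
    return statistics.partition(tbl.metadata.spec(), schema)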