diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0bc23fb0dc..70b5fd62eb 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
 max_snapshot_age_in_ms: [[null,604800000]]
 ```
 
+### Manifests
+
+To show a table's current file manifests:
+
+```python
+table.inspect.manifests()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+path: string not null
+length: int64 not null
+partition_spec_id: int32 not null
+added_snapshot_id: int64 not null
+added_data_files_count: int32 not null
+existing_data_files_count: int32 not null
+deleted_data_files_count: int32 not null
+added_delete_files_count: int32 not null
+existing_delete_files_count: int32 not null
+deleted_delete_files_count: int32 not null
+partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
+  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
+      child 0, contains_null: bool not null
+      child 1, contains_nan: bool
+      child 2, lower_bound: string
+      child 3, upper_bound: string
+----
+content: [[0]]
+path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
+length: [[6886]]
+partition_spec_id: [[0]]
+added_snapshot_id: [[3815834705531553721]]
+added_data_files_count: [[1]]
+existing_data_files_count: [[0]]
+deleted_data_files_count: [[0]]
+added_delete_files_count: [[0]]
+existing_delete_files_count: [[0]]
+deleted_delete_files_count: [[0]]
+partition_summaries: [[    -- is_valid: all not null
+    -- child 0 type: bool
+[false]
+    -- child 1 type: bool
+[false]
+    -- child 2 type: string
+["test"]
+    -- child 3 type: string
+["test"]]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 13186c42cc..dbc7a8453e 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -71,6 +71,7 @@
     ManifestEntry,
     ManifestEntryStatus,
     ManifestFile,
+    PartitionFieldSummary,
     write_manifest,
     write_manifest_list,
 )
@@ -3537,6 +3538,94 @@ def update_partitions_map(
             schema=table_schema,
         )
 
+    def manifests(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.conversions import from_bytes
+
+        partition_summary_schema = pa.struct([
+            pa.field("contains_null", pa.bool_(), nullable=False),
+            pa.field("contains_nan", pa.bool_(), nullable=True),
+            pa.field("lower_bound", pa.string(), nullable=True),
+            pa.field("upper_bound", pa.string(), nullable=True),
+        ])
+
+        manifest_schema = pa.schema([
+            pa.field('content', pa.int8(), nullable=False),
+            pa.field('path', pa.string(), nullable=False),
+            pa.field('length', pa.int64(), nullable=False),
+            pa.field('partition_spec_id', pa.int32(), nullable=False),
+            pa.field('added_snapshot_id', pa.int64(), nullable=False),
+            pa.field('added_data_files_count', pa.int32(), nullable=False),
+            pa.field('existing_data_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_data_files_count', pa.int32(), nullable=False),
+            pa.field('added_delete_files_count', pa.int32(), nullable=False),
+            pa.field('existing_delete_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_delete_files_count', pa.int32(), nullable=False),
+            pa.field('partition_summaries', pa.list_(partition_summary_schema), nullable=False),
+        ])
+
+        def _partition_summaries_to_rows(
+            spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
+        ) -> List[Dict[str, Any]]:
+            rows = []
+            for i, field_summary in enumerate(partition_summaries):
+                field = spec.fields[i]
+                partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
+                lower_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
+                        )
+                    )
+                    if field_summary.lower_bound
+                    else None
+                )
+                upper_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
+                        )
+                    )
+                    if field_summary.upper_bound
+                    else None
+                )
+                rows.append({
+                    'contains_null': field_summary.contains_null,
+                    'contains_nan': field_summary.contains_nan,
+                    'lower_bound': lower_bound,
+                    'upper_bound': upper_bound,
+                })
+            return rows
+
+        specs = self.tbl.metadata.specs()
+        manifests = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                is_data_file = manifest.content == ManifestContent.DATA
+                is_delete_file = manifest.content == ManifestContent.DELETES
+                manifests.append({
+                    'content': manifest.content,
+                    'path': manifest.manifest_path,
+                    'length': manifest.manifest_length,
+                    'partition_spec_id': manifest.partition_spec_id,
+                    'added_snapshot_id': manifest.added_snapshot_id,
+                    'added_data_files_count': manifest.added_files_count if is_data_file else 0,
+                    'existing_data_files_count': manifest.existing_files_count if is_data_file else 0,
+                    'deleted_data_files_count': manifest.deleted_files_count if is_data_file else 0,
+                    'added_delete_files_count': manifest.added_files_count if is_delete_file else 0,
+                    'existing_delete_files_count': manifest.existing_files_count if is_delete_file else 0,
+                    'deleted_delete_files_count': manifest.deleted_files_count if is_delete_file else 0,
+                    'partition_summaries': _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else [],
+                })
+
+        return pa.Table.from_pylist(
+            manifests,
+            schema=manifest_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index a884f9d4c0..8665435e43 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -445,3 +445,87 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non
         df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
         spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
         check_pyiceberg_df_equals_spark_df(df, spark_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_manifests"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('format-version'='{format_version}')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (1, "a")
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (2, "b")
+        """
+    )
+
+    df = session_catalog.load_table(identifier).inspect.manifests()
+
+    assert df.column_names == [
+        'content',
+        'path',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+        'partition_summaries',
+    ]
+
+    int_cols = [
+        'content',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+    ]
+
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    for value in df["path"]:
+        assert isinstance(value.as_py(), str)
+
+    for value in df["partition_summaries"]:
+        assert isinstance(value.as_py(), list)
+        for row in value:
+            assert isinstance(row["contains_null"].as_py(), bool)
+            assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+            assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+            assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+    lhs = spark.table(f"{identifier}.manifests").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"
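
As a usage note for reviewers: the sketch below shows how the new `inspect.manifests()` call might be exercised outside the integration test. The catalog name (`"default"`) and the table identifier are assumptions borrowed from the test above, not part of this change; per the implementation, the call returns an empty `pyarrow.Table` when the table has no current snapshot.

```python
# Minimal usage sketch for the inspect.manifests() metadata table added in this PR.
# Catalog name and table identifier below are placeholder assumptions; substitute
# whatever is configured in your environment.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_manifests")

# One row per manifest file tracked by the current snapshot (empty if none).
manifests = tbl.inspect.manifests()

for row in manifests.to_pylist():
    print(row["path"], row["added_data_files_count"], row["partition_summaries"])
```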