diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index cf4276edec..0bc23fb0dc 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -581,6 +581,31 @@ readable_metrics: [
   [6.0989]]
 ```

+### References
+
+To show a table's known snapshot references:
+
+```python
+table.inspect.refs()
+```
+
+```
+pyarrow.Table
+name: string not null
+type: string not null
+snapshot_id: int64 not null
+max_reference_age_in_ms: int64
+min_snapshots_to_keep: int32
+max_snapshot_age_in_ms: int64
+----
+name: [["main","testTag"]]
+type: [["BRANCH","TAG"]]
+snapshot_id: [[2278002651076891950,2278002651076891950]]
+max_reference_age_in_ms: [[null,604800000]]
+min_snapshots_to_keep: [[null,10]]
+max_snapshot_age_in_ms: [[null,604800000]]
+```
+
 ## Add Files

 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 95fdb1d288..13186c42cc 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3423,6 +3423,32 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
             schema=entries_schema,
         )

+    def refs(self) -> "pa.Table":
+        import pyarrow as pa
+
+        ref_schema = pa.schema([
+            pa.field('name', pa.string(), nullable=False),
+            pa.field('type', pa.dictionary(pa.int32(), pa.string()), nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('max_reference_age_in_ms', pa.int64(), nullable=True),
+            pa.field('min_snapshots_to_keep', pa.int32(), nullable=True),
+            pa.field('max_snapshot_age_in_ms', pa.int64(), nullable=True),
+        ])
+
+        ref_results = []
+        for ref in self.tbl.metadata.refs:
+            if snapshot_ref := self.tbl.metadata.refs.get(ref):
+                ref_results.append({
+                    'name': ref,
+                    'type': snapshot_ref.snapshot_ref_type.upper(),
+                    'snapshot_id': snapshot_ref.snapshot_id,
+                    'max_reference_age_in_ms': snapshot_ref.max_ref_age_ms,
+                    'min_snapshots_to_keep': snapshot_ref.min_snapshots_to_keep,
+                    'max_snapshot_age_in_ms': snapshot_ref.max_snapshot_age_ms,
+                })
+
+        return pa.Table.from_pylist(ref_results, schema=ref_schema)
+
     def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         import pyarrow as pa

diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index d9ec563466..0905eda8c7 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -272,6 +272,66 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
     assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}


+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_refs(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_refs"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # write data to create snapshot
+    tbl.overwrite(arrow_table_with_null)
+
+    # create a test branch
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
+        """
+    )
+
+    # create a test tag against current snapshot
+    current_snapshot = tbl.current_snapshot()
+    assert current_snapshot is not None
+    current_snapshot_id = current_snapshot.snapshot_id
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION {current_snapshot_id} RETAIN 180 DAYS
+        """
+    )
+
+    df = tbl.refresh().inspect.refs()
+
+    assert df.column_names == [
+        'name',
+        'type',
+        'snapshot_id',
+        'max_reference_age_in_ms',
+        'min_snapshots_to_keep',
+        'max_snapshot_age_in_ms',
+    ]
+
+    assert [name.as_py() for name in df['name']] == ['testBranch', 'main', 'testTag']
+    assert [ref_type.as_py() for ref_type in df['type']] == ['BRANCH', 'BRANCH', 'TAG']
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    for int_column in ['max_reference_age_in_ms', 'min_snapshots_to_keep', 'max_snapshot_age_in_ms']:
+        for value in df[int_column]:
+            assert isinstance(value.as_py(), int) or not value.as_py()
+
+    lhs = spark.table(f"{identifier}.refs").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+            assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_inspect_partitions_unpartitioned(