diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 9e200aaf67..6cd06d9931 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1385,7 +1386,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1415,13 +1415,7 @@ def _task_to_record_batches(
 
         fragment_scanner = ds.Scanner.from_fragment(
             fragment=fragment,
-            # With PyArrow 16.0.0 there is an issue with casting record-batches:
-            # https://github.com/apache/arrow/issues/41884
-            # https://github.com/apache/arrow/issues/43183
-            # Would be good to remove this later on
-            schema=_pyarrow_schema_ensure_large_types(physical_schema)
-            if use_large_types
-            else (_pyarrow_schema_ensure_small_types(physical_schema)),
+            schema=physical_schema,
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
             filter=pyarrow_filter if not positional_deletes else None,
@@ -1456,7 +1450,6 @@ def _task_to_record_batches(
                 file_project_schema,
                 current_batch,
                 downcast_ns_timestamp_to_us=True,
-                use_large_types=use_large_types,
             )
 
             # Inject projected column values if available
@@ -1542,14 +1535,6 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._limit = limit
 
-    @property
-    def _use_large_types(self) -> bool:
-        """Whether to represent data as large arrow types.
-
-        Defaults to True.
-        """
-        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
     @property
     def _projected_field_ids(self) -> Set[int]:
         """Set of field IDs that should be projected from the data files."""
@@ -1611,11 +1596,21 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:
 
         tables = [f.result() for f in completed_futures if f.result()]
 
+        arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+
         if len(tables) < 1:
-            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+            return pa.Table.from_batches([], schema=arrow_schema)
 
         result = pa.concat_tables(tables, promote_options="permissive")
 
+        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
+            )
+            result = result.cast(arrow_schema)
+
         if self._limit is not None:
             return result.slice(0, self._limit)
 
@@ -1658,7 +1653,6 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                self._use_large_types,
                 self._table_metadata.spec(),
             )
             for batch in batches:
@@ -1677,13 +1671,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1693,20 +1686,27 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool
+    _use_large_types: Optional[bool]
 
     def __init__(
         self,
         file_schema: Schema,
        downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool = True,
+        use_large_types: Optional[bool] = None,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
 
+        if use_large_types is not None:
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
+            )
+
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
 
@@ -1715,7 +1715,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                 target_schema = schema_to_pyarrow(
                     promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
                 )
-                if not self._use_large_types:
+                if self._use_large_types is False:
                     target_schema = _pyarrow_schema_ensure_small_types(target_schema)
                 return values.cast(target_schema)
             elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index cab5d73d27..1ebdd8bcf1 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1785,7 +1785,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         return pa.RecordBatchReader.from_batches(
             target_schema,
             batches,
-        )
+        ).cast(target_schema)
 
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         """Read a Pandas DataFrame eagerly from this Iceberg table.
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index bfbc8db668..2c6eb4b4ab 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -33,7 +33,7 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
 from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
@@ -588,11 +588,6 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
             pa.field("foo", pa.string(), nullable=False),
         ]
     )
-    arrow_schema_large = pa.schema(
-        [
-            pa.field("foo", pa.large_string(), nullable=False),
-        ]
-    )
 
     tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema)
 
@@ -614,27 +609,27 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
     tbl.add_files([file_path])
 
     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema
 
     file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet"
     _write_parquet(
         tbl.io,
         file_path_large,
-        arrow_schema_large,
+        arrow_schema,
         pa.Table.from_pylist(
             [
                 {
                     "foo": "normal",
                 }
             ],
-            schema=arrow_schema_large,
+            schema=arrow_schema,
         ),
     )
 
     tbl.add_files([file_path_large])
 
     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema
 
 
 @pytest.mark.integration
@@ -748,8 +743,8 @@ def test_add_files_with_valid_upcast(
         pa.schema(
             (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )
@@ -799,7 +794,7 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalog: Catalo
                 "qux": date(2024, 3, 7),
             }
         ],
-        schema=_pyarrow_schema_ensure_large_types(ARROW_SCHEMA),
+        schema=ARROW_SCHEMA,
     )
 
     lhs = spark.table(f"{identifier}").toPandas()
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 8ab15489ef..5ac5162f8e 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -831,7 +831,16 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
 
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+def test_table_scan_keep_types(catalog: Catalog) -> None:
+    expected_schema = pa.schema(
+        [
+            pa.field("string", pa.string()),
+            pa.field("string-to-binary", pa.large_binary()),
+            pa.field("binary", pa.binary()),
+            pa.field("list", pa.list_(pa.large_string())),
+        ]
+    )
+
     identifier = "default.test_table_scan_default_to_large_types"
     arrow_table = pa.Table.from_arrays(
         [
@@ -840,7 +849,7 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
             pa.array([b"a", b"b", b"c"]),
             pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
         ],
-        names=["string", "string-to-binary", "binary", "list"],
+        schema=expected_schema,
     )
 
     try:
@@ -859,15 +868,6 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
         update_schema.update_column("string-to-binary", BinaryType())
 
     result_table = tbl.scan().to_arrow()
-
-    expected_schema = pa.schema(
-        [
-            pa.field("string", pa.large_string()),
-            pa.field("string-to-binary", pa.large_binary()),
-            pa.field("binary", pa.large_binary()),
-            pa.field("list", pa.large_list(pa.large_string())),
-        ]
-    )
     assert result_table.schema.equals(expected_schema)
 
 
@@ -906,7 +906,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
     expected_schema = pa.schema(
         [
             pa.field("string", pa.string()),
-            pa.field("string-to-binary", pa.binary()),
+            pa.field("string-to-binary", pa.large_binary()),
             pa.field("binary", pa.binary()),
             pa.field("list", pa.list_(pa.string())),
         ]
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 1fe29c684c..59c795cf75 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -25,6 +25,7 @@
 from urllib.parse import urlparse
 
 import pandas as pd
+import pandas.testing
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.parquet as pq
@@ -401,7 +402,14 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads(
     tbl.append(arrow_table)
     spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
     pyiceberg_df = tbl.scan().to_pandas()
-    assert spark_df.equals(pyiceberg_df)
+
+    # We're just interested in the content, PyIceberg actually makes a nice Categorical out of it:
+    # E       AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different
+    # E
+    # E       Attribute "dtype" are different
+    # E       [left]:  object
+    # E       [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object)
+    pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False)
 
 
 @pytest.mark.integration
@@ -422,7 +430,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     }
     pa_schema = pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
             pa.field("name", pa.string()),
             pa.field(
@@ -432,7 +440,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
                     pa.field("street", pa.string()),
                     pa.field("city", pa.string()),
                     pa.field("zip", pa.int32()),
-                    pa.field("bar", pa.large_string()),
+                    pa.field("bar", pa.string()),
                 ]
             ),
         ),
@@ -448,17 +456,17 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     arrow_table_on_read = tbl.scan().to_arrow()
     assert arrow_table_on_read.schema == pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
-            pa.field("name", pa.large_string()),
+            pa.field("name", pa.string()),
             pa.field(
                 "address",
                 pa.struct(
                     [
-                        pa.field("street", pa.large_string()),
-                        pa.field("city", pa.large_string()),
+                        pa.field("street", pa.string()),
+                        pa.field("city", pa.string()),
                         pa.field("zip", pa.int32()),
-                        pa.field("bar", pa.large_string()),
+                        pa.field("bar", pa.string()),
                     ]
                 ),
             ),
@@ -1164,8 +1172,8 @@ def test_table_write_schema_with_valid_upcast(
         pa.schema(
            (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e883e38cb8..02605d81a8 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -1065,10 +1065,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:
 
     assert (
         repr(result_table.schema)
-        == """properties: map<large_string, large_string>
-  child 0, entries: struct<key: large_string not null, value: large_string not null> not null
-      child 0, key: large_string not null
-      child 1, value: large_string not null"""
+        == """properties: map<string, string>
+  child 0, entries: struct<key: string not null, value: string not null> not null
+      child 0, key: string not null
+      child 1, value: string not null"""
     )
 
 
@@ -1181,7 +1181,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa
     with transaction.update_snapshot().overwrite() as update:
         update.append_data_file(unpartitioned_file)
 
-    schema = pa.schema([("other_field", pa.large_string()), ("partition_id", pa.int64())])
+    schema = pa.schema([("other_field", pa.string()), ("partition_id", pa.int64())])
     assert table.scan().to_arrow() == pa.table(
         {
             "other_field": ["foo", "bar", "baz"],
@@ -1245,7 +1245,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC
     assert (
         str(table.scan().to_arrow())
         == """pyarrow.Table
-field_1: large_string
+field_1: string
 field_2: int64
 field_3: int64
 ----
@@ -1470,9 +1470,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s
         assert actual.as_py() == expected
     assert (
         repr(result_table.schema)
-        == """locations: map<large_string, struct<latitude: double not null, longitude: double not null, altitude: double>>
-  child 0, entries: struct<key: large_string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
-      child 0, key: large_string not null
+        == """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>>
+  child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
+      child 0, key: string not null
       child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null
           child 0, latitude: double not null
           child 1, longitude: double not null
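A minimal usage sketch of the behaviour this patch targets, for reviewers; it is not part of the patch. The catalog name "default", the table identifier "default.taxi_dataset", and the assumption that `PYARROW_USE_LARGE_TYPES_ON_READ` (exposed in `pyiceberg.io`) can be supplied as a catalog property so it reaches the FileIO are all illustrative; only the deprecation window (deprecated in 0.10.0, removed in 0.11.0) and the cast-on-read behaviour come from the diff above.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ

# New default: a scan keeps the physical Arrow types of the data files, so string
# columns come back as pa.string() instead of being force-cast to pa.large_string().
catalog = load_catalog("default")                 # assumed catalog name
tbl = catalog.load_table("default.taxi_dataset")  # assumed table identifier
print(tbl.scan().to_arrow().schema)

# Opting back into large types still works but is deprecated: the scan logs a
# deprecation message and casts the concatenated result to the large-type schema
# derived from the projected Iceberg schema.
large_catalog = load_catalog("default", **{PYARROW_USE_LARGE_TYPES_ON_READ: "True"})
print(large_catalog.load_table("default.taxi_dataset").scan().to_arrow().schema)

# to_arrow_batch_reader() now casts the stream to its target schema, so the
# reader's advertised schema and the yielded batches stay consistent.
for batch in tbl.scan().to_arrow_batch_reader():
    print(batch.num_rows)
```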