From fa9b3caf24841cde1ce753c33f56fadc7640b891 Mon Sep 17 00:00:00 2001 From: Fokko Date: Sun, 16 Feb 2025 23:37:51 +0100 Subject: [PATCH 1/9] Arrow: Infer the types when reading When reading a Parquet file using PyArrow, there is metadata stored in the Parquet file that indicates whether a column should be read as a large type (e.g. `large_string`) or a normal type (`string`). The difference is that the large types use 64-bit offsets to encode their arrays. This is not always needed; we could first look at the types in which the data is actually stored and let PyArrow decide here: https://github.com/apache/iceberg-python/blob/300b8405a0fe7d0111321e5644d704026af9266b/pyiceberg/io/pyarrow.py#L1579 Today we just bump everything to a large type, which might lead to additional memory consumption because an int64 array is allocated for the offsets instead of an int32 array. I thought we would be good to go now that the lower bound of PyArrow has been raised to 17, but it looks like we still have to wait for Arrow 18 to fix the issue with the `date` types: https://github.com/apache/arrow/issues/43183 Fixes: https://github.com/apache/iceberg-python/issues/1049 --- pyiceberg/io/pyarrow.py | 47 +++++++++++--------- tests/integration/test_add_files.py | 21 ++++----- tests/integration/test_reads.py | 8 ++-- tests/integration/test_writes/test_writes.py | 28 +++++++----- tests/io/test_pyarrow.py | 18 ++++---- 5 files changed, 66 insertions(+), 56 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index c0d078abc7..91d607a4ff 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1348,7 +1348,7 @@ def _task_to_record_batches( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, - use_large_types: bool = True, + use_large_types: Optional[bool] = True, partition_spec: Optional[PartitionSpec] = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) @@ -1376,15 +1376,21 @@ def _task_to_record_batches( file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + fragment_schema = physical_schema + if use_large_types is not None: + fragment_schema = ( + _pyarrow_schema_ensure_large_types(physical_schema) + if use_large_types + else (_pyarrow_schema_ensure_small_types(physical_schema)) + ) + fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, # With PyArrow 16.0.0 there is an issue with casting record-batches: # https://github.com/apache/arrow/issues/41884 # https://github.com/apache/arrow/issues/43183 # Would be good to remove this later on - schema=_pyarrow_schema_ensure_large_types(physical_schema) - if use_large_types - else (_pyarrow_schema_ensure_small_types(physical_schema)), + schema=fragment_schema, # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, @@ -1504,14 +1510,6 @@ def __init__( self._case_sensitive = case_sensitive self._limit = limit - @property - def _use_large_types(self) -> bool: - """Whether to represent data as large arrow types. - - Defaults to True.
- """ - return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) - @property def _projected_field_ids(self) -> Set[int]: """Set of field IDs that should be projected from the data files.""" @@ -1541,8 +1539,12 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: deletes_per_file = _read_all_delete_files(self._io, tasks) executor = ExecutorFactory.get_or_create() + use_large_types = None + if PYARROW_USE_LARGE_TYPES_ON_READ in self._io.properties: + use_large_types = property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) + def _table_from_scan_task(task: FileScanTask) -> pa.Table: - batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)) + batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, use_large_types)) if len(batches) > 0: return pa.Table.from_batches(batches) else: @@ -1602,10 +1604,15 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record ValueError: When a field type in the file cannot be projected to the schema type """ deletes_per_file = _read_all_delete_files(self._io, tasks) - return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file) + # Always use large types, since we cannot infer it in a streaming fashion, + # without fetching all the schemas first, which defeats the purpose of streaming + return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, use_large_types=True) def _record_batches_from_scan_tasks_and_deletes( - self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]] + self, + tasks: Iterable[FileScanTask], + deletes_per_file: Dict[str, List[ChunkedArray]], + use_large_types: Optional[bool] = True, ) -> Iterator[pa.RecordBatch]: total_row_count = 0 for task in tasks: @@ -1620,7 +1627,7 @@ def _record_batches_from_scan_tasks_and_deletes( deletes_per_file.get(task.file.file_path), self._case_sensitive, self._table_metadata.name_mapping(), - self._use_large_types, + use_large_types, self._table_metadata.spec(), ) for batch in batches: @@ -1639,7 +1646,7 @@ def _to_requested_schema( batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, - use_large_types: bool = True, + use_large_types: Optional[bool] = True, ) -> pa.RecordBatch: # We could reuse some of these visitors struct_array = visit_with_partner( @@ -1655,14 +1662,14 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra _file_schema: Schema _include_field_ids: bool _downcast_ns_timestamp_to_us: bool - _use_large_types: bool + _use_large_types: Optional[bool] def __init__( self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, - use_large_types: bool = True, + use_large_types: Optional[bool] = True, ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids @@ -1677,7 +1684,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: target_schema = schema_to_pyarrow( promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids ) - if not self._use_large_types: + if self._use_large_types is False: target_schema = _pyarrow_schema_ensure_small_types(target_schema) return values.cast(target_schema) elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: diff --git a/tests/integration/test_add_files.py 
b/tests/integration/test_add_files.py index 8713615218..1bea1dc9ac 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -30,7 +30,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.io import FileIO -from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types +from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table @@ -535,11 +535,6 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca pa.field("foo", pa.string(), nullable=False), ] ) - arrow_schema_large = pa.schema( - [ - pa.field("foo", pa.large_string(), nullable=False), - ] - ) tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema) @@ -561,27 +556,27 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca tbl.add_files([file_path]) table_schema = tbl.scan().to_arrow().schema - assert table_schema == arrow_schema_large + assert table_schema == arrow_schema file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet" _write_parquet( tbl.io, file_path_large, - arrow_schema_large, + arrow_schema, pa.Table.from_pylist( [ { "foo": "normal", } ], - schema=arrow_schema_large, + schema=arrow_schema, ), ) tbl.add_files([file_path_large]) table_schema = tbl.scan().to_arrow().schema - assert table_schema == arrow_schema_large + assert table_schema == arrow_schema @pytest.mark.integration @@ -695,8 +690,8 @@ def test_add_files_with_valid_upcast( pa.schema( ( pa.field("long", pa.int64(), nullable=True), - pa.field("list", pa.large_list(pa.int64()), nullable=False), - pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False), + pa.field("list", pa.list_(pa.int64()), nullable=False), + pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False), pa.field("double", pa.float64(), nullable=True), pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16 ) @@ -746,7 +741,7 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalog: Catalo "qux": date(2024, 3, 7), } ], - schema=_pyarrow_schema_ensure_large_types(ARROW_SCHEMA), + schema=ARROW_SCHEMA, ) lhs = spark.table(f"{identifier}").toPandas() diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index ee5f8a2574..2139376dc0 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -806,7 +806,7 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -def test_table_scan_default_to_large_types(catalog: Catalog) -> None: +def test_table_scan_keep_types(catalog: Catalog) -> None: identifier = "default.test_table_scan_default_to_large_types" arrow_table = pa.Table.from_arrays( [ @@ -837,10 +837,10 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None: expected_schema = pa.schema( [ - pa.field("string", pa.large_string()), + pa.field("string", pa.string()), pa.field("string-to-binary", pa.large_binary()), - pa.field("binary", pa.large_binary()), - pa.field("list", pa.large_list(pa.large_string())), + pa.field("binary", pa.binary()), + 
pa.field("list", pa.list_(pa.string())), ] ) assert result_table.schema.equals(expected_schema) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 1fe29c684c..59c795cf75 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -25,6 +25,7 @@ from urllib.parse import urlparse import pandas as pd +import pandas.testing import pyarrow as pa import pyarrow.compute as pc import pyarrow.parquet as pq @@ -401,7 +402,14 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads( tbl.append(arrow_table) spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas() pyiceberg_df = tbl.scan().to_pandas() - assert spark_df.equals(pyiceberg_df) + + # We're just interested in the content, PyIceberg actually makes a nice Categorical out of it: + # E AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different + # E + # E Attribute "dtype" are different + # E [left]: object + # E [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object) + pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False) @pytest.mark.integration @@ -422,7 +430,7 @@ def test_python_writes_with_small_and_large_types_spark_reads( } pa_schema = pa.schema( [ - pa.field("foo", pa.large_string()), + pa.field("foo", pa.string()), pa.field("id", pa.int32()), pa.field("name", pa.string()), pa.field( @@ -432,7 +440,7 @@ def test_python_writes_with_small_and_large_types_spark_reads( pa.field("street", pa.string()), pa.field("city", pa.string()), pa.field("zip", pa.int32()), - pa.field("bar", pa.large_string()), + pa.field("bar", pa.string()), ] ), ), @@ -448,17 +456,17 @@ def test_python_writes_with_small_and_large_types_spark_reads( arrow_table_on_read = tbl.scan().to_arrow() assert arrow_table_on_read.schema == pa.schema( [ - pa.field("foo", pa.large_string()), + pa.field("foo", pa.string()), pa.field("id", pa.int32()), - pa.field("name", pa.large_string()), + pa.field("name", pa.string()), pa.field( "address", pa.struct( [ - pa.field("street", pa.large_string()), - pa.field("city", pa.large_string()), + pa.field("street", pa.string()), + pa.field("city", pa.string()), pa.field("zip", pa.int32()), - pa.field("bar", pa.large_string()), + pa.field("bar", pa.string()), ] ), ), @@ -1164,8 +1172,8 @@ def test_table_write_schema_with_valid_upcast( pa.schema( ( pa.field("long", pa.int64(), nullable=True), - pa.field("list", pa.large_list(pa.int64()), nullable=False), - pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False), + pa.field("list", pa.list_(pa.int64()), nullable=False), + pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False), pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16 ) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e2be7872a9..0b60fd0f42 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1065,10 +1065,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) - == """properties: map - child 0, entries: struct not null - child 0, key: large_string not null - child 1, value: large_string not null""" + == """properties: map + child 0, entries: struct not null + child 0, key: string not null + child 1, value: string not null""" ) @@ -1184,7 
+1184,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa assert ( str(table.scan().to_arrow()) == """pyarrow.Table -other_field: large_string +other_field: string partition_id: int64 ---- other_field: [["foo"]] @@ -1246,7 +1246,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC assert ( str(table.scan().to_arrow()) == """pyarrow.Table -field_1: large_string +field_1: string field_2: int64 field_3: int64 ---- @@ -1471,9 +1471,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s assert actual.as_py() == expected assert ( repr(result_table.schema) - == """locations: map> - child 0, entries: struct not null> not null - child 0, key: large_string not null + == """locations: map> + child 0, entries: struct not null> not null + child 0, key: string not null child 1, value: struct not null child 0, latitude: double not null child 1, longitude: double not null From 0384b4eaac6d4a0e55749d89b0c639ddac7311bb Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 18 Feb 2025 15:22:25 +0100 Subject: [PATCH 2/9] =?UTF-8?q?Less=20is=20more=20=F0=9F=98=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pyiceberg/io/pyarrow.py | 30 ++++-------------------------- pyiceberg/table/__init__.py | 2 +- tests/integration/test_reads.py | 2 +- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 91d607a4ff..7e26bd9ec1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -99,7 +99,6 @@ HDFS_KERB_TICKET, HDFS_PORT, HDFS_USER, - PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, @@ -1348,7 +1347,6 @@ def _task_to_record_batches( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, - use_large_types: Optional[bool] = True, partition_spec: Optional[PartitionSpec] = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) @@ -1376,21 +1374,13 @@ def _task_to_record_batches( file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - fragment_schema = physical_schema - if use_large_types is not None: - fragment_schema = ( - _pyarrow_schema_ensure_large_types(physical_schema) - if use_large_types - else (_pyarrow_schema_ensure_small_types(physical_schema)) - ) - fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, # With PyArrow 16.0.0 there is an issue with casting record-batches: # https://github.com/apache/arrow/issues/41884 # https://github.com/apache/arrow/issues/43183 # Would be good to remove this later on - schema=fragment_schema, + schema=physical_schema, # This will push down the query to Arrow. 
# But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, @@ -1425,7 +1415,6 @@ def _task_to_record_batches( file_project_schema, current_batch, downcast_ns_timestamp_to_us=True, - use_large_types=use_large_types, ) # Inject projected column values if available @@ -1539,12 +1528,8 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: deletes_per_file = _read_all_delete_files(self._io, tasks) executor = ExecutorFactory.get_or_create() - use_large_types = None - if PYARROW_USE_LARGE_TYPES_ON_READ in self._io.properties: - use_large_types = property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) - def _table_from_scan_task(task: FileScanTask) -> pa.Table: - batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, use_large_types)) + batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)) if len(batches) > 0: return pa.Table.from_batches(batches) else: @@ -1606,13 +1591,12 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record deletes_per_file = _read_all_delete_files(self._io, tasks) # Always use large types, since we cannot infer it in a streaming fashion, # without fetching all the schemas first, which defeats the purpose of streaming - return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, use_large_types=True) + return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file) def _record_batches_from_scan_tasks_and_deletes( self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]], - use_large_types: Optional[bool] = True, ) -> Iterator[pa.RecordBatch]: total_row_count = 0 for task in tasks: @@ -1627,7 +1611,6 @@ def _record_batches_from_scan_tasks_and_deletes( deletes_per_file.get(task.file.file_path), self._case_sensitive, self._table_metadata.name_mapping(), - use_large_types, self._table_metadata.spec(), ) for batch in batches: @@ -1646,13 +1629,12 @@ def _to_requested_schema( batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, - use_large_types: Optional[bool] = True, ) -> pa.RecordBatch: # We could reuse some of these visitors struct_array = visit_with_partner( requested_schema, batch, - ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types), + ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids), ArrowAccessor(file_schema), ) return pa.RecordBatch.from_struct_array(struct_array) @@ -1669,12 +1651,10 @@ def __init__( file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, - use_large_types: Optional[bool] = True, ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us - self._use_large_types = use_large_types def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self._file_schema.find_field(field.field_id) @@ -1684,8 +1664,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: target_schema = schema_to_pyarrow( promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids ) - if self._use_large_types is False: - target_schema = _pyarrow_schema_ensure_small_types(target_schema) return values.cast(target_schema) elif (target_type := schema_to_pyarrow(field.field_type, 
include_field_ids=self._include_field_ids)) != values.type: if field.field_type == TimestampType(): diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 93edf70f46..00be6d8ddc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1750,7 +1750,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: return pa.RecordBatchReader.from_batches( target_schema, batches, - ) + ).cast(target_schema) def to_pandas(self, **kwargs: Any) -> pd.DataFrame: """Read a Pandas DataFrame eagerly from this Iceberg table. diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 2139376dc0..d7eecd5116 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -881,7 +881,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: expected_schema = pa.schema( [ pa.field("string", pa.string()), - pa.field("string-to-binary", pa.binary()), + pa.field("string-to-binary", pa.large_binary()), pa.field("binary", pa.binary()), pa.field("list", pa.list_(pa.string())), ] From 6dd93083acd9024a8728e1b1cc490bbc88bd358d Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 18 Feb 2025 16:04:59 +0100 Subject: [PATCH 3/9] Reinstate the table property --- pyiceberg/io/pyarrow.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7e26bd9ec1..123a229711 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -99,6 +99,7 @@ HDFS_KERB_TICKET, HDFS_PORT, HDFS_USER, + PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, @@ -1560,11 +1561,16 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table: tables = [f.result() for f in completed_futures if f.result()] + arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False) + if len(tables) < 1: - return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False)) + return pa.Table.from_batches([], schema=arrow_schema) result = pa.concat_tables(tables, promote_options="permissive") + if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False): + result = result.cast(arrow_schema) + if self._limit is not None: return result.slice(0, self._limit) From 2817c61b4e636dd0c0a8cd83451dba4e7d9c732a Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 18 Feb 2025 16:08:53 +0100 Subject: [PATCH 4/9] Cleanup --- pyiceberg/io/pyarrow.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 123a229711..2abf060868 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1595,14 +1595,10 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record ValueError: When a field type in the file cannot be projected to the schema type """ deletes_per_file = _read_all_delete_files(self._io, tasks) - # Always use large types, since we cannot infer it in a streaming fashion, - # without fetching all the schemas first, which defeats the purpose of streaming return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file) def _record_batches_from_scan_tasks_and_deletes( - self, - tasks: Iterable[FileScanTask], - deletes_per_file: Dict[str, List[ChunkedArray]], + self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]] ) -> Iterator[pa.RecordBatch]: total_row_count = 0 for task in tasks: @@ -1650,7 +1646,6 @@ class 
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra _file_schema: Schema _include_field_ids: bool _downcast_ns_timestamp_to_us: bool - _use_large_types: Optional[bool] def __init__( self, From fff741490d97c5c406b30cbb5b7b8ffd192c73bf Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 4 Mar 2025 10:20:07 +0100 Subject: [PATCH 5/9] Fix import --- tests/integration/test_add_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 4ae52a3556..2c6eb4b4ab 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -33,7 +33,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.io import FileIO -from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types +from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException from pyiceberg.manifest import DataFile from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema From 0d199876a52df972a62e1186f941e9e723b32e1d Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 4 Mar 2025 23:33:43 +0100 Subject: [PATCH 6/9] Add warning --- pyiceberg/io/pyarrow.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3ef2cf77eb..1274bcbe22 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -171,6 +171,7 @@ from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime +from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -1657,16 +1658,26 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra _file_schema: Schema _include_field_ids: bool _downcast_ns_timestamp_to_us: bool + _use_large_types: Optional[bool] def __init__( self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, + use_large_types: Optional[bool] = None, ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us + self._use_large_types = use_large_types + + if use_large_types is not None: + deprecation_message( + deprecated_in="0.10.0", + removed_in="0.11.0", + help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor", + ) def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self._file_schema.find_field(field.field_id) @@ -1676,6 +1687,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: target_schema = schema_to_pyarrow( promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids ) + if self._use_large_types is False: + target_schema = _pyarrow_schema_ensure_small_types(target_schema) return values.cast(target_schema) elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: if field.field_type == TimestampType(): From 7382112c8fcdf1931fa50b558cfe701971f53ce4 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 4 Mar 
2025 23:36:08 +0100 Subject: [PATCH 7/9] MOAR deprecation --- pyiceberg/io/pyarrow.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1274bcbe22..8d0a39b300 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1581,6 +1581,11 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table: result = pa.concat_tables(tables, promote_options="permissive") if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False): + deprecation_message( + deprecated_in="0.10.0", + removed_in="0.11.0", + help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.", + ) result = result.cast(arrow_schema) if self._limit is not None: From d9d4fda4c60d8c3674656b3b183d0e66a0736240 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 26 Mar 2025 22:50:36 +0100 Subject: [PATCH 8/9] Thanks Kevin! --- pyiceberg/io/pyarrow.py | 4 ---- tests/integration/test_reads.py | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 475d863855..6cd06d9931 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1415,10 +1415,6 @@ def _task_to_record_batches( fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, - # With PyArrow 16.0.0 there is an issue with casting record-batches: - # https://github.com/apache/arrow/issues/41884 - # https://github.com/apache/arrow/issues/43183 - # Would be good to remove this later on schema=physical_schema, # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index cb635d35d5..5ac5162f8e 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -832,6 +832,15 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) def test_table_scan_keep_types(catalog: Catalog) -> None: + expected_schema = pa.schema( + [ + pa.field("string", pa.string()), + pa.field("string-to-binary", pa.large_binary()), + pa.field("binary", pa.binary()), + pa.field("list", pa.list_(pa.large_string())), + ] + ) + identifier = "default.test_table_scan_default_to_large_types" arrow_table = pa.Table.from_arrays( [ @@ -840,7 +849,7 @@ def test_table_scan_keep_types(catalog: Catalog) -> None: pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]]), ], - names=["string", "string-to-binary", "binary", "list"], + schema=expected_schema, ) try: @@ -859,15 +868,6 @@ def test_table_scan_keep_types(catalog: Catalog) -> None: update_schema.update_column("string-to-binary", BinaryType()) result_table = tbl.scan().to_arrow() - - expected_schema = pa.schema( - [ - pa.field("string", pa.string()), - pa.field("string-to-binary", pa.large_binary()), - pa.field("binary", pa.binary()), - pa.field("list", pa.list_(pa.string())), - ] - ) assert result_table.schema.equals(expected_schema) From dd1c5d45878bbb129024a54be09df5385fba4df1 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 27 Mar 2025 00:17:47 +0100 Subject: [PATCH 9/9] Fix --- tests/io/test_pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index b42e7fc140..02605d81a8 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ 
-1181,7 +1181,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa with transaction.update_snapshot().overwrite() as update: update.append_data_file(unpartitioned_file) - schema = pa.schema([("other_field", pa.large_string()), ("partition_id", pa.int64())]) + schema = pa.schema([("other_field", pa.string()), ("partition_id", pa.int64())]) assert table.scan().to_arrow() == pa.table( { "other_field": ["foo", "bar", "baz"],