Commit 7a56ddb

Arrow: Infer the types when reading (#1669)
### Rationale for this change

Time to give this another go 😆 When reading a Parquet file using PyArrow, there is metadata stored in the Parquet file that makes a column either a large type (eg `large_string`) or a normal type (`string`). The difference is that the large types use a 64-bit offset to encode their arrays. This is not always needed, so instead of forcing one variant we could keep the types the data is actually stored with and let PyArrow decide here: https://github.com/apache/iceberg-python/blob/300b8405a0fe7d0111321e5644d704026af9266b/pyiceberg/io/pyarrow.py#L1579

Today the PyArrow read path just bumps everything to a large type, which might lead to additional memory consumption because an int64 array is allocated for the offsets instead of an int32 one. I thought we would be good to go now that the lower bound of PyArrow is 17, but it looks like we still have to wait for Arrow 18 to fix the issue with the `date` types: apache/arrow#43183

Fixes: #1049

### Are these changes tested?

Yes, existing tests :)

### Are there any user-facing changes?

Before, PyIceberg would always return the large Arrow types (eg `large_string` instead of `string`). After this change, it will return the type the data was written with.
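As a quick illustration of the offset-width difference described above (plain PyArrow, not PyIceberg code; the sample data is made up):

```python
import pyarrow as pa

# string/binary/list arrays carry an offsets buffer; the "large" variants use
# 64-bit offsets instead of 32-bit ones, doubling that buffer for the same data.
data = ["a", "bb", "ccc"] * 1_000
small = pa.array(data, type=pa.string())        # int32 offsets
large = pa.array(data, type=pa.large_string())  # int64 offsets

print(small.nbytes, large.nbytes)  # the large variant needs more memory for identical values
print(small.cast(pa.large_string()).type)  # casting between variants is lossless while offsets fit in int32
```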
1 parent 62191ee commit 7a56ddb

6 files changed (+72, -69 lines)

pyiceberg/io/pyarrow.py

Lines changed: 24 additions & 24 deletions

@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1385,7 +1386,6 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = _parse_location(task.file.file_path)
@@ -1415,13 +1415,7 @@ def _task_to_record_batches(

     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
-        # With PyArrow 16.0.0 there is an issue with casting record-batches:
-        # https://github.com/apache/arrow/issues/41884
-        # https://github.com/apache/arrow/issues/43183
-        # Would be good to remove this later on
-        schema=_pyarrow_schema_ensure_large_types(physical_schema)
-        if use_large_types
-        else (_pyarrow_schema_ensure_small_types(physical_schema)),
+        schema=physical_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
         filter=pyarrow_filter if not positional_deletes else None,
@@ -1456,7 +1450,6 @@ def _task_to_record_batches(
                 file_project_schema,
                 current_batch,
                 downcast_ns_timestamp_to_us=True,
-                use_large_types=use_large_types,
             )

             # Inject projected column values if available
@@ -1542,14 +1535,6 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._limit = limit

-    @property
-    def _use_large_types(self) -> bool:
-        """Whether to represent data as large arrow types.
-
-        Defaults to True.
-        """
-        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
-
     @property
     def _projected_field_ids(self) -> Set[int]:
         """Set of field IDs that should be projected from the data files."""
@@ -1611,11 +1596,21 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:

         tables = [f.result() for f in completed_futures if f.result()]

+        arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+
         if len(tables) < 1:
-            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+            return pa.Table.from_batches([], schema=arrow_schema)

         result = pa.concat_tables(tables, promote_options="permissive")

+        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
+            )
+            result = result.cast(arrow_schema)
+
         if self._limit is not None:
             return result.slice(0, self._limit)

@@ -1658,7 +1653,6 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                self._use_large_types,
                 self._table_metadata.spec(),
             )
             for batch in batches:
@@ -1677,13 +1671,12 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
-    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1693,20 +1686,27 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
-    _use_large_types: bool
+    _use_large_types: Optional[bool]

     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
-        use_large_types: bool = True,
+        use_large_types: Optional[bool] = None,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types

+        if use_large_types is not None:
+            deprecation_message(
+                deprecated_in="0.10.0",
+                removed_in="0.11.0",
+                help_message="Argument `use_large_types` will be removed from ArrowProjectionVisitor",
+            )
+
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)

@@ -1715,7 +1715,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
             target_schema = schema_to_pyarrow(
                 promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
             )
-            if not self._use_large_types:
+            if self._use_large_types is False:
                 target_schema = _pyarrow_schema_ensure_small_types(target_schema)
             return values.cast(target_schema)
         elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
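The new read path concatenates whatever physical types the individual files produce and only casts back to the projected (large) schema when the deprecated property is set. A minimal PyArrow-only sketch of that behaviour, assuming two files written with different offset widths:

```python
import pyarrow as pa

# One file was written with 32-bit offsets, another with 64-bit offsets.
t1 = pa.table({"foo": pa.array(["a", "b"], type=pa.string())})
t2 = pa.table({"foo": pa.array(["c", "d"], type=pa.large_string())})

# promote_options="permissive" lets concat_tables reconcile string vs large_string.
result = pa.concat_tables([t1, t2], promote_options="permissive")
print(result.schema)  # the unified (typically large) type for the combined table

# Forcing large types everywhere is now just a cast to the projected schema,
# which is what setting PYARROW_USE_LARGE_TYPES_ON_READ triggers above.
large_schema = pa.schema([pa.field("foo", pa.large_string())])
print(result.cast(large_schema).schema)
```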

pyiceberg/table/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -1785,7 +1785,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         return pa.RecordBatchReader.from_batches(
             target_schema,
             batches,
-        )
+        ).cast(target_schema)

     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         """Read a Pandas DataFrame eagerly from this Iceberg table.

tests/integration/test_add_files.py

Lines changed: 8 additions & 13 deletions

@@ -33,7 +33,7 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
 from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
@@ -588,11 +588,6 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
             pa.field("foo", pa.string(), nullable=False),
         ]
     )
-    arrow_schema_large = pa.schema(
-        [
-            pa.field("foo", pa.large_string(), nullable=False),
-        ]
-    )

     tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema)

@@ -614,27 +609,27 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
     tbl.add_files([file_path])

     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema

     file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet"
     _write_parquet(
         tbl.io,
         file_path_large,
-        arrow_schema_large,
+        arrow_schema,
         pa.Table.from_pylist(
             [
                 {
                     "foo": "normal",
                 }
             ],
-            schema=arrow_schema_large,
+            schema=arrow_schema,
         ),
     )

     tbl.add_files([file_path_large])

     table_schema = tbl.scan().to_arrow().schema
-    assert table_schema == arrow_schema_large
+    assert table_schema == arrow_schema


 @pytest.mark.integration
@@ -748,8 +743,8 @@ def test_add_files_with_valid_upcast(
         pa.schema(
             (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )
@@ -799,7 +794,7 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalo
                 "qux": date(2024, 3, 7),
             }
         ],
-        schema=_pyarrow_schema_ensure_large_types(ARROW_SCHEMA),
+        schema=ARROW_SCHEMA,
     )

     lhs = spark.table(f"{identifier}").toPandas()

tests/integration/test_reads.py

Lines changed: 12 additions & 12 deletions

@@ -831,7 +831,16 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:

 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+def test_table_scan_keep_types(catalog: Catalog) -> None:
+    expected_schema = pa.schema(
+        [
+            pa.field("string", pa.string()),
+            pa.field("string-to-binary", pa.large_binary()),
+            pa.field("binary", pa.binary()),
+            pa.field("list", pa.list_(pa.large_string())),
+        ]
+    )
+
     identifier = "default.test_table_scan_default_to_large_types"
     arrow_table = pa.Table.from_arrays(
         [
@@ -840,7 +849,7 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
             pa.array([b"a", b"b", b"c"]),
             pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
         ],
-        names=["string", "string-to-binary", "binary", "list"],
+        schema=expected_schema,
     )

     try:
@@ -859,15 +868,6 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
         update_schema.update_column("string-to-binary", BinaryType())

     result_table = tbl.scan().to_arrow()
-
-    expected_schema = pa.schema(
-        [
-            pa.field("string", pa.large_string()),
-            pa.field("string-to-binary", pa.large_binary()),
-            pa.field("binary", pa.large_binary()),
-            pa.field("list", pa.large_list(pa.large_string())),
-        ]
-    )
     assert result_table.schema.equals(expected_schema)


@@ -906,7 +906,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
     expected_schema = pa.schema(
         [
             pa.field("string", pa.string()),
-            pa.field("string-to-binary", pa.binary()),
+            pa.field("string-to-binary", pa.large_binary()),
             pa.field("binary", pa.binary()),
             pa.field("list", pa.list_(pa.string())),
         ]
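For completeness, a hedged sketch of how a user could still opt back into large types while the deprecated property exists — the import location of `PYARROW_USE_LARGE_TYPES_ON_READ`, the catalog name, and the table name are assumptions, not part of this commit:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ  # assumed import path for the property key

# "default" and "default.my_table" are placeholders for an existing catalog and table.
catalog = load_catalog("default", **{PYARROW_USE_LARGE_TYPES_ON_READ: "True"})
tbl = catalog.load_table("default.my_table")

# With the (deprecated) property set, the scan result is cast back to large types;
# without it, the scan now returns the types the Parquet files were written with.
print(tbl.scan().to_arrow().schema)
```

Either way a deprecation warning is emitted, since the property is deprecated in 0.10.0 and scheduled for removal in 0.11.0.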

tests/integration/test_writes/test_writes.py

Lines changed: 18 additions & 10 deletions

@@ -25,6 +25,7 @@
 from urllib.parse import urlparse

 import pandas as pd
+import pandas.testing
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.parquet as pq
@@ -401,7 +402,14 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads(
     tbl.append(arrow_table)
     spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
     pyiceberg_df = tbl.scan().to_pandas()
-    assert spark_df.equals(pyiceberg_df)
+
+    # We're just interested in the content, PyIceberg actually makes a nice Categorical out of it:
+    # E   AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different
+    # E
+    # E   Attribute "dtype" are different
+    # E   [left]:  object
+    # E   [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object)
+    pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False)


 @pytest.mark.integration
@@ -422,7 +430,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     }
     pa_schema = pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
             pa.field("name", pa.string()),
             pa.field(
@@ -432,7 +440,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
                     pa.field("street", pa.string()),
                     pa.field("city", pa.string()),
                     pa.field("zip", pa.int32()),
-                    pa.field("bar", pa.large_string()),
+                    pa.field("bar", pa.string()),
                 ]
             ),
         ),
@@ -448,17 +456,17 @@ def test_python_writes_with_small_and_large_types_spark_reads(
     arrow_table_on_read = tbl.scan().to_arrow()
     assert arrow_table_on_read.schema == pa.schema(
         [
-            pa.field("foo", pa.large_string()),
+            pa.field("foo", pa.string()),
             pa.field("id", pa.int32()),
-            pa.field("name", pa.large_string()),
+            pa.field("name", pa.string()),
             pa.field(
                 "address",
                 pa.struct(
                     [
-                        pa.field("street", pa.large_string()),
-                        pa.field("city", pa.large_string()),
+                        pa.field("street", pa.string()),
+                        pa.field("city", pa.string()),
                         pa.field("zip", pa.int32()),
-                        pa.field("bar", pa.large_string()),
+                        pa.field("bar", pa.string()),
                     ]
                 ),
             ),
@@ -1164,8 +1172,8 @@ def test_table_write_schema_with_valid_upcast(
         pa.schema(
             (
                 pa.field("long", pa.int64(), nullable=True),
-                pa.field("list", pa.large_list(pa.int64()), nullable=False),
-                pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
+                pa.field("list", pa.list_(pa.int64()), nullable=False),
+                pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
                 pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
                 pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
             )
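The relaxed pandas comparison above exists because a dictionary-encoded Arrow column converts to a pandas `Categorical`, while the Spark result holds plain objects. A standalone sketch of that mismatch (sample data made up):

```python
import pandas.testing
import pyarrow as pa

dict_encoded = pa.table({"name": pa.array(["AB", "CD", "AB"]).dictionary_encode()})
plain = pa.table({"name": pa.array(["AB", "CD", "AB"])})

left = plain.to_pandas()          # dtype: object
right = dict_encoded.to_pandas()  # dtype: category

# Identical content, different dtypes — exactly what check_dtype/check_categorical relax.
pandas.testing.assert_frame_equal(left, right, check_dtype=False, check_categorical=False)
```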

tests/io/test_pyarrow.py

Lines changed: 9 additions & 9 deletions

@@ -1065,10 +1065,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:

     assert (
         repr(result_table.schema)
-        == """properties: map<large_string, large_string>
-  child 0, entries: struct<key: large_string not null, value: large_string not null> not null
-      child 0, key: large_string not null
-      child 1, value: large_string not null"""
+        == """properties: map<string, string>
+  child 0, entries: struct<key: string not null, value: string not null> not null
+      child 0, key: string not null
+      child 1, value: string not null"""
     )


@@ -1181,7 +1181,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa
     with transaction.update_snapshot().overwrite() as update:
         update.append_data_file(unpartitioned_file)

-    schema = pa.schema([("other_field", pa.large_string()), ("partition_id", pa.int64())])
+    schema = pa.schema([("other_field", pa.string()), ("partition_id", pa.int64())])
     assert table.scan().to_arrow() == pa.table(
         {
             "other_field": ["foo", "bar", "baz"],
@@ -1245,7 +1245,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC
     assert (
         str(table.scan().to_arrow())
         == """pyarrow.Table
-field_1: large_string
+field_1: string
 field_2: int64
 field_3: int64
 ----
@@ -1470,9 +1470,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s
     assert actual.as_py() == expected
     assert (
         repr(result_table.schema)
-        == """locations: map<large_string, struct<latitude: double not null, longitude: double not null, altitude: double>>
-  child 0, entries: struct<key: large_string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
-      child 0, key: large_string not null
+        == """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>>
+  child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
+      child 0, key: string not null
       child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null
           child 0, latitude: double not null
           child 1, longitude: double not null
