From 784bdd18cb72264738d4b1507f4166de2e8c07d9 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 5 Mar 2025 22:16:35 +0100
Subject: [PATCH 1/5] Make records purely position based

This aligns the implementation with Java. We had the keywords there mostly
for the tests, but they should not be used, and it seems that's already the
case :'(

I was undecided whether the costs of this PR (all the changes) are worth it,
but I see more PRs using the Record in a bad way (for example,
https://github.com/apache/iceberg-python/pull/1743), which might lead to very
subtle bugs where the position might sometimes change based on the ordering
of the dict.
---
 pyiceberg/avro/file.py | 17 +-
 pyiceberg/avro/reader.py | 30 +-
 pyiceberg/io/pyarrow.py | 6 +-
 pyiceberg/manifest.py | 373 ++++++++++++-------
 pyiceberg/partitioning.py | 14 +-
 pyiceberg/schema.py | 57 +++
 pyiceberg/table/update/snapshot.py | 8 +-
 pyiceberg/typedef.py | 43 +--
 tests/avro/test_file.py | 157 ++++----
 tests/avro/test_reader.py | 22 +-
 tests/avro/test_resolver.py | 6 +-
 tests/avro/test_writer.py | 31 +-
 tests/conftest.py | 2 +-
 tests/expressions/test_evaluator.py | 33 +-
 tests/expressions/test_expressions.py | 27 +-
 tests/expressions/test_residual_evaluator.py | 28 +-
 tests/expressions/test_visitors.py | 30 +-
 tests/integration/test_partitioning_key.py | 71 ++--
 tests/integration/test_rest_manifest.py | 4 +-
 tests/io/test_pyarrow.py | 34 +-
 tests/io/test_pyarrow_stats.py | 28 +-
 tests/table/test_init.py | 24 +-
 tests/table/test_partitioning.py | 2 +-
 tests/table/test_snapshots.py | 8 +-
 tests/test_typedef.py | 69 ++--
 tests/utils/test_manifest.py | 6 +-
 26 files changed, 611 insertions(+), 519 deletions(-)

diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py
index d0da7651b7..9db585308d 100644
--- a/pyiceberg/avro/file.py
+++ b/pyiceberg/avro/file.py
@@ -74,10 +74,17 @@ class AvroFileHeader(Record):
-    __slots__ = ("magic", "meta", "sync")
-    magic: bytes
-    meta: Dict[str, str]
-    sync: bytes
+    @property
+    def magic(self) -> bytes:
+        return self._data[0]
+
+    @property
+    def meta(self) -> Dict[str, str]:
+        return self._data[1]
+
+    @property
+    def sync(self) -> bytes:
+        return self._data[2]
 
     def compression_codec(self) -> Optional[Type[Codec]]:
         """Get the file's compression codec algorithm from the file's metadata.
@@ -271,7 +278,7 @@ def __exit__(
     def _write_header(self) -> None:
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
         meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
-        header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
+        header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
         construct_writer(META_SCHEMA).write(self.encoder, header)
 
     def write_block(self, objects: List[D]) -> None:
diff --git a/pyiceberg/avro/reader.py b/pyiceberg/avro/reader.py
index a5578680d6..ed696b8e81 100644
--- a/pyiceberg/avro/reader.py
+++ b/pyiceberg/avro/reader.py
@@ -286,7 +286,15 @@ def skip(self, decoder: BinaryDecoder) -> None:
 
 
 class StructReader(Reader):
-    __slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash")
+    __slots__ = (
+        "field_readers",
+        "create_struct",
+        "struct",
+        "_create_with_keyword",
+        "_field_reader_functions",
+        "_hash",
+        "_max_pos",
+    )
     field_readers: Tuple[Tuple[Optional[int], Reader], ...]
create_struct: Callable[..., StructProtocol] struct: StructType @@ -300,34 +308,28 @@ def __init__( ) -> None: self.field_readers = field_readers self.create_struct = create_struct + # TODO: Implement struct-reuse self.struct = struct - try: - # Try initializing the struct, first with the struct keyword argument - created_struct = self.create_struct(struct=self.struct) - self._create_with_keyword = True - except TypeError as e: - if "'struct' is an invalid keyword argument for" in str(e): - created_struct = self.create_struct() - self._create_with_keyword = False - else: - raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e - - if not isinstance(created_struct, StructProtocol): + if not isinstance(self.create_struct(), StructProtocol): raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}") reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = [] + max_pos = -1 for pos, field in field_readers: if pos is not None: reading_callbacks.append((pos, field.read)) + max_pos = max(max_pos, pos) else: reading_callbacks.append((None, field.skip)) self._field_reader_functions = tuple(reading_callbacks) self._hash = hash(self._field_reader_functions) + self._max_pos = 1 + max_pos def read(self, decoder: BinaryDecoder) -> StructProtocol: - struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct() + # TODO: Implement struct-reuse + struct = self.create_struct(*[None] * self._max_pos) for pos, field_reader in self._field_reader_functions: if pos is not None: struct[pos] = field_reader(decoder) # later: pass reuse in here diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index eab26b0c57..b03037a99d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2225,7 +2225,7 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A return transform(lower_value) def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: - return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) + return Record(*[self._partition_value(field, schema) for field in partition_spec.fields]) def to_serialized_dict(self) -> Dict[str, Any]: lower_bounds = {} @@ -2398,7 +2398,7 @@ def write_parquet(task: WriteTask) -> DataFile: stats_columns=compute_statistics_plan(file_schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(file_schema), ) - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, file_format=FileFormat.PARQUET, @@ -2489,7 +2489,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, file_format=FileFormat.PARQUET, diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 5a32a6330c..31da423871 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -321,42 +321,85 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe class DataFile(Record): - __slots__ = ( - "content", - "file_path", - "file_format", - "partition", - "record_count", - "file_size_in_bytes", - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - "key_metadata", - 
"split_offsets", - "equality_ids", - "sort_order_id", - "spec_id", - ) - content: DataFileContent - file_path: str - file_format: FileFormat - partition: Record - record_count: int - file_size_in_bytes: int - column_sizes: Dict[int, int] - value_counts: Dict[int, int] - null_value_counts: Dict[int, int] - nan_value_counts: Dict[int, int] - lower_bounds: Dict[int, bytes] - upper_bounds: Dict[int, bytes] - key_metadata: Optional[bytes] - split_offsets: Optional[List[int]] - equality_ids: Optional[List[int]] - sort_order_id: Optional[int] - spec_id: int + @classmethod + def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile: + struct = DATA_FILE_TYPE[_table_format_version] + return super()._bind(struct, **arguments) + + @property + def content(self) -> DataFileContent: + return self._data[0] + + @property + def file_path(self) -> str: + return self._data[1] + + @property + def file_format(self) -> FileFormat: + return self._data[2] + + @property + def partition(self) -> Record: + return self._data[3] + + @property + def record_count(self) -> int: + return self._data[4] + + @property + def file_size_in_bytes(self) -> int: + return self._data[5] + + @property + def column_sizes(self) -> Dict[int, int]: + return self._data[6] + + @property + def value_counts(self) -> Dict[int, int]: + return self._data[7] + + @property + def null_value_counts(self) -> Dict[int, int]: + return self._data[8] + + @property + def nan_value_counts(self) -> Dict[int, int]: + return self._data[9] + + @property + def lower_bounds(self) -> Dict[int, bytes]: + return self._data[10] + + @property + def upper_bounds(self) -> Dict[int, bytes]: + return self._data[11] + + @property + def key_metadata(self) -> Optional[bytes]: + return self._data[12] + + @property + def split_offsets(self) -> Optional[List[int]]: + return self._data[13] + + @property + def equality_ids(self) -> Optional[List[int]]: + return self._data[14] + + @property + def sort_order_id(self) -> Optional[int]: + return self._data[15] + + # Spec ID should not be stored in the file + _spec_id: int + + @property + def spec_id(self) -> int: + return self._spec_id + + @spec_id.setter + def spec_id(self, value: int) -> None: + self._spec_id = value def __setattr__(self, name: str, value: Any) -> None: """Assign a key/value to a DataFile.""" @@ -365,12 +408,6 @@ def __setattr__(self, name: str, value: Any) -> None: value = FileFormat[value] super().__setattr__(name, value) - def __init__(self, format_version: TableVersion = DEFAULT_READ_VERSION, *data: Any, **named_data: Any) -> None: - super().__init__( - *data, - **{"struct": DATA_FILE_TYPE[format_version], **named_data}, - ) - def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.file_path) @@ -411,53 +448,49 @@ def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file class ManifestEntry(Record): - __slots__ = ("status", "snapshot_id", "sequence_number", "file_sequence_number", "data_file") - status: ManifestEntryStatus - snapshot_id: Optional[int] - sequence_number: Optional[int] - file_sequence_number: Optional[int] - data_file: DataFile + @classmethod + def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestEntry: + return super()._bind(**arguments, struct=MANIFEST_ENTRY_SCHEMAS_STRUCT[_table_format_version]) - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": 
MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})
+    @property
+    def status(self) -> ManifestEntryStatus:
+        return self._data[0]
-    def _wrap(
-        self,
-        new_status: ManifestEntryStatus,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        self.status = new_status
-        self.snapshot_id = new_snapshot_id
-        self.sequence_number = new_sequence_number
-        self.file_sequence_number = new_file_sequence_number
-        self.data_file = new_file
-        return self
+    @status.setter
+    def status(self, value: ManifestEntryStatus) -> None:
+        self._data[0] = value
-    def _wrap_append(
-        self, new_snapshot_id: Optional[int], new_sequence_number: Optional[int], new_file: DataFile
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_sequence_number, None, new_file)
+    @property
+    def snapshot_id(self) -> Optional[int]:
+        return self._data[1]
-    def _wrap_delete(
-        self,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)
+    @snapshot_id.setter
+    def snapshot_id(self, value: int) -> None:
+        self._data[1] = value
+
-    def _wrap_existing(
-        self,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)
+    @property
+    def sequence_number(self) -> Optional[int]:
+        return self._data[2]
+
+    @sequence_number.setter
+    def sequence_number(self, value: int) -> None:
+        self._data[2] = value
+
+    @property
+    def file_sequence_number(self) -> Optional[int]:
+        return self._data[3]
+
+    @file_sequence_number.setter
+    def file_sequence_number(self, value: int) -> None:
+        self._data[3] = value
+
+    @property
+    def data_file(self) -> DataFile:
+        return self._data[4]
+
+    @data_file.setter
+    def data_file(self, value: DataFile) -> None:
+        self._data[4] = value
 
 
 PARTITION_FIELD_SUMMARY_TYPE = StructType(
@@ -469,14 +502,25 @@ def _wrap_existing(
 class PartitionFieldSummary(Record):
-    __slots__ = ("contains_null", "contains_nan", "lower_bound", "upper_bound")
-    contains_null: bool
-    contains_nan: Optional[bool]
-    lower_bound: Optional[bytes]
-    upper_bound: Optional[bytes]
+    @classmethod
+    def from_args(cls, **arguments: Any) -> PartitionFieldSummary:
+        return super()._bind(**arguments, struct=PARTITION_FIELD_SUMMARY_TYPE)
-    def __init__(self, *data: Any, **named_data: Any) -> None:
-        super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data})
+    @property
+    def contains_null(self) -> bool:
+        return self._data[0]
+
+    @property
+    def contains_nan(self) -> Optional[bool]:
+        return self._data[1]
+
+    @property
+    def lower_bound(self) -> Optional[bytes]:
+        return self._data[2]
+
+    @property
+    def upper_bound(self) -> Optional[bytes]:
+        return self._data[3]
 
 
 class PartitionFieldStats:
@@ -495,10 +539,10 @@ def __init__(self, iceberg_type: PrimitiveType) -> None:
 
     def to_summary(self) -> PartitionFieldSummary:
         return PartitionFieldSummary(
-            contains_null=self._contains_null,
-            contains_nan=self._contains_nan,
-            lower_bound=to_bytes(self._type, self._min) if self._min is not None else None,
-            upper_bound=to_bytes(self._type, self._max)
if self._max is not None else None, + self._contains_null, + self._contains_nan, + to_bytes(self._type, self._min) if self._min is not None else None, + to_bytes(self._type, self._max) if self._max is not None else None, ) def update(self, value: Any) -> None: @@ -570,41 +614,77 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition class ManifestFile(Record): - __slots__ = ( - "manifest_path", - "manifest_length", - "partition_spec_id", - "content", - "sequence_number", - "min_sequence_number", - "added_snapshot_id", - "added_files_count", - "existing_files_count", - "deleted_files_count", - "added_rows_count", - "existing_rows_count", - "deleted_rows_count", - "partitions", - "key_metadata", - ) - manifest_path: str - manifest_length: int - partition_spec_id: int - content: ManifestContent - sequence_number: int - min_sequence_number: int - added_snapshot_id: int - added_files_count: Optional[int] - existing_files_count: Optional[int] - deleted_files_count: Optional[int] - added_rows_count: Optional[int] - existing_rows_count: Optional[int] - deleted_rows_count: Optional[int] - partitions: Optional[List[PartitionFieldSummary]] - key_metadata: Optional[bytes] - - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data}) + @classmethod + def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile: + return super()._bind(**arguments, struct=MANIFEST_LIST_FILE_SCHEMAS[_table_format_version]) + + @property + def manifest_path(self) -> str: + return self._data[0] + + @property + def manifest_length(self) -> int: + return self._data[1] + + @property + def partition_spec_id(self) -> int: + return self._data[2] + + @property + def content(self) -> ManifestContent: + return self._data[3] + + @property + def sequence_number(self) -> int: + return self._data[4] + + @sequence_number.setter + def sequence_number(self, value: int) -> None: + self._data[4] = value + + @property + def min_sequence_number(self) -> int: + return self._data[5] + + @min_sequence_number.setter + def min_sequence_number(self, value: int) -> None: + self._data[5] = value + + @property + def added_snapshot_id(self) -> Optional[int]: + return self._data[6] + + @property + def added_files_count(self) -> Optional[int]: + return self._data[7] + + @property + def existing_files_count(self) -> Optional[int]: + return self._data[8] + + @property + def deleted_files_count(self) -> Optional[int]: + return self._data[9] + + @property + def added_rows_count(self) -> Optional[int]: + return self._data[10] + + @property + def existing_rows_count(self) -> Optional[int]: + return self._data[11] + + @property + def deleted_rows_count(self) -> Optional[int]: + return self._data[12] + + @property + def partitions(self) -> Optional[List[PartitionFieldSummary]]: + return self._data[13] + + @property + def key_metadata(self) -> Optional[bytes]: + return self._data[14] def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 @@ -734,7 +814,6 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, self._deleted_rows = 0 self._min_sequence_number = None self._partitions = [] - self._reused_entry_wrapper = ManifestEntry() def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -795,7 +874,7 @@ def to_manifest_file(self) -> ManifestFile: # once the manifest file is generated, no more 
entries can be added
         self.closed = True
         min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
-        return ManifestFile(
+        return ManifestFile.from_args(
             manifest_path=self._output_file.location,
             manifest_length=len(self._writer.output_file),
             partition_spec_id=self._spec.spec_id,
@@ -842,23 +921,39 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
 
     def add(self, entry: ManifestEntry) -> ManifestWriter:
         if entry.sequence_number is not None and entry.sequence_number >= 0:
-            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file))
+            self.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=self._snapshot_id,
+                    sequence_number=entry.sequence_number,
+                    data_file=entry.data_file,
+                )
+            )
         else:
-            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
+            self.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, data_file=entry.data_file
+                )
+            )
         return self
 
     def delete(self, entry: ManifestEntry) -> ManifestWriter:
         self.add_entry(
-            self._reused_entry_wrapper._wrap_delete(
-                self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.DELETED,
+                snapshot_id=self._snapshot_id,
+                sequence_number=entry.sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
             )
         )
         return self
 
     def existing(self, entry: ManifestEntry) -> ManifestWriter:
         self.add_entry(
-            self._reused_entry_wrapper._wrap_existing(
-                entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.EXISTING,
+                snapshot_id=entry.snapshot_id,
+                sequence_number=entry.sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
             )
         )
         return self
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 2bed2ce899..f702182501 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -390,18 +390,20 @@ class PartitionKey:
 
     @cached_property
     def partition(self) -> Record:  # partition key transformed with iceberg internal representation as input
-        iceberg_typed_key_values = {}
+        iceberg_typed_key_values = []
         for raw_partition_field_value in self.field_values:
             partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
             if len(partition_fields) != 1:
                 raise ValueError(f"Cannot have redundant partitions: {partition_fields}")
             partition_field = partition_fields[0]
-            iceberg_typed_key_values[partition_field.name] = partition_record_value(
-                partition_field=partition_field,
-                value=raw_partition_field_value.value,
-                schema=self.schema,
+            iceberg_typed_key_values.append(
+                partition_record_value(
+                    partition_field=partition_field,
+                    value=raw_partition_field_value.value,
+                    schema=self.schema,
+                )
             )
-        return Record(**iceberg_typed_key_values)
+        return Record(*iceberg_typed_key_values)
 
     def to_path(self) -> str:
         return self.partition_spec.partition_to_path(self.partition, self.schema)
diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py
index 5a373cb15f..4cd499e71c 100644
--- a/pyiceberg/schema.py
+++ b/pyiceberg/schema.py
@@ -1718,3 +1718,60 @@ def map(self, map_type: MapType, key_result: Callable[[], bool], value_result: C
 
     def primitive(self, primitive: PrimitiveType) -> bool:
         return True
+
+
+# def _bind_to_struct(
+#     struct: StructType,
+#     data: Dict[str, Any],
+# ) -> Record:
+#     return
visit_with_partner( +# struct, +# data, +# _BindDictToRecord(), +# ArrowAccessor(), +# ) +# +# +# class _BindDictToRecord(SchemaWithPartnerVisitor[Any, Any]): +# def schema(self, schema: Schema, schema_partner: Any, struct_result: Any) -> Any: +# return struct_result +# +# def struct(self, struct: StructType, struct_partner: Any, field_results: List[Any]) -> Any: +# if isinstance(struct_partner, list): +# return [Record(*result) for result in field_results] +# else: +# return Record(*field_results) +# +# def field(self, field: NestedField, partner_struct: Optional[Any], field_result: Any) -> Optional[pa.Array]: +# return field_result +# +# def list(self, list_type: ListType, list_array: Optional[Any], element_result: Any) -> Any: +# return element_result +# +# def map(self, map_type: MapType, map_array: Optional[Any], key_result: Optional[Any], value_result: Optional[Any]) -> Any: +# return value_result +# +# def primitive(self, _: PrimitiveType, primitive_partner: Any) -> Any: +# return primitive_partner +# +# +# class ArrowAccessor(PartnerAccessor[Any]): +# def schema_partner(self, partner: Any) -> Any: +# return partner +# +# def field_partner(self, partner_struct: Any, field_id: int, name: str) -> Any: +# if isinstance(partner_struct, dict): +# return partner_struct.get(name) +# if isinstance(partner_struct, list): +# return [e.get(name) for e in partner_struct] +# else: +# return partner_struct +# +# def list_element_partner(self, partner_list: Optional[Any]) -> Optional[Any]: +# return partner_list +# +# def map_key_partner(self, partner_map: Optional[Any]) -> Optional[Any]: +# return partner_map +# +# def map_value_partner(self, partner_map: Optional[Any]) -> Optional[Any]: +# return partner_map diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index f21c501780..b7474746c8 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -157,7 +157,7 @@ def _write_added_manifest() -> List[ManifestFile]: ) as writer: for data_file in self._added_data_files: writer.add( - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, sequence_number=None, @@ -368,7 +368,7 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo schema = self._transaction.table_metadata.schema() def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: - return ManifestEntry( + return ManifestEntry.from_args( status=status, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, @@ -555,7 +555,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ) as writer: [ writer.add_entry( - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.EXISTING, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, @@ -586,7 +586,7 @@ def _deleted_entries(self) -> List[ManifestEntry]: def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: return [ - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.DELETED, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index 07374887a3..dbc4a164fb 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -19,13 +19,13 @@ from abc import abstractmethod from datetime import date, datetime from decimal import Decimal -from functools import lru_cache from typing import ( TYPE_CHECKING, Any, Callable, Dict, Generic, + List, Literal, Optional, Protocol, @@ -38,7 
+38,7 @@ from uuid import UUID from pydantic import BaseModel, ConfigDict, RootModel -from typing_extensions import TypeAlias +from typing_extensions import Self, TypeAlias if TYPE_CHECKING: from pyiceberg.types import StructType @@ -171,51 +171,36 @@ class IcebergRootModel(RootModel[T], Generic[T]): model_config = ConfigDict(frozen=True) -@lru_cache -def _get_struct_fields(struct_type: StructType) -> Tuple[str, ...]: - return tuple(field.name for field in struct_type.fields) - - class Record(StructProtocol): - __slots__ = ("_position_to_field_name",) - _position_to_field_name: Tuple[str, ...] - - def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None: - if struct is not None: - self._position_to_field_name = _get_struct_fields(struct) - elif named_data: - # Order of named_data is preserved (PEP 468) so this can be used to generate the position dict - self._position_to_field_name = tuple(named_data.keys()) - else: - self._position_to_field_name = tuple(f"field{idx + 1}" for idx in range(len(data))) + __slots__ = ("_data",) + _data: List[Any] - for idx, d in enumerate(data): - self[idx] = d + @classmethod + def _bind(cls, struct: StructType, **arguments: Any) -> Self: + return cls(*[arguments[field.name] if field.name in arguments else field.initial_default for field in struct.fields]) - for field_name, d in named_data.items(): - self.__setattr__(field_name, d) + def __init__(self, *data: Any) -> None: + self._data = list(data) def __setitem__(self, pos: int, value: Any) -> None: """Assign a value to a Record.""" - self.__setattr__(self._position_to_field_name[pos], value) + self._data[pos] = value def __getitem__(self, pos: int) -> Any: """Fetch a value from a Record.""" - return self.__getattribute__(self._position_to_field_name[pos]) + return self._data[pos] def __eq__(self, other: Any) -> bool: """Return the equality of two instances of the Record class.""" - if not isinstance(other, Record): - return False - return self.__dict__ == other.__dict__ + return self._data == other._data if isinstance(other, Record) else False def __repr__(self) -> str: """Return the string representation of the Record class.""" - return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]" + return f"{self.__class__.__name__}[{', '.join(str(v) for v in self._data)}]" def __len__(self) -> int: """Return the number of fields in the Record class.""" - return len(self._position_to_field_name) + return len(self._data) def __hash__(self) -> int: """Return hash value of the Record class.""" diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 0756b2670c..0a986d5405 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -16,8 +16,7 @@ # under the License. 
import inspect from _decimal import Decimal -from copy import copy -from datetime import date, datetime, time +from datetime import datetime from enum import Enum from tempfile import TemporaryDirectory from typing import Any @@ -28,7 +27,7 @@ import pyiceberg.avro.file as avro from pyiceberg.avro.codecs.deflate import DeflateCodec -from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader +from pyiceberg.avro.file import AvroFileHeader from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import ( DEFAULT_BLOCK_SIZE, @@ -40,7 +39,7 @@ ManifestEntryStatus, ) from pyiceberg.schema import Schema -from pyiceberg.typedef import Record +from pyiceberg.typedef import Record, TableVersion from pyiceberg.types import ( BooleanType, DateType, @@ -61,26 +60,17 @@ def get_deflate_compressor() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "deflate"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "deflate"}, bytes(16)) assert header.compression_codec() == DeflateCodec def get_null_compressor() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "null"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "null"}, bytes(16)) assert header.compression_codec() is None def test_unknown_codec() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "unknown"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "unknown"}, bytes(16)) with pytest.raises(ValueError) as exc_info: header.compression_codec() @@ -89,10 +79,7 @@ def test_unknown_codec() -> None: def test_missing_schema() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {}, bytes(16)) with pytest.raises(ValueError) as exc_info: header.get_schema() @@ -119,7 +106,7 @@ def todict(obj: Any) -> Any: def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path="s3://some-path/some-file.parquet", file_format=FileFormat.PARQUET, @@ -137,7 +124,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: equality_ids=[], sort_order_id=4, ) - entry = ManifestEntry( + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, sequence_number=0, @@ -185,7 +172,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path="s3://some-path/some-file.parquet", file_format=FileFormat.PARQUET, @@ -203,7 +190,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: equality_ids=[], sort_order_id=4, ) - entry = ManifestEntry( + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, sequence_number=0, @@ -239,33 +226,32 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: @pytest.mark.parametrize("format_version", [1, 2]) -def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: int) -> None: - data_file = DataFile( - content=DataFileContent.DATA, - file_path="s3://some-path/some-file.parquet", - file_format=FileFormat.PARQUET, - 
partition=Record(), - record_count=131327, - file_size_in_bytes=220669226, - column_sizes={1: 220661854}, - value_counts={1: 131327}, - null_value_counts={1: 0}, - nan_value_counts={}, - lower_bounds={1: b"aaaaaaaaaaaaaaaa"}, - upper_bounds={1: b"zzzzzzzzzzzzzzzz"}, - key_metadata=b"\xde\xad\xbe\xef", - split_offsets=[4, 133697593], - equality_ids=[], - sort_order_id=4, - spec_id=3, - ) - - entry = ManifestEntry( +def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: TableVersion) -> None: + data_file_dict = { + "content": DataFileContent.DATA, + "file_path": "s3://some-path/some-file.parquet", + "file_format": FileFormat.PARQUET, + "partition": Record(), + "record_count": 131327, + "file_size_in_bytes": 220669226, + "column_sizes": {1: 220661854}, + "value_counts": {1: 131327}, + "null_value_counts": {1: 0}, + "nan_value_counts": {}, + "lower_bounds": {1: b"aaaaaaaaaaaaaaaa"}, + "upper_bounds": {1: b"zzzzzzzzzzzzzzzz"}, + "key_metadata": b"\xde\xad\xbe\xef", + "split_offsets": [4, 133697593], + "equality_ids": [], + "sort_order_id": 4, + "spec_id": 3, + } + data_file_v2 = DataFile.from_args(**data_file_dict) # type: ignore + + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, - sequence_number=0, - file_sequence_number=0, - data_file=data_file, + data_file=data_file_v2, ) with TemporaryDirectory() as tmpdir: @@ -297,17 +283,13 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in avro_entry = next(it) if format_version == 1: - v1_datafile = copy(data_file) - # Not part of V1 - v1_datafile.equality_ids = None + data_file_v1 = DataFile.from_args(**data_file_dict, _table_format_version=format_version) - assert avro_entry == ManifestEntry( - status=ManifestEntryStatus.ADDED, + assert avro_entry == ManifestEntry.from_args( + status=1, snapshot_id=8638475580105682862, - # Not part of v1 - sequence_number=None, - file_sequence_number=None, - data_file=v1_datafile, + data_file=data_file_v1, + _table_format_version=format_version, ) elif format_version == 2: assert entry == avro_entry @@ -335,22 +317,49 @@ def test_all_primitive_types(is_required: bool) -> None: ) class AllPrimitivesRecord(Record): - field_fixed: bytes - field_decimal: Decimal - field_bool: bool - field_int: int - field_long: int - field_float: float - field_double: float - field_date: date - field_time: time - field_timestamp: datetime - field_timestamptz: datetime - field_string: str - field_uuid: UUID - - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": all_primitives_schema.as_struct(), **named_data}) + @property + def field_fixed(self) -> bytes: + return self._data[0] + + @property + def field_decimal(self) -> Decimal: + return self._data[1] + + @property + def field_bool(self) -> bool: + return self._data[2] + + @property + def field_int(self) -> int: + return self._data[3] + + @property + def field_long(self) -> int: + return self._data[4] + + @property + def field_float(self) -> float: + return self._data[5] + + @property + def field_double(self) -> float: + return self._data[6] + + @property + def field_timestamp(self) -> datetime: + return self._data[7] + + @property + def field_timestamptz(self) -> datetime: + return self._data[8] + + @property + def field_string(self) -> str: + return self._data[9] + + @property + def field_uuid(self) -> UUID: + return self._data[10] record = AllPrimitivesRecord( b"\x124Vx\x124Vx\x124Vx\x124Vx", diff --git a/tests/avro/test_reader.py 
b/tests/avro/test_reader.py index c97d421d87..81d93df272 100644 --- a/tests/avro/test_reader.py +++ b/tests/avro/test_reader.py @@ -343,7 +343,7 @@ def test_read_struct(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder) - assert repr(result) == "Record[id=12]" + assert repr(result) == "Record[12]" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) @@ -354,10 +354,10 @@ def test_read_struct_lambda(decoder_class: Callable[[bytes], BinaryDecoder]) -> # You can also pass in an arbitrary function that returns a struct result = StructReader( ((0, IntegerReader()),), - lambda struct: Record(struct=struct), + Record, struct, # pylint: disable=unnecessary-lambda ).read(decoder) - assert repr(result) == "Record[id=12]" + assert repr(result) == "Record[12]" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) @@ -369,19 +369,3 @@ def test_read_not_struct_type(decoder_class: Callable[[bytes], BinaryDecoder]) - _ = StructReader(((0, IntegerReader()),), str, struct).read(decoder) # type: ignore assert "Incompatible with StructProtocol: " in str(exc_info.value) - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_exception_handling(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: - decoder = decoder_class(b"\x18") - - def raise_err(struct: StructType) -> None: - raise TypeError("boom") - - struct = StructType(NestedField(1, "id", IntegerType(), required=True)) - # You can also pass in an arbitrary function that returns a struct - - with pytest.raises(ValueError) as exc_info: - _ = StructReader(((0, IntegerReader()),), raise_err, struct).read(decoder) # type: ignore - - assert "Unable to initialize struct:" in str(exc_info.value) diff --git a/tests/avro/test_resolver.py b/tests/avro/test_resolver.py index b5388b5ebb..26b44e8e23 100644 --- a/tests/avro/test_resolver.py +++ b/tests/avro/test_resolver.py @@ -289,15 +289,15 @@ class Ints(Record): c: int = Field() d: Optional[int] = Field() - MANIFEST_ENTRY_SCHEMA = Schema( + ints_schema = Schema( NestedField(3, "c", IntegerType(), required=True), NestedField(4, "d", IntegerType(), required=False), ) - with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, {-1: Ints}) as reader: + with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), ints_schema, {-1: Ints}) as reader: records = list(reader) - assert repr(records) == "[Ints[c=3, d=None]]" + assert repr(records) == "[Ints[3, None]]" def test_resolver_initial_value() -> None: diff --git a/tests/avro/test_writer.py b/tests/avro/test_writer.py index 39b8ecc393..bcd200030f 100644 --- a/tests/avro/test_writer.py +++ b/tests/avro/test_writer.py @@ -19,7 +19,6 @@ import io import struct from _decimal import Decimal -from typing import Dict, List import pytest @@ -147,16 +146,11 @@ def test_write_simple_struct() -> None: schema = StructType( NestedField(1, "id", IntegerType(), required=True), NestedField(2, "property", StringType(), required=True) ) - - class MyStruct(Record): - id: int - property: str - - my_struct = MyStruct(id=12, property="awesome") + struct = Record(12, "awesome") enc_str = b"awesome" - construct_writer(schema).write(encoder, my_struct) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join([b"\x18", zigzag_encode(len(enc_str)), enc_str]) @@ -170,18 +164,13 @@ def 
test_write_struct_with_dict() -> None: NestedField(2, "properties", MapType(3, IntegerType(), 4, IntegerType()), required=True), ) - class MyStruct(Record): - id: int - properties: Dict[int, int] - - my_struct = MyStruct(id=12, properties={1: 2, 3: 4}) - - construct_writer(schema).write(encoder, my_struct) + struct = Record(12, {1: 2, 3: 4}) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join( [ b"\x18", - zigzag_encode(len(my_struct.properties)), + zigzag_encode(len(struct[1])), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), @@ -200,18 +189,14 @@ def test_write_struct_with_list() -> None: NestedField(2, "properties", ListType(3, IntegerType()), required=True), ) - class MyStruct(Record): - id: int - properties: List[int] - - my_struct = MyStruct(id=12, properties=[1, 2, 3, 4]) + struct = Record(12, [1, 2, 3, 4]) - construct_writer(schema).write(encoder, my_struct) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join( [ b"\x18", - zigzag_encode(len(my_struct.properties)), + zigzag_encode(len(struct[1])), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), diff --git a/tests/conftest.py b/tests/conftest.py index a0e5e74522..5b024cc972 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2295,7 +2295,7 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str: @pytest.fixture def example_task(data_file: str) -> FileScanTask: return FileScanTask( - data_file=DataFile(file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925), + data_file=DataFile.from_args(file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925), ) diff --git a/tests/expressions/test_evaluator.py b/tests/expressions/test_evaluator.py index e2b1f27377..7b15099105 100644 --- a/tests/expressions/test_evaluator.py +++ b/tests/expressions/test_evaluator.py @@ -42,6 +42,7 @@ from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator, _StrictMetricsEvaluator from pyiceberg.manifest import DataFile, FileFormat from pyiceberg.schema import Schema +from pyiceberg.typedef import Record from pyiceberg.types import ( DoubleType, FloatType, @@ -91,7 +92,7 @@ def schema_data_file() -> Schema: @pytest.fixture def data_file() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -133,7 +134,7 @@ def data_file() -> DataFile: @pytest.fixture def data_file_2() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_2.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -149,7 +150,7 @@ def data_file_2() -> DataFile: @pytest.fixture def data_file_3() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_3.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -165,7 +166,7 @@ def data_file_3() -> DataFile: @pytest.fixture def data_file_4() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_4.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -288,10 +289,10 @@ def test_missing_stats() -> None: NestedField(2, "no_stats", DoubleType(), required=False), ) - no_stats_file = DataFile( + no_stats_file = DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, - partition={}, + partition=Record(), record_count=50, value_counts=None, null_value_counts=None, @@ -319,7 +320,9 @@ def test_missing_stats() -> None: def test_zero_record_file_stats(schema_data_file: Schema) -> None: - zero_record_data_file = 
DataFile(file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, record_count=0) + zero_record_data_file = DataFile.from_args( + file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition=Record(), record_count=0 + ) expressions = [ LessThan("no_stats", 5), @@ -636,7 +639,7 @@ def schema_data_file_nan() -> Schema: @pytest.fixture def data_file_nan() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file.avro", file_format=FileFormat.PARQUET, partition={}, @@ -949,7 +952,7 @@ def strict_data_file_schema() -> Schema: @pytest.fixture def strict_data_file_1() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -990,7 +993,7 @@ def strict_data_file_1() -> DataFile: @pytest.fixture def strict_data_file_2() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_2.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -1015,7 +1018,7 @@ def strict_data_file_2() -> DataFile: @pytest.fixture def strict_data_file_3() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_3.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -1147,10 +1150,10 @@ def test_strict_missing_stats(strict_data_file_schema: Schema, strict_data_file_ NestedField(2, "no_stats", DoubleType(), required=False), ) - no_stats_file = DataFile( + no_stats_file = DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, - partition={}, + partition=Record(), record_count=50, value_counts=None, null_value_counts=None, @@ -1178,7 +1181,9 @@ def test_strict_missing_stats(strict_data_file_schema: Schema, strict_data_file_ def test_strict_zero_record_file_stats(strict_data_file_schema: Schema) -> None: - zero_record_data_file = DataFile(file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, record_count=0) + zero_record_data_file = DataFile.from_args( + file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition=Record(), record_count=0 + ) expressions = [ LessThan("no_stats", 5), diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 4926b70121..3797fb9a61 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -64,9 +64,6 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( - BinaryType, - BooleanType, - DecimalType, DoubleType, FloatType, IntegerType, @@ -75,7 +72,6 @@ NestedField, StringType, StructType, - UUIDType, ) from pyiceberg.utils.singleton import Singleton @@ -613,22 +609,7 @@ def test_invert_always() -> None: def test_accessor_base_class() -> None: """Test retrieving a value at a position of a container using an accessor""" - struct = Record( - struct=StructType( - NestedField(1, "a", StringType()), - NestedField(2, "b", StringType()), - NestedField(3, "c", StringType()), - NestedField(4, "d", IntegerType()), - NestedField(5, "e", IntegerType()), - NestedField(6, "f", IntegerType()), - NestedField(7, "g", FloatType()), - NestedField(8, "h", DecimalType(8, 4)), - NestedField(9, "i", UUIDType()), - NestedField(10, "j", BooleanType()), - NestedField(11, "k", BooleanType()), - NestedField(12, "l", BinaryType()), - ) - ) + struct = Record(*[None] * 12) uuid_value = uuid.uuid4() @@ -968,11 +949,7 @@ def test_less_than_or_equal() -> None: def test_bound_reference_eval(table_schema_simple: Schema) -> None: """Test creating a BoundReference and 
evaluating it on a StructProtocol""" - struct = Record(struct=table_schema_simple.as_struct()) - - struct[0] = "foovalue" - struct[1] = 123 - struct[2] = True + struct = Record("foovalue", 123, True) position1_accessor = Accessor(position=0) position2_accessor = Accessor(position=1) diff --git a/tests/expressions/test_residual_evaluator.py b/tests/expressions/test_residual_evaluator.py index cf01821787..ba0a0da2e5 100644 --- a/tests/expressions/test_residual_evaluator.py +++ b/tests/expressions/test_residual_evaluator.py @@ -58,7 +58,7 @@ def test_identity_transform_residual() -> None: ) res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) # assert residual == True assert isinstance(residual, LessThan) @@ -67,7 +67,7 @@ def test_identity_transform_residual() -> None: assert residual.literal.value == 12 assert type(residual) is LessThan - residual = res_eval.residual_for(Record(dateint=20170801)) + residual = res_eval.residual_for(Record(20170801)) # assert isinstance(residual, UnboundPredicate) from pyiceberg.expressions import LiteralPredicate @@ -79,11 +79,11 @@ def test_identity_transform_residual() -> None: assert residual.literal.value == 11 # type :ignore # assert type(residual) == BoundGreaterThan - residual = res_eval.residual_for(Record(dateint=20170812)) + residual = res_eval.residual_for(Record(20170812)) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(dateint=20170817)) + residual = res_eval.residual_for(Record(20170817)) assert residual == AlwaysFalse() @@ -103,7 +103,7 @@ def test_case_insensitive_identity_transform_residuals() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) with pytest.raises(ValueError) as e: - res_eval.residual_for(Record(dateint=20170815)) + res_eval.residual_for(Record(20170815)) assert "Could not find field with name DATEINT, case_sensitive=True" in str(e.value) @@ -142,7 +142,7 @@ def test_in() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) assert residual == AlwaysTrue() @@ -178,10 +178,10 @@ def test_not_in() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20180815)) + residual = res_eval.residual_for(Record(20180815)) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) assert residual == AlwaysFalse() @@ -194,10 +194,10 @@ def test_is_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=float("nan"))) + residual = res_eval.residual_for(Record(float("nan"))) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysFalse() @@ -210,10 +210,10 @@ def test_is_not_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=None)) + residual = res_eval.residual_for(Record(None)) assert residual == AlwaysFalse() - residual = 
res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysTrue() spec = PartitionSpec(PartitionField(51, 1051, IdentityTransform(), "float_part")) @@ -222,10 +222,10 @@ def test_is_not_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=None)) + residual = res_eval.residual_for(Record(None)) assert residual == AlwaysFalse() - residual = res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysTrue() diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 94bfcf076c..7ee09b8607 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -822,7 +822,7 @@ def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes: def _to_manifest_file(*partitions: PartitionFieldSummary) -> ManifestFile: """Helper to create a ManifestFile""" - return ManifestFile(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions) + return ManifestFile.from_args(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions) INT_MIN_VALUE = 30 @@ -863,81 +863,81 @@ def manifest_no_stats() -> ManifestFile: def manifest() -> ManifestFile: return _to_manifest_file( # id - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=INT_MIN, upper_bound=INT_MAX, ), # all_nulls_missing_nan - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None, ), # some_nulls - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MAX, ), # no_nulls - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MAX, ), # float - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=_to_byte_buffer(FloatType(), 0.0), upper_bound=_to_byte_buffer(FloatType(), 20.0), ), # all_nulls_double - PartitionFieldSummary(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), + PartitionFieldSummary.from_args(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), # all_nulls_no_nans - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=False, lower_bound=None, upper_bound=None, ), # all_nans - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=True, lower_bound=None, upper_bound=None, ), # both_nan_and_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=True, lower_bound=None, upper_bound=None, ), # no_nan_or_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=False, lower_bound=_to_byte_buffer(FloatType(), 0.0), upper_bound=_to_byte_buffer(FloatType(), 20.0), ), # all_nulls_missing_nan_float - PartitionFieldSummary(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), + PartitionFieldSummary.from_args(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), # all_same_value_or_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MIN, ), # 
no_nulls_same_value_a - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=STRING_MIN, @@ -1607,7 +1607,7 @@ def test_dnf_to_dask(table_schema_simple: Schema) -> None: def test_expression_evaluator_null() -> None: - struct = Record(a=None) + struct = Record(None) schema = Schema(NestedField(1, "a", IntegerType(), required=False), schema_id=1) assert expression_evaluator(schema, In("a", {1, 2, 3}), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotIn("a", {1, 2, 3}), case_sensitive=True)(struct) is True diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index 1066753655..f9bdd4eead 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -26,7 +26,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec -from pyiceberg.schema import Schema, make_compatible_name +from pyiceberg.schema import Schema from pyiceberg.transforms import ( BucketTransform, DayTransform, @@ -84,7 +84,7 @@ ( [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], [False], - Record(boolean_field=False), + Record(False), "boolean_field=false", f"""CREATE TABLE {identifier} ( boolean_field boolean, @@ -103,7 +103,7 @@ ( [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], ["sample_string"], - Record(string_field="sample_string"), + Record("sample_string"), "string_field=sample_string", f"""CREATE TABLE {identifier} ( string_field string, @@ -122,7 +122,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], [42], - Record(int_field=42), + Record(42), "int_field=42", f"""CREATE TABLE {identifier} ( int_field int, @@ -141,7 +141,7 @@ ( [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], [1234567890123456789], - Record(long_field=1234567890123456789), + Record(1234567890123456789), "long_field=1234567890123456789", f"""CREATE TABLE {identifier} ( long_field bigint, @@ -160,7 +160,7 @@ ( [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], [3.14], - Record(float_field=3.14), + Record(3.14), "float_field=3.14", # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) # so justification (compare expected value with spark behavior) would fail. @@ -183,7 +183,7 @@ ( [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], [6.282], - Record(double_field=6.282), + Record(6.282), "double_field=6.282", # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) # so justification (compare expected value with spark behavior) would fail. 
@@ -206,7 +206,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 1, 999)], - Record(timestamp_field=1672574401000999), + Record(1672574401000999), "timestamp_field=2023-01-01T12%3A00%3A01.000999", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -225,7 +225,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 1)], - Record(timestamp_field=1672574401000000), + Record(1672574401000000), "timestamp_field=2023-01-01T12%3A00%3A01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -244,7 +244,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 0)], - Record(timestamp_field=1672574400000000), + Record(1672574400000000), "timestamp_field=2023-01-01T12%3A00%3A00", # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' @@ -268,7 +268,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field=1672563601000999), + Record(1672563601000999), "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' @@ -292,7 +292,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], [date(2023, 1, 1)], - Record(date_field=19358), + Record(19358), "date_field=2023-01-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -311,7 +311,7 @@ ( [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], - Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + Record("f47ac10b-58cc-4372-a567-0e02b2c3d479"), "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", f"""CREATE TABLE {identifier} ( uuid_field string, @@ -330,7 +330,7 @@ ( [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], [b"example"], - Record(binary_field=b"example"), + Record(b"example"), "binary_field=ZXhhbXBsZQ%3D%3D", f"""CREATE TABLE {identifier} ( binary_field binary, @@ -349,7 +349,7 @@ ( [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], [Decimal("123.45")], - Record(decimal_field=Decimal("123.45")), + Record(Decimal("123.45")), "decimal_field=123.45", f"""CREATE TABLE {identifier} ( decimal_field decimal(5,2), @@ -370,7 +370,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_month=((2023 - 1970) * 12)), + Record((2023 - 1970) * 12), "timestamp_field_month=2023-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -389,7 +389,7 @@ ( 
[PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), + Record((2023 - 1970) * 12 + 1 - 1), "timestamptz_field_month=2023-01", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -408,7 +408,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], [date(2023, 1, 1)], - Record(date_field_month=((2023 - 1970) * 12)), + Record((2023 - 1970) * 12), "date_field_month=2023-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -428,7 +428,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_year=(2023 - 1970)), + Record(2023 - 1970), "timestamp_field_year=2023", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -447,7 +447,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_year=53), + Record(53), "timestamptz_field_year=2023", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -466,7 +466,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], [date(2023, 1, 1)], - Record(date_field_year=(2023 - 1970)), + Record(2023 - 1970), "date_field_year=2023", f"""CREATE TABLE {identifier} ( date_field date, @@ -486,7 +486,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_day=19358), + Record(19358), "timestamp_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -505,7 +505,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_day=19358), + Record(19358), "timestamptz_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -524,7 +524,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], [date(2023, 1, 1)], - Record(date_field_day=19358), + Record(19358), "date_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -544,7 +544,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_hour=464603), + Record(464603), "timestamp_field_hour=2023-01-01-11", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -563,7 +563,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_hour=464601), + Record(464601), "timestamptz_field_hour=2023-01-01-09", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -583,7 +583,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], [12345], - Record(int_field_trunc=12340), + Record(12340), "int_field_trunc=12340", f"""CREATE TABLE {identifier} ( int_field int, @@ -602,7 +602,7 @@ ( [PartitionField(source_id=5, 
field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], [2**32 + 1], - Record(bigint_field_trunc=2**32), # 4294967296 + Record(2**32), # 4294967296 "bigint_field_trunc=4294967296", f"""CREATE TABLE {identifier} ( bigint_field bigint, @@ -621,7 +621,7 @@ ( [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], ["abcdefg"], - Record(string_field_trunc="abc"), + Record("abc"), "string_field_trunc=abc", f"""CREATE TABLE {identifier} ( string_field string, @@ -640,7 +640,7 @@ ( [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], [Decimal("678.93")], - Record(decimal_field_trunc=Decimal("678.90")), + Record(Decimal("678.90")), "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 f"""CREATE TABLE {identifier} ( decimal_field decimal(5,2), @@ -659,7 +659,7 @@ ( [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], [b"HELLOICEBERG"], - Record(binary_field_trunc=b"HELLOICEBE"), + Record(b"HELLOICEBE"), "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", f"""CREATE TABLE {identifier} ( binary_field binary, @@ -679,7 +679,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], [10], - Record(int_field_bucket=0), + Record(0), "int_field_bucket=0", f"""CREATE TABLE {identifier} ( int_field int, @@ -705,7 +705,7 @@ datetime(2023, 1, 1, 11, 55, 59, 999999), date(2023, 1, 1), ], - Record(timestamp_field_year=53, date_field_day=19358), + Record(53, 19358), "timestamp_field_year=2023/date_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -727,7 +727,7 @@ ( [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], ["special string"], - Record(**{"special#string+field": "special string"}), # type: ignore + Record("special string"), "special%23string%2Bfield=special+string", f"""CREATE TABLE {identifier} ( `special#string+field` string @@ -792,6 +792,5 @@ def test_partition_key( snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path ) # Special characters in partition value are sanitized when written to the data file's partition field - sanitized_record = Record(**{make_compatible_name(k): v for k, v in vars(expected_partition_record).items()}) - assert spark_partition_for_justification == sanitized_record + assert spark_partition_for_justification == expected_partition_record assert expected_hive_partition_path_slice in spark_path_for_justification diff --git a/tests/integration/test_rest_manifest.py b/tests/integration/test_rest_manifest.py index 82c41cfd93..8ddddfba5d 100644 --- a/tests/integration/test_rest_manifest.py +++ b/tests/integration/test_rest_manifest.py @@ -80,7 +80,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: entry = test_manifest_entries[0] test_schema = table_test_all_types.schema() test_spec = table_test_all_types.spec() - wrapped_data_file_v2_debug = DataFile( + wrapped_data_file_v2_debug = DataFile.from_args( format_version=2, content=entry.data_file.content, file_path=entry.data_file.file_path, @@ -103,8 +103,6 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: wrapped_entry_v2 = copy(entry) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug wrapped_entry_v2_dict = todict(wrapped_entry_v2) - # This one should not be written - del 
wrapped_entry_v2_dict["data_file"]["spec_id"] with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 3f43d9215a..ffbcaa4044 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -977,7 +977,7 @@ def project( ).to_table( tasks=[ FileScanTask( - DataFile( + DataFile.from_args( content=DataFileContent.DATA, file_path=file, file_format=FileFormat.PARQUET, @@ -1163,12 +1163,12 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - unpartitioned_file = DataFile( + unpartitioned_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, # projected value - partition=Record(partition_id=1), + partition=Record(1), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, @@ -1225,12 +1225,12 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - unpartitioned_file = DataFile( + unpartitioned_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, # projected value - partition=Record(field_2=2, field_3=3), + partition=Record(2, 3), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, @@ -1540,7 +1540,7 @@ def deletes_file(tmp_path: str, example_task: FileScanTask) -> str: def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None: - deletes = _read_deletes(LocalFileSystem(), DataFile(file_path=deletes_file, file_format=FileFormat.PARQUET)) + deletes = _read_deletes(LocalFileSystem(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET)) assert set(deletes.keys()) == {example_task.file.file_path} assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]]) @@ -1549,7 +1549,9 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp metadata_location = "file://a/b/c.json" example_task_with_delete = FileScanTask( data_file=example_task.file, - delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)}, + delete_files={ + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET) + }, ) with_deletes = ArrowScan( table_metadata=TableMetadataV2( @@ -1583,8 +1585,8 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ example_task_with_delete = FileScanTask( data_file=example_task.file, delete_files={ - DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), - DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), }, ) @@ -2143,12 +2145,12 @@ def test_partition_for_demo() -> None: ) result = _determine_partitions(partition_spec, test_schema, arrow_table) assert {table_partition.partition_key.partition for table_partition in result} == { - Record(n_legs_identity=2, year_identity=2020), - Record(n_legs_identity=100, 
year_identity=2021), - Record(n_legs_identity=4, year_identity=2021), - Record(n_legs_identity=4, year_identity=2022), - Record(n_legs_identity=2, year_identity=2022), - Record(n_legs_identity=5, year_identity=2019), + Record(2, 2020), + Record(100, 2021), + Record(4, 2021), + Record(4, 2022), + Record(2, 2022), + Record(5, 2019), } assert ( pa.concat_tables([table_partition.arrow_table_partition for table_partition in result]).num_rows == arrow_table.num_rows @@ -2172,7 +2174,7 @@ def test_identity_partition_on_multi_columns() -> None: (None, 4, "Kirin"), (2021, None, "Fish"), ] * 2 - expected = {Record(n_legs_identity=test_rows[i][1], year_identity=test_rows[i][0]) for i in range(len(test_rows))} + expected = {Record(test_rows[i][1], test_rows[i][0]) for i in range(len(test_rows))} partition_spec = PartitionSpec( PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="n_legs_identity"), PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="year_identity"), diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index 788891711e..0e9f69ec96 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -194,7 +194,7 @@ def test_record_count() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert datafile.record_count == 4 @@ -207,7 +207,7 @@ def test_value_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert datafile.value_counts[1] == 4 @@ -228,7 +228,7 @@ def test_column_sizes() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.column_sizes) == 7 # these values are an artifact of how the write_table encodes the columns @@ -248,7 +248,7 @@ def test_null_and_nan_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.null_value_counts) == 7 assert datafile.null_value_counts[1] == 1 @@ -275,7 +275,7 @@ def test_bounds() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.lower_bounds) == 2 assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaa" @@ -319,7 +319,7 @@ def test_metrics_mode_none() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 0 assert 
len(datafile.null_value_counts) == 0 @@ -338,7 +338,7 @@ def test_metrics_mode_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -357,7 +357,7 @@ def test_metrics_mode_full() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -382,7 +382,7 @@ def test_metrics_mode_non_default_trunc() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -408,7 +408,7 @@ def test_column_metrics_mode() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 6 assert len(datafile.null_value_counts) == 6 @@ -508,7 +508,7 @@ def test_metrics_primitive_types() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 12 assert len(datafile.null_value_counts) == 12 @@ -606,7 +606,7 @@ def test_metrics_invalid_upper_bound() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 4 assert len(datafile.null_value_counts) == 4 @@ -632,7 +632,7 @@ def test_offsets() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert datafile.split_offsets is not None assert len(datafile.split_offsets) == 1 @@ -707,7 +707,7 @@ def test_read_missing_statistics() -> None: parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) # expect only "strings" column values to be reflected in the # upper_bound, lower_bound and null_value_counts props of datafile diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 69bbab527e..69ff37507b 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -389,10 +389,10 @@ def test_static_table_io_does_not_exist(metadata_location: str) -> None: def test_match_deletes_to_datafile() -> None: - data_entry 
= ManifestEntry( + data_entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=1, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.DATA, file_path="s3://bucket/0000.parquet", file_format=FileFormat.PARQUET, @@ -401,10 +401,10 @@ def test_match_deletes_to_datafile() -> None: file_size_in_bytes=3, ), ) - delete_entry_1 = ManifestEntry( + delete_entry_1 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=0, # Older than the data - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0001-delete.parquet", file_format=FileFormat.PARQUET, @@ -413,10 +413,10 @@ def test_match_deletes_to_datafile() -> None: file_size_in_bytes=3, ), ) - delete_entry_2 = ManifestEntry( + delete_entry_2 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0002-delete.parquet", file_format=FileFormat.PARQUET, @@ -440,10 +440,10 @@ def test_match_deletes_to_datafile() -> None: def test_match_deletes_to_datafile_duplicate_number() -> None: - data_entry = ManifestEntry( + data_entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=1, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.DATA, file_path="s3://bucket/0000.parquet", file_format=FileFormat.PARQUET, @@ -452,10 +452,10 @@ def test_match_deletes_to_datafile_duplicate_number() -> None: file_size_in_bytes=3, ), ) - delete_entry_1 = ManifestEntry( + delete_entry_1 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0001-delete.parquet", file_format=FileFormat.PARQUET, @@ -470,10 +470,10 @@ def test_match_deletes_to_datafile_duplicate_number() -> None: upper_bounds={}, ), ) - delete_entry_2 = ManifestEntry( + delete_entry_2 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0002-delete.parquet", file_format=FileFormat.PARQUET, diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index edda6d3aa8..57ab3e328a 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -158,7 +158,7 @@ def test_partition_spec_to_path() -> None: spec_id=3, ) - record = Record(**{"my#str%bucket": "my+str", "other str+bucket": "( )", "my!int:bucket": 10}) # type: ignore + record = Record("my+str", "( )", 10) # Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java # behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204 diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..db5c0c6e17 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -143,7 +143,7 @@ def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No @pytest.fixture def manifest_file() -> ManifestFile: - return ManifestFile( + return ManifestFile.from_args( content=ManifestContent.DATA, manifest_length=100, added_files_count=1, @@ -160,7 +160,7 @@ def 
test_snapshot_summary_collector(table_schema_simple: Schema) -> None: ssc = SnapshotSummaryCollector() assert ssc.build() == {} - data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) + data_file = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) ssc.add_file(data_file, schema=table_schema_simple) assert ssc.build() == { @@ -183,8 +183,8 @@ def test_snapshot_summary_collector_with_partition() -> None: NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), ) spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field")) - data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) - data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2)) + data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1)) + data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2)) # When ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec) ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec) diff --git a/tests/test_typedef.py b/tests/test_typedef.py index 43388addca..bb4e0ac339 100644 --- a/tests/test_typedef.py +++ b/tests/test_typedef.py @@ -16,14 +16,7 @@ # under the License. import pytest -from pyiceberg.schema import Schema from pyiceberg.typedef import FrozenDict, KeyDefaultDict, Record -from pyiceberg.types import ( - IntegerType, - NestedField, - StringType, - StructType, -) def test_setitem_frozendict() -> None: @@ -46,44 +39,36 @@ def one(_: int) -> int: assert defaultdict[22] == 1 -def test_record_repr(table_schema_simple: Schema) -> None: - r = Record("vo", 1, True, struct=table_schema_simple.as_struct()) - assert repr(r) == "Record[foo='vo', bar=1, baz=True]" - - -def test_named_record() -> None: - r = Record(struct=StructType(NestedField(0, "id", IntegerType()), NestedField(1, "name", StringType()))) - - with pytest.raises(AttributeError): - assert r.id is None # type: ignore - - with pytest.raises(AttributeError): - assert r.name is None # type: ignore - - r[0] = 123 - r[1] = "abc" - - assert r[0] == 123 - assert r[1] == "abc" - - assert r.id == 123 # type: ignore - assert r.name == "abc" # type: ignore - - -def test_record_positional_args() -> None: - r = Record(1, "a", True) - assert repr(r) == "Record[field1=1, field2='a', field3=True]" - - def test_record_named_args() -> None: - r = Record(foo=1, bar="a", baz=True) - - assert r.foo == 1 # type: ignore - assert r.bar == "a" # type: ignore - assert r.baz is True # type: ignore + r = Record(1, "a", True) assert r[0] == 1 assert r[1] == "a" assert r[2] is True - assert repr(r) == "Record[foo=1, bar='a', baz=True]" + assert repr(r) == "Record[1, a, True]" + + +# +# def test_bind_record_nested(table_schema_nested: Schema) -> None: +# struct = table_schema_nested.as_struct() +# data = { +# "foo": "str", +# "bar": 123, +# "baz": True, +# "qux": ["a", "b", "c"], +# "quux": {"a": 1, "b": 2}, +# "location": [{"latitude": 52.377956, "longitude": 4.897070}, {"latitude": 4.897070, "longitude": -122.431297}], +# "person": {"name": "Fokko", "age": 35}, # Possible data quality issue +# } +# res = _bind_to_struct(struct, data) +# +# assert res == Record( 
+# "str", +# 123, +# True, +# ["a", "b", "c"], +# {"a": 1, "b": 2}, +# [Record(52.377956, 4.89707), Record(4.89707, -122.431297)], +# Record("Fokko", 35), +# ) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 3b1fc6f013..8d793c468c 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -61,7 +61,7 @@ def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: - manifest = ManifestFile( + manifest = ManifestFile.from_args( manifest_path=generated_manifest_entry_file, manifest_length=0, partition_spec_id=0, @@ -85,7 +85,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]" + assert repr(data_file.partition) == "Record[1, 1925]" assert data_file.record_count == 19513 assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == { @@ -422,7 +422,7 @@ def test_write_manifest( == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925) + assert data_file.partition == Record(1, 1925) assert data_file.record_count == 19513 assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == { From f13bf9e47130d80fed96b627b214a4f64685afd1 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 24 Mar 2025 21:26:59 +0100 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=A4=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/expressions/test_expressions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 02ee0066bc..15cda49f9c 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -64,6 +64,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( + DecimalType, DoubleType, FloatType, IntegerType, From 6cb546a6a4a1a7a68cafd35285bb575f5a7b94d0 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 25 Mar 2025 20:37:52 +0100 Subject: [PATCH 3/5] fix some tests --- tests/integration/test_rest_manifest.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_rest_manifest.py b/tests/integration/test_rest_manifest.py index 8ddddfba5d..dda0bbfe3b 100644 --- a/tests/integration/test_rest_manifest.py +++ b/tests/integration/test_rest_manifest.py @@ -20,7 +20,7 @@ from copy import copy from enum import Enum from tempfile import TemporaryDirectory -from typing import Any +from typing import Any, List import pytest from fastavro import reader @@ -29,12 +29,15 @@ from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import DataFile, write_manifest from pyiceberg.table import Table +from pyiceberg.typedef import Record from pyiceberg.utils.lazydict import LazyDict # helper function to serialize our objects to dicts to enable # direct comparison with the dicts returned by fastavro -def todict(obj: Any) -> Any: +def todict(obj: Any, spec_keys: List[str]) -> Any: + if 
type(obj) is Record: + return {key: obj[pos] for key, pos in zip(spec_keys, range(len(obj)))} if isinstance(obj, dict) or isinstance(obj, LazyDict): data = [] for k, v in obj.items(): @@ -43,9 +46,13 @@ def todict(obj: Any) -> Any: elif isinstance(obj, Enum): return obj.value elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes): - return [todict(v) for v in obj] + return [todict(v, spec_keys) for v in obj] elif hasattr(obj, "__dict__"): - return {key: todict(value) for key, value in inspect.getmembers(obj) if not callable(value) and not key.startswith("_")} + return { + key: todict(value, spec_keys) + for key, value in inspect.getmembers(obj) + if not callable(value) and not key.startswith("_") + } else: return obj @@ -102,7 +109,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: ) wrapped_entry_v2 = copy(entry) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug - wrapped_entry_v2_dict = todict(wrapped_entry_v2) + wrapped_entry_v2_dict = todict(wrapped_entry_v2, [field.name for field in test_spec.fields]) with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" From a862c0b281c4699d1047cd9494dd1402dd5b4583 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 26 Mar 2025 20:18:31 +0100 Subject: [PATCH 4/5] Cleanup --- pyiceberg/schema.py | 57 ------------------------------------------- tests/test_typedef.py | 25 ------------------- 2 files changed, 82 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index dfc4713660..6aa1f88852 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -1772,60 +1772,3 @@ def map(self, map_type: MapType, key_result: Callable[[], bool], value_result: C def primitive(self, primitive: PrimitiveType) -> bool: return True - - -# def _bind_to_struct( -# struct: StructType, -# data: Dict[str, Any], -# ) -> Record: -# return visit_with_partner( -# struct, -# data, -# _BindDictToRecord(), -# ArrowAccessor(), -# ) -# -# -# class _BindDictToRecord(SchemaWithPartnerVisitor[Any, Any]): -# def schema(self, schema: Schema, schema_partner: Any, struct_result: Any) -> Any: -# return struct_result -# -# def struct(self, struct: StructType, struct_partner: Any, field_results: List[Any]) -> Any: -# if isinstance(struct_partner, list): -# return [Record(*result) for result in field_results] -# else: -# return Record(*field_results) -# -# def field(self, field: NestedField, partner_struct: Optional[Any], field_result: Any) -> Optional[pa.Array]: -# return field_result -# -# def list(self, list_type: ListType, list_array: Optional[Any], element_result: Any) -> Any: -# return element_result -# -# def map(self, map_type: MapType, map_array: Optional[Any], key_result: Optional[Any], value_result: Optional[Any]) -> Any: -# return value_result -# -# def primitive(self, _: PrimitiveType, primitive_partner: Any) -> Any: -# return primitive_partner -# -# -# class ArrowAccessor(PartnerAccessor[Any]): -# def schema_partner(self, partner: Any) -> Any: -# return partner -# -# def field_partner(self, partner_struct: Any, field_id: int, name: str) -> Any: -# if isinstance(partner_struct, dict): -# return partner_struct.get(name) -# if isinstance(partner_struct, list): -# return [e.get(name) for e in partner_struct] -# else: -# return partner_struct -# -# def list_element_partner(self, partner_list: Optional[Any]) -> Optional[Any]: -# return partner_list -# -# def map_key_partner(self, partner_map: Optional[Any]) -> Optional[Any]: -# return partner_map -# -# def 
map_value_partner(self, partner_map: Optional[Any]) -> Optional[Any]: -# return partner_map diff --git a/tests/test_typedef.py b/tests/test_typedef.py index bb4e0ac339..fbbb619968 100644 --- a/tests/test_typedef.py +++ b/tests/test_typedef.py @@ -47,28 +47,3 @@ def test_record_named_args() -> None: assert r[2] is True assert repr(r) == "Record[1, a, True]" - - -# -# def test_bind_record_nested(table_schema_nested: Schema) -> None: -# struct = table_schema_nested.as_struct() -# data = { -# "foo": "str", -# "bar": 123, -# "baz": True, -# "qux": ["a", "b", "c"], -# "quux": {"a": 1, "b": 2}, -# "location": [{"latitude": 52.377956, "longitude": 4.897070}, {"latitude": 4.897070, "longitude": -122.431297}], -# "person": {"name": "Fokko", "age": 35}, # Possible data quality issue -# } -# res = _bind_to_struct(struct, data) -# -# assert res == Record( -# "str", -# 123, -# True, -# ["a", "b", "c"], -# {"a": 1, "b": 2}, -# [Record(52.377956, 4.89707), Record(4.89707, -122.431297)], -# Record("Fokko", 35), -# ) From bf66db5c00b3d8fbf95e71bc20bdbd4c0816727b Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 22 Apr 2025 15:25:11 +0200 Subject: [PATCH 5/5] Thanks Kevin! --- pyiceberg/avro/reader.py | 1 - tests/avro/test_file.py | 16 ++++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pyiceberg/avro/reader.py b/pyiceberg/avro/reader.py index 92c23169ba..bccc772022 100644 --- a/pyiceberg/avro/reader.py +++ b/pyiceberg/avro/reader.py @@ -316,7 +316,6 @@ class StructReader(Reader): "field_readers", "create_struct", "struct", - "_create_with_keyword", "_field_reader_functions", "_hash", "_max_pos", diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 0a986d5405..137215ebc8 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -346,21 +346,29 @@ def field_double(self) -> float: return self._data[6] @property - def field_timestamp(self) -> datetime: + def field_date(self) -> datetime: return self._data[7] @property - def field_timestamptz(self) -> datetime: + def field_time(self) -> datetime: return self._data[8] @property - def field_string(self) -> str: + def field_timestamp(self) -> datetime: return self._data[9] @property - def field_uuid(self) -> UUID: + def field_timestamptz(self) -> datetime: return self._data[10] + @property + def field_string(self) -> str: + return self._data[11] + + @property + def field_uuid(self) -> UUID: + return self._data[12] + record = AllPrimitivesRecord( b"\x124Vx\x124Vx\x124Vx\x124Vx", Decimal("123.45"),
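
The hunks above all follow the same pattern: records are constructed positionally (Record(1, "a", True)), read and written by index, and any named access goes through explicit properties that pull a fixed slot out of the underlying data, with keyword-style construction routed through a from_args factory instead of the constructor. The sketch below is not part of the patch series and does not reproduce the real pyiceberg Record, DataFile, or StructReader code; the class names, the _field_names attribute, and the exact repr are assumptions made purely to illustrate the position-based idea under discussion.

# Illustrative sketch only -- NOT the pyiceberg implementation.
# It mimics the pattern visible in the diffs: state lives in an ordered list,
# __getitem__/__setitem__ work by position, named access is opt-in via
# properties, and keyword construction goes through an explicit factory.
from typing import Any, ClassVar, Dict, List, Tuple


class PositionalRecord:
    """A record whose state is purely positional, e.g. PositionalRecord(1, "a", True)."""

    # Subclasses declare their field order; it is used only by from_args and
    # by the named properties, never to reorder positional arguments.
    _field_names: ClassVar[Tuple[str, ...]] = ()

    def __init__(self, *data: Any) -> None:
        self._data: List[Any] = list(data)

    @classmethod
    def from_args(cls, **kwargs: Any) -> "PositionalRecord":
        # Map keyword arguments onto positions via the declared field order,
        # so call sites stay readable without relying on dict ordering.
        return cls(*[kwargs.get(name) for name in cls._field_names])

    def __getitem__(self, pos: int) -> Any:
        return self._data[pos]

    def __setitem__(self, pos: int, value: Any) -> None:
        # A reader can pre-allocate cls(*[None] * n_fields) and fill slots
        # by position, which is the shape of the StructReader change above.
        self._data[pos] = value

    def __eq__(self, other: object) -> bool:
        return isinstance(other, PositionalRecord) and self._data == other._data

    def __repr__(self) -> str:
        return f"{type(self).__name__}[{', '.join(str(v) for v in self._data)}]"


class FileHeader(PositionalRecord):
    # Named access is explicit: each property reads one fixed slot.
    _field_names = ("magic", "meta", "sync")

    @property
    def magic(self) -> bytes:
        return self._data[0]

    @property
    def meta(self) -> Dict[str, str]:
        return self._data[1]

    @property
    def sync(self) -> bytes:
        return self._data[2]


if __name__ == "__main__":
    header = FileHeader(b"Obj\x01", {"codec": "null"}, b"\x00" * 16)
    assert header.magic == b"Obj\x01"
    assert header[1] == {"codec": "null"}

    # Keyword construction goes through the factory, never through positions
    # guessed from the ordering of a dict.
    same = FileHeader.from_args(magic=b"Obj\x01", meta={"codec": "null"}, sync=b"\x00" * 16)
    assert header == same

Routing keyword construction through a factory like this keeps the constructor strictly positional, which is the point of the series: the positions are fixed by the schema rather than by whatever order a caller happens to pass keywords in, so the subtle ordering bugs mentioned in the cover letter cannot creep in, while readers can still pre-size a record and assign slots by index.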