diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index d0da7651b7..9db585308d 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -74,10 +74,17 @@ class AvroFileHeader(Record): - __slots__ = ("magic", "meta", "sync") - magic: bytes - meta: Dict[str, str] - sync: bytes + @property + def magic(self) -> bytes: + return self._data[0] + + @property + def meta(self) -> Dict[str, str]: + return self._data[1] + + @property + def sync(self) -> bytes: + return self._data[2] def compression_codec(self) -> Optional[Type[Codec]]: """Get the file's compression codec algorithm from the file's metadata. @@ -271,7 +278,7 @@ def __exit__( def _write_header(self) -> None: json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"} - header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes) + header = AvroFileHeader(MAGIC, meta, self.sync_bytes) construct_writer(META_SCHEMA).write(self.encoder, header) def write_block(self, objects: List[D]) -> None: diff --git a/pyiceberg/avro/reader.py b/pyiceberg/avro/reader.py index 4c028ed711..bccc772022 100644 --- a/pyiceberg/avro/reader.py +++ b/pyiceberg/avro/reader.py @@ -312,7 +312,14 @@ def skip(self, decoder: BinaryDecoder) -> None: class StructReader(Reader): - __slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash") + __slots__ = ( + "field_readers", + "create_struct", + "struct", + "_field_reader_functions", + "_hash", + "_max_pos", + ) field_readers: Tuple[Tuple[Optional[int], Reader], ...] create_struct: Callable[..., StructProtocol] struct: StructType @@ -326,34 +333,28 @@ def __init__( ) -> None: self.field_readers = field_readers self.create_struct = create_struct + # TODO: Implement struct-reuse self.struct = struct - try: - # Try initializing the struct, first with the struct keyword argument - created_struct = self.create_struct(struct=self.struct) - self._create_with_keyword = True - except TypeError as e: - if "'struct' is an invalid keyword argument for" in str(e): - created_struct = self.create_struct() - self._create_with_keyword = False - else: - raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e - - if not isinstance(created_struct, StructProtocol): + if not isinstance(self.create_struct(), StructProtocol): raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}") reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = [] + max_pos = -1 for pos, field in field_readers: if pos is not None: reading_callbacks.append((pos, field.read)) + max_pos = max(max_pos, pos) else: reading_callbacks.append((None, field.skip)) self._field_reader_functions = tuple(reading_callbacks) self._hash = hash(self._field_reader_functions) + self._max_pos = 1 + max_pos def read(self, decoder: BinaryDecoder) -> StructProtocol: - struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct() + # TODO: Implement struct-reuse + struct = self.create_struct(*[None] * self._max_pos) for pos, field_reader in self._field_reader_functions: if pos is not None: struct[pos] = field_reader(decoder) # later: pass reuse in here diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 88be6abac7..5873eed4a7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2249,7 +2249,7 @@ def _partition_value(self, partition_field: 
PartitionField, schema: Schema) -> A return transform(lower_value) def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: - return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) + return Record(*[self._partition_value(field, schema) for field in partition_spec.fields]) def to_serialized_dict(self) -> Dict[str, Any]: lower_bounds = {} @@ -2422,7 +2422,7 @@ def write_parquet(task: WriteTask) -> DataFile: stats_columns=compute_statistics_plan(file_schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(file_schema), ) - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, file_format=FileFormat.PARQUET, @@ -2513,7 +2513,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, file_format=FileFormat.PARQUET, diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 5a32a6330c..31da423871 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -321,42 +321,85 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe class DataFile(Record): - __slots__ = ( - "content", - "file_path", - "file_format", - "partition", - "record_count", - "file_size_in_bytes", - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - "key_metadata", - "split_offsets", - "equality_ids", - "sort_order_id", - "spec_id", - ) - content: DataFileContent - file_path: str - file_format: FileFormat - partition: Record - record_count: int - file_size_in_bytes: int - column_sizes: Dict[int, int] - value_counts: Dict[int, int] - null_value_counts: Dict[int, int] - nan_value_counts: Dict[int, int] - lower_bounds: Dict[int, bytes] - upper_bounds: Dict[int, bytes] - key_metadata: Optional[bytes] - split_offsets: Optional[List[int]] - equality_ids: Optional[List[int]] - sort_order_id: Optional[int] - spec_id: int + @classmethod + def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile: + struct = DATA_FILE_TYPE[_table_format_version] + return super()._bind(struct, **arguments) + + @property + def content(self) -> DataFileContent: + return self._data[0] + + @property + def file_path(self) -> str: + return self._data[1] + + @property + def file_format(self) -> FileFormat: + return self._data[2] + + @property + def partition(self) -> Record: + return self._data[3] + + @property + def record_count(self) -> int: + return self._data[4] + + @property + def file_size_in_bytes(self) -> int: + return self._data[5] + + @property + def column_sizes(self) -> Dict[int, int]: + return self._data[6] + + @property + def value_counts(self) -> Dict[int, int]: + return self._data[7] + + @property + def null_value_counts(self) -> Dict[int, int]: + return self._data[8] + + @property + def nan_value_counts(self) -> Dict[int, int]: + return self._data[9] + + @property + def lower_bounds(self) -> Dict[int, bytes]: + return self._data[10] + + @property + def upper_bounds(self) -> Dict[int, bytes]: + return self._data[11] + + @property + def key_metadata(self) -> Optional[bytes]: + return self._data[12] + + @property + def split_offsets(self) -> Optional[List[int]]: + return 
self._data[13]
+
+    @property
+    def equality_ids(self) -> Optional[List[int]]:
+        return self._data[14]
+
+    @property
+    def sort_order_id(self) -> Optional[int]:
+        return self._data[15]
+
+    # Spec ID should not be stored in the file
+    _spec_id: int
+
+    @property
+    def spec_id(self) -> int:
+        return self._spec_id
+
+    @spec_id.setter
+    def spec_id(self, value: int) -> None:
+        self._spec_id = value
 
     def __setattr__(self, name: str, value: Any) -> None:
         """Assign a key/value to a DataFile."""
@@ -365,12 +408,6 @@ def __setattr__(self, name: str, value: Any) -> None:
             value = FileFormat[value]
         super().__setattr__(name, value)
 
-    def __init__(self, format_version: TableVersion = DEFAULT_READ_VERSION, *data: Any, **named_data: Any) -> None:
-        super().__init__(
-            *data,
-            **{"struct": DATA_FILE_TYPE[format_version], **named_data},
-        )
-
     def __hash__(self) -> int:
         """Return the hash of the file path."""
         return hash(self.file_path)
@@ -411,53 +448,49 @@ def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file
 
 
 class ManifestEntry(Record):
-    __slots__ = ("status", "snapshot_id", "sequence_number", "file_sequence_number", "data_file")
-    status: ManifestEntryStatus
-    snapshot_id: Optional[int]
-    sequence_number: Optional[int]
-    file_sequence_number: Optional[int]
-    data_file: DataFile
+    @classmethod
+    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestEntry:
+        return super()._bind(**arguments, struct=MANIFEST_ENTRY_SCHEMAS_STRUCT[_table_format_version])
 
-    def __init__(self, *data: Any, **named_data: Any) -> None:
-        super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})
+    @property
+    def status(self) -> ManifestEntryStatus:
+        return self._data[0]
 
-    def _wrap(
-        self,
-        new_status: ManifestEntryStatus,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        self.status = new_status
-        self.snapshot_id = new_snapshot_id
-        self.sequence_number = new_sequence_number
-        self.file_sequence_number = new_file_sequence_number
-        self.data_file = new_file
-        return self
+    @status.setter
+    def status(self, value: ManifestEntryStatus) -> None:
+        self._data[0] = value
 
-    def _wrap_append(
-        self, new_snapshot_id: Optional[int], new_sequence_number: Optional[int], new_file: DataFile
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_sequence_number, None, new_file)
+    @property
+    def snapshot_id(self) -> Optional[int]:
+        return self._data[1]
 
-    def _wrap_delete(
-        self,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.DELETED, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)
-
-    def _wrap_existing(
-        self,
-        new_snapshot_id: Optional[int],
-        new_sequence_number: Optional[int],
-        new_file_sequence_number: Optional[int],
-        new_file: DataFile,
-    ) -> ManifestEntry:
-        return self._wrap(ManifestEntryStatus.EXISTING, new_snapshot_id, new_sequence_number, new_file_sequence_number, new_file)
+    @snapshot_id.setter
+    def snapshot_id(self, value: int) -> None:
+        self._data[1] = value
+
+    @property
+    def sequence_number(self) -> Optional[int]:
+        return self._data[2]
+
+    @sequence_number.setter
+    def sequence_number(self, value: int) -> None:
+        self._data[2] = value
+
+    @property
+    def 
file_sequence_number(self) -> Optional[int]: + return self._data[3] + + @file_sequence_number.setter + def file_sequence_number(self, value: int) -> None: + self._data[3] = value + + @property + def data_file(self) -> DataFile: + return self._data[4] + + @data_file.setter + def data_file(self, value: DataFile) -> None: + self._data[4] = value PARTITION_FIELD_SUMMARY_TYPE = StructType( @@ -469,14 +502,25 @@ def _wrap_existing( class PartitionFieldSummary(Record): - __slots__ = ("contains_null", "contains_nan", "lower_bound", "upper_bound") - contains_null: bool - contains_nan: Optional[bool] - lower_bound: Optional[bytes] - upper_bound: Optional[bytes] + @classmethod + def from_args(cls, **arguments: Any) -> PartitionFieldSummary: + return super()._bind(**arguments, struct=PARTITION_FIELD_SUMMARY_TYPE) - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data}) + @property + def contains_null(self) -> bool: + return self._data[0] + + @property + def contains_nan(self) -> Optional[bool]: + return self._data[1] + + @property + def lower_bound(self) -> Optional[bytes]: + return self._data[2] + + @property + def upper_bound(self) -> Optional[bytes]: + return self._data[3] class PartitionFieldStats: @@ -495,10 +539,10 @@ def __init__(self, iceberg_type: PrimitiveType) -> None: def to_summary(self) -> PartitionFieldSummary: return PartitionFieldSummary( - contains_null=self._contains_null, - contains_nan=self._contains_nan, - lower_bound=to_bytes(self._type, self._min) if self._min is not None else None, - upper_bound=to_bytes(self._type, self._max) if self._max is not None else None, + self._contains_null, + self._contains_nan, + to_bytes(self._type, self._min) if self._min is not None else None, + to_bytes(self._type, self._max) if self._max is not None else None, ) def update(self, value: Any) -> None: @@ -570,41 +614,77 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition class ManifestFile(Record): - __slots__ = ( - "manifest_path", - "manifest_length", - "partition_spec_id", - "content", - "sequence_number", - "min_sequence_number", - "added_snapshot_id", - "added_files_count", - "existing_files_count", - "deleted_files_count", - "added_rows_count", - "existing_rows_count", - "deleted_rows_count", - "partitions", - "key_metadata", - ) - manifest_path: str - manifest_length: int - partition_spec_id: int - content: ManifestContent - sequence_number: int - min_sequence_number: int - added_snapshot_id: int - added_files_count: Optional[int] - existing_files_count: Optional[int] - deleted_files_count: Optional[int] - added_rows_count: Optional[int] - existing_rows_count: Optional[int] - deleted_rows_count: Optional[int] - partitions: Optional[List[PartitionFieldSummary]] - key_metadata: Optional[bytes] - - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data}) + @classmethod + def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile: + return super()._bind(**arguments, struct=MANIFEST_LIST_FILE_SCHEMAS[_table_format_version]) + + @property + def manifest_path(self) -> str: + return self._data[0] + + @property + def manifest_length(self) -> int: + return self._data[1] + + @property + def partition_spec_id(self) -> int: + return self._data[2] + + @property + def content(self) -> ManifestContent: + return self._data[3] + + 
@property
+    def sequence_number(self) -> int:
+        return self._data[4]
+
+    @sequence_number.setter
+    def sequence_number(self, value: int) -> None:
+        self._data[4] = value
+
+    @property
+    def min_sequence_number(self) -> int:
+        return self._data[5]
+
+    @min_sequence_number.setter
+    def min_sequence_number(self, value: int) -> None:
+        self._data[5] = value
+
+    @property
+    def added_snapshot_id(self) -> Optional[int]:
+        return self._data[6]
+
+    @property
+    def added_files_count(self) -> Optional[int]:
+        return self._data[7]
+
+    @property
+    def existing_files_count(self) -> Optional[int]:
+        return self._data[8]
+
+    @property
+    def deleted_files_count(self) -> Optional[int]:
+        return self._data[9]
+
+    @property
+    def added_rows_count(self) -> Optional[int]:
+        return self._data[10]
+
+    @property
+    def existing_rows_count(self) -> Optional[int]:
+        return self._data[11]
+
+    @property
+    def deleted_rows_count(self) -> Optional[int]:
+        return self._data[12]
+
+    @property
+    def partitions(self) -> Optional[List[PartitionFieldSummary]]:
+        return self._data[13]
+
+    @property
+    def key_metadata(self) -> Optional[bytes]:
+        return self._data[14]
 
     def has_added_files(self) -> bool:
         return self.added_files_count is None or self.added_files_count > 0
@@ -734,7 +814,6 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
         self._deleted_rows = 0
         self._min_sequence_number = None
         self._partitions = []
-        self._reused_entry_wrapper = ManifestEntry()
 
     def __enter__(self) -> ManifestWriter:
         """Open the writer."""
@@ -795,7 +874,7 @@ def to_manifest_file(self) -> ManifestFile:
         # once the manifest file is generated, no more entries can be added
         self.closed = True
         min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
-        return ManifestFile(
+        return ManifestFile.from_args(
             manifest_path=self._output_file.location,
             manifest_length=len(self._writer.output_file),
             partition_spec_id=self._spec.spec_id,
@@ -842,23 +921,42 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
 
     def add(self, entry: ManifestEntry) -> ManifestWriter:
         if entry.sequence_number is not None and entry.sequence_number >= 0:
-            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file))
+            self.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED,
+                    snapshot_id=self._snapshot_id,
+                    sequence_number=entry.sequence_number,
+                    data_file=entry.data_file,
+                )
+            )
         else:
-            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
+            self.add_entry(
+                ManifestEntry.from_args(
+                    status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, data_file=entry.data_file
+                )
+            )
         return self
 
     def delete(self, entry: ManifestEntry) -> ManifestWriter:
         self.add_entry(
-            self._reused_entry_wrapper._wrap_delete(
-                self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.DELETED,
+                snapshot_id=self._snapshot_id,
+                sequence_number=entry.sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
             )
         )
         return self
 
     def existing(self, entry: ManifestEntry) -> ManifestWriter:
         self.add_entry(
-            self._reused_entry_wrapper._wrap_existing(
-                entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.EXISTING,
+                snapshot_id=entry.snapshot_id,
+                sequence_number=entry.sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
             )
         )
return self diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index d33e13b438..df07f94342 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -390,18 +390,20 @@ class PartitionKey: @cached_property def partition(self) -> Record: # partition key transformed with iceberg internal representation as input - iceberg_typed_key_values = {} + iceberg_typed_key_values = [] for raw_partition_field_value in self.field_values: partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] if len(partition_fields) != 1: raise ValueError(f"Cannot have redundant partitions: {partition_fields}") partition_field = partition_fields[0] - iceberg_typed_key_values[partition_field.name] = partition_record_value( - partition_field=partition_field, - value=raw_partition_field_value.value, - schema=self.schema, + iceberg_typed_key_values.append( + partition_record_value( + partition_field=partition_field, + value=raw_partition_field_value.value, + schema=self.schema, + ) ) - return Record(**iceberg_typed_key_values) + return Record(*iceberg_typed_key_values) def to_path(self) -> str: return self.partition_spec.partition_to_path(self.partition, self.schema) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c705f3b9fd..c1c820fd69 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -157,7 +157,7 @@ def _write_added_manifest() -> List[ManifestFile]: ) as writer: for data_file in self._added_data_files: writer.add( - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, sequence_number=None, @@ -368,7 +368,7 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo schema = self._transaction.table_metadata.schema() def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: - return ManifestEntry( + return ManifestEntry.from_args( status=status, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, @@ -555,7 +555,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ) as writer: [ writer.add_entry( - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.EXISTING, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, @@ -586,7 +586,7 @@ def _deleted_entries(self) -> List[ManifestEntry]: def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: return [ - ManifestEntry( + ManifestEntry.from_args( status=ManifestEntryStatus.DELETED, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index 07374887a3..dbc4a164fb 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -19,13 +19,13 @@ from abc import abstractmethod from datetime import date, datetime from decimal import Decimal -from functools import lru_cache from typing import ( TYPE_CHECKING, Any, Callable, Dict, Generic, + List, Literal, Optional, Protocol, @@ -38,7 +38,7 @@ from uuid import UUID from pydantic import BaseModel, ConfigDict, RootModel -from typing_extensions import TypeAlias +from typing_extensions import Self, TypeAlias if TYPE_CHECKING: from pyiceberg.types import StructType @@ -171,51 +171,36 @@ class IcebergRootModel(RootModel[T], Generic[T]): model_config = ConfigDict(frozen=True) -@lru_cache -def _get_struct_fields(struct_type: StructType) -> Tuple[str, ...]: - return tuple(field.name for field in struct_type.fields) - - class 
Record(StructProtocol): - __slots__ = ("_position_to_field_name",) - _position_to_field_name: Tuple[str, ...] - - def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None: - if struct is not None: - self._position_to_field_name = _get_struct_fields(struct) - elif named_data: - # Order of named_data is preserved (PEP 468) so this can be used to generate the position dict - self._position_to_field_name = tuple(named_data.keys()) - else: - self._position_to_field_name = tuple(f"field{idx + 1}" for idx in range(len(data))) + __slots__ = ("_data",) + _data: List[Any] - for idx, d in enumerate(data): - self[idx] = d + @classmethod + def _bind(cls, struct: StructType, **arguments: Any) -> Self: + return cls(*[arguments[field.name] if field.name in arguments else field.initial_default for field in struct.fields]) - for field_name, d in named_data.items(): - self.__setattr__(field_name, d) + def __init__(self, *data: Any) -> None: + self._data = list(data) def __setitem__(self, pos: int, value: Any) -> None: """Assign a value to a Record.""" - self.__setattr__(self._position_to_field_name[pos], value) + self._data[pos] = value def __getitem__(self, pos: int) -> Any: """Fetch a value from a Record.""" - return self.__getattribute__(self._position_to_field_name[pos]) + return self._data[pos] def __eq__(self, other: Any) -> bool: """Return the equality of two instances of the Record class.""" - if not isinstance(other, Record): - return False - return self.__dict__ == other.__dict__ + return self._data == other._data if isinstance(other, Record) else False def __repr__(self) -> str: """Return the string representation of the Record class.""" - return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]" + return f"{self.__class__.__name__}[{', '.join(str(v) for v in self._data)}]" def __len__(self) -> int: """Return the number of fields in the Record class.""" - return len(self._position_to_field_name) + return len(self._data) def __hash__(self) -> int: """Return hash value of the Record class.""" diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index 0756b2670c..137215ebc8 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -16,8 +16,7 @@ # under the License. 
import inspect from _decimal import Decimal -from copy import copy -from datetime import date, datetime, time +from datetime import datetime from enum import Enum from tempfile import TemporaryDirectory from typing import Any @@ -28,7 +27,7 @@ import pyiceberg.avro.file as avro from pyiceberg.avro.codecs.deflate import DeflateCodec -from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader +from pyiceberg.avro.file import AvroFileHeader from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import ( DEFAULT_BLOCK_SIZE, @@ -40,7 +39,7 @@ ManifestEntryStatus, ) from pyiceberg.schema import Schema -from pyiceberg.typedef import Record +from pyiceberg.typedef import Record, TableVersion from pyiceberg.types import ( BooleanType, DateType, @@ -61,26 +60,17 @@ def get_deflate_compressor() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "deflate"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "deflate"}, bytes(16)) assert header.compression_codec() == DeflateCodec def get_null_compressor() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "null"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "null"}, bytes(16)) assert header.compression_codec() is None def test_unknown_codec() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {"avro.codec": "unknown"} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {"avro.codec": "unknown"}, bytes(16)) with pytest.raises(ValueError) as exc_info: header.compression_codec() @@ -89,10 +79,7 @@ def test_unknown_codec() -> None: def test_missing_schema() -> None: - header = AvroFileHeader(struct=META_SCHEMA) - header[0] = bytes(0) - header[1] = {} - header[2] = bytes(16) + header = AvroFileHeader(bytes(0), {}, bytes(16)) with pytest.raises(ValueError) as exc_info: header.get_schema() @@ -119,7 +106,7 @@ def todict(obj: Any) -> Any: def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path="s3://some-path/some-file.parquet", file_format=FileFormat.PARQUET, @@ -137,7 +124,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: equality_ids=[], sort_order_id=4, ) - entry = ManifestEntry( + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, sequence_number=0, @@ -185,7 +172,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v1() -> None: def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: - data_file = DataFile( + data_file = DataFile.from_args( content=DataFileContent.DATA, file_path="s3://some-path/some-file.parquet", file_format=FileFormat.PARQUET, @@ -203,7 +190,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: equality_ids=[], sort_order_id=4, ) - entry = ManifestEntry( + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, sequence_number=0, @@ -239,33 +226,32 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro_v2() -> None: @pytest.mark.parametrize("format_version", [1, 2]) -def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: int) -> None: - data_file = DataFile( - content=DataFileContent.DATA, - file_path="s3://some-path/some-file.parquet", - file_format=FileFormat.PARQUET, - 
partition=Record(), - record_count=131327, - file_size_in_bytes=220669226, - column_sizes={1: 220661854}, - value_counts={1: 131327}, - null_value_counts={1: 0}, - nan_value_counts={}, - lower_bounds={1: b"aaaaaaaaaaaaaaaa"}, - upper_bounds={1: b"zzzzzzzzzzzzzzzz"}, - key_metadata=b"\xde\xad\xbe\xef", - split_offsets=[4, 133697593], - equality_ids=[], - sort_order_id=4, - spec_id=3, - ) - - entry = ManifestEntry( +def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: TableVersion) -> None: + data_file_dict = { + "content": DataFileContent.DATA, + "file_path": "s3://some-path/some-file.parquet", + "file_format": FileFormat.PARQUET, + "partition": Record(), + "record_count": 131327, + "file_size_in_bytes": 220669226, + "column_sizes": {1: 220661854}, + "value_counts": {1: 131327}, + "null_value_counts": {1: 0}, + "nan_value_counts": {}, + "lower_bounds": {1: b"aaaaaaaaaaaaaaaa"}, + "upper_bounds": {1: b"zzzzzzzzzzzzzzzz"}, + "key_metadata": b"\xde\xad\xbe\xef", + "split_offsets": [4, 133697593], + "equality_ids": [], + "sort_order_id": 4, + "spec_id": 3, + } + data_file_v2 = DataFile.from_args(**data_file_dict) # type: ignore + + entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=8638475580105682862, - sequence_number=0, - file_sequence_number=0, - data_file=data_file, + data_file=data_file_v2, ) with TemporaryDirectory() as tmpdir: @@ -297,17 +283,13 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in avro_entry = next(it) if format_version == 1: - v1_datafile = copy(data_file) - # Not part of V1 - v1_datafile.equality_ids = None + data_file_v1 = DataFile.from_args(**data_file_dict, _table_format_version=format_version) - assert avro_entry == ManifestEntry( - status=ManifestEntryStatus.ADDED, + assert avro_entry == ManifestEntry.from_args( + status=1, snapshot_id=8638475580105682862, - # Not part of v1 - sequence_number=None, - file_sequence_number=None, - data_file=v1_datafile, + data_file=data_file_v1, + _table_format_version=format_version, ) elif format_version == 2: assert entry == avro_entry @@ -335,22 +317,57 @@ def test_all_primitive_types(is_required: bool) -> None: ) class AllPrimitivesRecord(Record): - field_fixed: bytes - field_decimal: Decimal - field_bool: bool - field_int: int - field_long: int - field_float: float - field_double: float - field_date: date - field_time: time - field_timestamp: datetime - field_timestamptz: datetime - field_string: str - field_uuid: UUID - - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": all_primitives_schema.as_struct(), **named_data}) + @property + def field_fixed(self) -> bytes: + return self._data[0] + + @property + def field_decimal(self) -> Decimal: + return self._data[1] + + @property + def field_bool(self) -> bool: + return self._data[2] + + @property + def field_int(self) -> int: + return self._data[3] + + @property + def field_long(self) -> int: + return self._data[4] + + @property + def field_float(self) -> float: + return self._data[5] + + @property + def field_double(self) -> float: + return self._data[6] + + @property + def field_date(self) -> datetime: + return self._data[7] + + @property + def field_time(self) -> datetime: + return self._data[8] + + @property + def field_timestamp(self) -> datetime: + return self._data[9] + + @property + def field_timestamptz(self) -> datetime: + return self._data[10] + + @property + def field_string(self) -> str: + return self._data[11] + + @property + def 
field_uuid(self) -> UUID: + return self._data[12] record = AllPrimitivesRecord( b"\x124Vx\x124Vx\x124Vx\x124Vx", diff --git a/tests/avro/test_reader.py b/tests/avro/test_reader.py index 3fdd3bbda3..82473d11d1 100644 --- a/tests/avro/test_reader.py +++ b/tests/avro/test_reader.py @@ -350,7 +350,7 @@ def test_read_struct(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder) - assert repr(result) == "Record[id=12]" + assert repr(result) == "Record[12]" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) @@ -361,10 +361,10 @@ def test_read_struct_lambda(decoder_class: Callable[[bytes], BinaryDecoder]) -> # You can also pass in an arbitrary function that returns a struct result = StructReader( ((0, IntegerReader()),), - lambda struct: Record(struct=struct), + Record, struct, # pylint: disable=unnecessary-lambda ).read(decoder) - assert repr(result) == "Record[id=12]" + assert repr(result) == "Record[12]" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) @@ -376,19 +376,3 @@ def test_read_not_struct_type(decoder_class: Callable[[bytes], BinaryDecoder]) - _ = StructReader(((0, IntegerReader()),), str, struct).read(decoder) # type: ignore assert "Incompatible with StructProtocol: " in str(exc_info.value) - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_exception_handling(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: - decoder = decoder_class(b"\x18") - - def raise_err(struct: StructType) -> None: - raise TypeError("boom") - - struct = StructType(NestedField(1, "id", IntegerType(), required=True)) - # You can also pass in an arbitrary function that returns a struct - - with pytest.raises(ValueError) as exc_info: - _ = StructReader(((0, IntegerReader()),), raise_err, struct).read(decoder) # type: ignore - - assert "Unable to initialize struct:" in str(exc_info.value) diff --git a/tests/avro/test_resolver.py b/tests/avro/test_resolver.py index b5388b5ebb..26b44e8e23 100644 --- a/tests/avro/test_resolver.py +++ b/tests/avro/test_resolver.py @@ -289,15 +289,15 @@ class Ints(Record): c: int = Field() d: Optional[int] = Field() - MANIFEST_ENTRY_SCHEMA = Schema( + ints_schema = Schema( NestedField(3, "c", IntegerType(), required=True), NestedField(4, "d", IntegerType(), required=False), ) - with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, {-1: Ints}) as reader: + with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), ints_schema, {-1: Ints}) as reader: records = list(reader) - assert repr(records) == "[Ints[c=3, d=None]]" + assert repr(records) == "[Ints[3, None]]" def test_resolver_initial_value() -> None: diff --git a/tests/avro/test_writer.py b/tests/avro/test_writer.py index 3114b97d2b..c655156c2a 100644 --- a/tests/avro/test_writer.py +++ b/tests/avro/test_writer.py @@ -19,7 +19,6 @@ import io import struct from _decimal import Decimal -from typing import Dict, List from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.avro.resolver import construct_writer @@ -152,16 +151,11 @@ def test_write_simple_struct() -> None: schema = StructType( NestedField(1, "id", IntegerType(), required=True), NestedField(2, "property", StringType(), required=True) ) - - class MyStruct(Record): - id: int - property: str - - my_struct = MyStruct(id=12, property="awesome") + struct = Record(12, "awesome") enc_str = 
b"awesome" - construct_writer(schema).write(encoder, my_struct) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join([b"\x18", zigzag_encode(len(enc_str)), enc_str]) @@ -175,18 +169,13 @@ def test_write_struct_with_dict() -> None: NestedField(2, "properties", MapType(3, IntegerType(), 4, IntegerType()), required=True), ) - class MyStruct(Record): - id: int - properties: Dict[int, int] - - my_struct = MyStruct(id=12, properties={1: 2, 3: 4}) - - construct_writer(schema).write(encoder, my_struct) + struct = Record(12, {1: 2, 3: 4}) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join( [ b"\x18", - zigzag_encode(len(my_struct.properties)), + zigzag_encode(len(struct[1])), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), @@ -205,18 +194,14 @@ def test_write_struct_with_list() -> None: NestedField(2, "properties", ListType(3, IntegerType()), required=True), ) - class MyStruct(Record): - id: int - properties: List[int] - - my_struct = MyStruct(id=12, properties=[1, 2, 3, 4]) + struct = Record(12, [1, 2, 3, 4]) - construct_writer(schema).write(encoder, my_struct) + construct_writer(schema).write(encoder, struct) assert output.getbuffer() == b"".join( [ b"\x18", - zigzag_encode(len(my_struct.properties)), + zigzag_encode(len(struct[1])), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), diff --git a/tests/conftest.py b/tests/conftest.py index 6444b7b273..0ca3b4cfdf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2294,7 +2294,7 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str: @pytest.fixture def example_task(data_file: str) -> FileScanTask: return FileScanTask( - data_file=DataFile(file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925), + data_file=DataFile.from_args(file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925), ) diff --git a/tests/expressions/test_evaluator.py b/tests/expressions/test_evaluator.py index e2b1f27377..7b15099105 100644 --- a/tests/expressions/test_evaluator.py +++ b/tests/expressions/test_evaluator.py @@ -42,6 +42,7 @@ from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator, _StrictMetricsEvaluator from pyiceberg.manifest import DataFile, FileFormat from pyiceberg.schema import Schema +from pyiceberg.typedef import Record from pyiceberg.types import ( DoubleType, FloatType, @@ -91,7 +92,7 @@ def schema_data_file() -> Schema: @pytest.fixture def data_file() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -133,7 +134,7 @@ def data_file() -> DataFile: @pytest.fixture def data_file_2() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_2.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -149,7 +150,7 @@ def data_file_2() -> DataFile: @pytest.fixture def data_file_3() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_3.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -165,7 +166,7 @@ def data_file_3() -> DataFile: @pytest.fixture def data_file_4() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_4.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -288,10 +289,10 @@ def test_missing_stats() -> None: NestedField(2, "no_stats", DoubleType(), required=False), ) - no_stats_file = DataFile( + no_stats_file = DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, - partition={}, + 
partition=Record(), record_count=50, value_counts=None, null_value_counts=None, @@ -319,7 +320,9 @@ def test_missing_stats() -> None: def test_zero_record_file_stats(schema_data_file: Schema) -> None: - zero_record_data_file = DataFile(file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, record_count=0) + zero_record_data_file = DataFile.from_args( + file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition=Record(), record_count=0 + ) expressions = [ LessThan("no_stats", 5), @@ -636,7 +639,7 @@ def schema_data_file_nan() -> Schema: @pytest.fixture def data_file_nan() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file.avro", file_format=FileFormat.PARQUET, partition={}, @@ -949,7 +952,7 @@ def strict_data_file_schema() -> Schema: @pytest.fixture def strict_data_file_1() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -990,7 +993,7 @@ def strict_data_file_1() -> DataFile: @pytest.fixture def strict_data_file_2() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_2.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -1015,7 +1018,7 @@ def strict_data_file_2() -> DataFile: @pytest.fixture def strict_data_file_3() -> DataFile: - return DataFile( + return DataFile.from_args( file_path="file_3.parquet", file_format=FileFormat.PARQUET, partition={}, @@ -1147,10 +1150,10 @@ def test_strict_missing_stats(strict_data_file_schema: Schema, strict_data_file_ NestedField(2, "no_stats", DoubleType(), required=False), ) - no_stats_file = DataFile( + no_stats_file = DataFile.from_args( file_path="file_1.parquet", file_format=FileFormat.PARQUET, - partition={}, + partition=Record(), record_count=50, value_counts=None, null_value_counts=None, @@ -1178,7 +1181,9 @@ def test_strict_missing_stats(strict_data_file_schema: Schema, strict_data_file_ def test_strict_zero_record_file_stats(strict_data_file_schema: Schema) -> None: - zero_record_data_file = DataFile(file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition={}, record_count=0) + zero_record_data_file = DataFile.from_args( + file_path="file_1.parquet", file_format=FileFormat.PARQUET, partition=Record(), record_count=0 + ) expressions = [ LessThan("no_stats", 5), diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 12d9ff95a9..15cda49f9c 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -64,8 +64,6 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.typedef import Record from pyiceberg.types import ( - BinaryType, - BooleanType, DecimalType, DoubleType, FloatType, @@ -75,7 +73,6 @@ NestedField, StringType, StructType, - UUIDType, ) from pyiceberg.utils.singleton import Singleton @@ -630,22 +627,7 @@ def test_invert_always() -> None: def test_accessor_base_class() -> None: """Test retrieving a value at a position of a container using an accessor""" - struct = Record( - struct=StructType( - NestedField(1, "a", StringType()), - NestedField(2, "b", StringType()), - NestedField(3, "c", StringType()), - NestedField(4, "d", IntegerType()), - NestedField(5, "e", IntegerType()), - NestedField(6, "f", IntegerType()), - NestedField(7, "g", FloatType()), - NestedField(8, "h", DecimalType(8, 4)), - NestedField(9, "i", UUIDType()), - NestedField(10, "j", BooleanType()), - NestedField(11, "k", BooleanType()), - NestedField(12, "l", BinaryType()), - ) - ) + struct = 
Record(*[None] * 12) uuid_value = uuid.uuid4() @@ -985,11 +967,7 @@ def test_less_than_or_equal() -> None: def test_bound_reference_eval(table_schema_simple: Schema) -> None: """Test creating a BoundReference and evaluating it on a StructProtocol""" - struct = Record(struct=table_schema_simple.as_struct()) - - struct[0] = "foovalue" - struct[1] = 123 - struct[2] = True + struct = Record("foovalue", 123, True) position1_accessor = Accessor(position=0) position2_accessor = Accessor(position=1) diff --git a/tests/expressions/test_residual_evaluator.py b/tests/expressions/test_residual_evaluator.py index cf01821787..ba0a0da2e5 100644 --- a/tests/expressions/test_residual_evaluator.py +++ b/tests/expressions/test_residual_evaluator.py @@ -58,7 +58,7 @@ def test_identity_transform_residual() -> None: ) res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) # assert residual == True assert isinstance(residual, LessThan) @@ -67,7 +67,7 @@ def test_identity_transform_residual() -> None: assert residual.literal.value == 12 assert type(residual) is LessThan - residual = res_eval.residual_for(Record(dateint=20170801)) + residual = res_eval.residual_for(Record(20170801)) # assert isinstance(residual, UnboundPredicate) from pyiceberg.expressions import LiteralPredicate @@ -79,11 +79,11 @@ def test_identity_transform_residual() -> None: assert residual.literal.value == 11 # type :ignore # assert type(residual) == BoundGreaterThan - residual = res_eval.residual_for(Record(dateint=20170812)) + residual = res_eval.residual_for(Record(20170812)) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(dateint=20170817)) + residual = res_eval.residual_for(Record(20170817)) assert residual == AlwaysFalse() @@ -103,7 +103,7 @@ def test_case_insensitive_identity_transform_residuals() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) with pytest.raises(ValueError) as e: - res_eval.residual_for(Record(dateint=20170815)) + res_eval.residual_for(Record(20170815)) assert "Could not find field with name DATEINT, case_sensitive=True" in str(e.value) @@ -142,7 +142,7 @@ def test_in() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) assert residual == AlwaysTrue() @@ -178,10 +178,10 @@ def test_not_in() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(dateint=20180815)) + residual = res_eval.residual_for(Record(20180815)) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(dateint=20170815)) + residual = res_eval.residual_for(Record(20170815)) assert residual == AlwaysFalse() @@ -194,10 +194,10 @@ def test_is_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=float("nan"))) + residual = res_eval.residual_for(Record(float("nan"))) assert residual == AlwaysTrue() - residual = res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysFalse() @@ -210,10 +210,10 @@ def test_is_not_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, 
case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=None)) + residual = res_eval.residual_for(Record(None)) assert residual == AlwaysFalse() - residual = res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysTrue() spec = PartitionSpec(PartitionField(51, 1051, IdentityTransform(), "float_part")) @@ -222,10 +222,10 @@ def test_is_not_nan() -> None: res_eval = residual_evaluator_of(spec=spec, expr=predicate, case_sensitive=True, schema=schema) - residual = res_eval.residual_for(Record(double=None)) + residual = res_eval.residual_for(Record(None)) assert residual == AlwaysFalse() - residual = res_eval.residual_for(Record(double=2)) + residual = res_eval.residual_for(Record(2)) assert residual == AlwaysTrue() diff --git a/tests/expressions/test_visitors.py b/tests/expressions/test_visitors.py index 94bfcf076c..7ee09b8607 100644 --- a/tests/expressions/test_visitors.py +++ b/tests/expressions/test_visitors.py @@ -822,7 +822,7 @@ def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes: def _to_manifest_file(*partitions: PartitionFieldSummary) -> ManifestFile: """Helper to create a ManifestFile""" - return ManifestFile(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions) + return ManifestFile.from_args(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions) INT_MIN_VALUE = 30 @@ -863,81 +863,81 @@ def manifest_no_stats() -> ManifestFile: def manifest() -> ManifestFile: return _to_manifest_file( # id - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=INT_MIN, upper_bound=INT_MAX, ), # all_nulls_missing_nan - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None, ), # some_nulls - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MAX, ), # no_nulls - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MAX, ), # float - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=_to_byte_buffer(FloatType(), 0.0), upper_bound=_to_byte_buffer(FloatType(), 20.0), ), # all_nulls_double - PartitionFieldSummary(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), + PartitionFieldSummary.from_args(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), # all_nulls_no_nans - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=False, lower_bound=None, upper_bound=None, ), # all_nans - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=True, lower_bound=None, upper_bound=None, ), # both_nan_and_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=True, lower_bound=None, upper_bound=None, ), # no_nan_or_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=False, lower_bound=_to_byte_buffer(FloatType(), 0.0), upper_bound=_to_byte_buffer(FloatType(), 20.0), ), # all_nulls_missing_nan_float - PartitionFieldSummary(contains_null=True, contains_nan=None, lower_bound=None, upper_bound=None), + PartitionFieldSummary.from_args(contains_null=True, contains_nan=None, lower_bound=None, 
upper_bound=None), # all_same_value_or_null - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=True, contains_nan=None, lower_bound=STRING_MIN, upper_bound=STRING_MIN, ), # no_nulls_same_value_a - PartitionFieldSummary( + PartitionFieldSummary.from_args( contains_null=False, contains_nan=None, lower_bound=STRING_MIN, @@ -1607,7 +1607,7 @@ def test_dnf_to_dask(table_schema_simple: Schema) -> None: def test_expression_evaluator_null() -> None: - struct = Record(a=None) + struct = Record(None) schema = Schema(NestedField(1, "a", IntegerType(), required=False), schema_id=1) assert expression_evaluator(schema, In("a", {1, 2, 3}), case_sensitive=True)(struct) is False assert expression_evaluator(schema, NotIn("a", {1, 2, 3}), case_sensitive=True)(struct) is True diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index 1066753655..f9bdd4eead 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -26,7 +26,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec -from pyiceberg.schema import Schema, make_compatible_name +from pyiceberg.schema import Schema from pyiceberg.transforms import ( BucketTransform, DayTransform, @@ -84,7 +84,7 @@ ( [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], [False], - Record(boolean_field=False), + Record(False), "boolean_field=false", f"""CREATE TABLE {identifier} ( boolean_field boolean, @@ -103,7 +103,7 @@ ( [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], ["sample_string"], - Record(string_field="sample_string"), + Record("sample_string"), "string_field=sample_string", f"""CREATE TABLE {identifier} ( string_field string, @@ -122,7 +122,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], [42], - Record(int_field=42), + Record(42), "int_field=42", f"""CREATE TABLE {identifier} ( int_field int, @@ -141,7 +141,7 @@ ( [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], [1234567890123456789], - Record(long_field=1234567890123456789), + Record(1234567890123456789), "long_field=1234567890123456789", f"""CREATE TABLE {identifier} ( long_field bigint, @@ -160,7 +160,7 @@ ( [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], [3.14], - Record(float_field=3.14), + Record(3.14), "float_field=3.14", # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) # so justification (compare expected value with spark behavior) would fail. @@ -183,7 +183,7 @@ ( [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], [6.282], - Record(double_field=6.282), + Record(6.282), "double_field=6.282", # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) # so justification (compare expected value with spark behavior) would fail. 
@@ -206,7 +206,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 1, 999)], - Record(timestamp_field=1672574401000999), + Record(1672574401000999), "timestamp_field=2023-01-01T12%3A00%3A01.000999", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -225,7 +225,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 1)], - Record(timestamp_field=1672574401000000), + Record(1672574401000000), "timestamp_field=2023-01-01T12%3A00%3A01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -244,7 +244,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], [datetime(2023, 1, 1, 12, 0, 0)], - Record(timestamp_field=1672574400000000), + Record(1672574400000000), "timestamp_field=2023-01-01T12%3A00%3A00", # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' @@ -268,7 +268,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field=1672563601000999), + Record(1672563601000999), "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' @@ -292,7 +292,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], [date(2023, 1, 1)], - Record(date_field=19358), + Record(19358), "date_field=2023-01-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -311,7 +311,7 @@ ( [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], - Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + Record("f47ac10b-58cc-4372-a567-0e02b2c3d479"), "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", f"""CREATE TABLE {identifier} ( uuid_field string, @@ -330,7 +330,7 @@ ( [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], [b"example"], - Record(binary_field=b"example"), + Record(b"example"), "binary_field=ZXhhbXBsZQ%3D%3D", f"""CREATE TABLE {identifier} ( binary_field binary, @@ -349,7 +349,7 @@ ( [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], [Decimal("123.45")], - Record(decimal_field=Decimal("123.45")), + Record(Decimal("123.45")), "decimal_field=123.45", f"""CREATE TABLE {identifier} ( decimal_field decimal(5,2), @@ -370,7 +370,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_field_month")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_month=((2023 - 1970) * 12)), + Record((2023 - 1970) * 12), "timestamp_field_month=2023-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp_ntz, @@ -389,7 +389,7 @@ ( 
[PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), + Record((2023 - 1970) * 12 + 1 - 1), "timestamptz_field_month=2023-01", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -408,7 +408,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], [date(2023, 1, 1)], - Record(date_field_month=((2023 - 1970) * 12)), + Record((2023 - 1970) * 12), "date_field_month=2023-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -428,7 +428,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_year=(2023 - 1970)), + Record(2023 - 1970), "timestamp_field_year=2023", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -447,7 +447,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_year=53), + Record(53), "timestamptz_field_year=2023", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -466,7 +466,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], [date(2023, 1, 1)], - Record(date_field_year=(2023 - 1970)), + Record(2023 - 1970), "date_field_year=2023", f"""CREATE TABLE {identifier} ( date_field date, @@ -486,7 +486,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_day=19358), + Record(19358), "timestamp_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -505,7 +505,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_day=19358), + Record(19358), "timestamptz_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -524,7 +524,7 @@ ( [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], [date(2023, 1, 1)], - Record(date_field_day=19358), + Record(19358), "date_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( date_field date, @@ -544,7 +544,7 @@ ( [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_hour=464603), + Record(464603), "timestamp_field_hour=2023-01-01-11", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -563,7 +563,7 @@ ( [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_hour=464601), + Record(464601), "timestamptz_field_hour=2023-01-01-09", f"""CREATE TABLE {identifier} ( timestamptz_field timestamp, @@ -583,7 +583,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], [12345], - Record(int_field_trunc=12340), + Record(12340), "int_field_trunc=12340", f"""CREATE TABLE {identifier} ( int_field int, @@ -602,7 +602,7 @@ ( [PartitionField(source_id=5, 
field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], [2**32 + 1], - Record(bigint_field_trunc=2**32), # 4294967296 + Record(2**32), # 4294967296 "bigint_field_trunc=4294967296", f"""CREATE TABLE {identifier} ( bigint_field bigint, @@ -621,7 +621,7 @@ ( [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], ["abcdefg"], - Record(string_field_trunc="abc"), + Record("abc"), "string_field_trunc=abc", f"""CREATE TABLE {identifier} ( string_field string, @@ -640,7 +640,7 @@ ( [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], [Decimal("678.93")], - Record(decimal_field_trunc=Decimal("678.90")), + Record(Decimal("678.90")), "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 f"""CREATE TABLE {identifier} ( decimal_field decimal(5,2), @@ -659,7 +659,7 @@ ( [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], [b"HELLOICEBERG"], - Record(binary_field_trunc=b"HELLOICEBE"), + Record(b"HELLOICEBE"), "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", f"""CREATE TABLE {identifier} ( binary_field binary, @@ -679,7 +679,7 @@ ( [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], [10], - Record(int_field_bucket=0), + Record(0), "int_field_bucket=0", f"""CREATE TABLE {identifier} ( int_field int, @@ -705,7 +705,7 @@ datetime(2023, 1, 1, 11, 55, 59, 999999), date(2023, 1, 1), ], - Record(timestamp_field_year=53, date_field_day=19358), + Record(53, 19358), "timestamp_field_year=2023/date_field_day=2023-01-01", f"""CREATE TABLE {identifier} ( timestamp_field timestamp, @@ -727,7 +727,7 @@ ( [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], ["special string"], - Record(**{"special#string+field": "special string"}), # type: ignore + Record("special string"), "special%23string%2Bfield=special+string", f"""CREATE TABLE {identifier} ( `special#string+field` string @@ -792,6 +792,5 @@ def test_partition_key( snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path ) # Special characters in partition value are sanitized when written to the data file's partition field - sanitized_record = Record(**{make_compatible_name(k): v for k, v in vars(expected_partition_record).items()}) - assert spark_partition_for_justification == sanitized_record + assert spark_partition_for_justification == expected_partition_record assert expected_hive_partition_path_slice in spark_path_for_justification diff --git a/tests/integration/test_rest_manifest.py b/tests/integration/test_rest_manifest.py index 82c41cfd93..dda0bbfe3b 100644 --- a/tests/integration/test_rest_manifest.py +++ b/tests/integration/test_rest_manifest.py @@ -20,7 +20,7 @@ from copy import copy from enum import Enum from tempfile import TemporaryDirectory -from typing import Any +from typing import Any, List import pytest from fastavro import reader @@ -29,12 +29,15 @@ from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import DataFile, write_manifest from pyiceberg.table import Table +from pyiceberg.typedef import Record from pyiceberg.utils.lazydict import LazyDict # helper function to serialize our objects to dicts to enable # direct comparison with the dicts returned by fastavro -def todict(obj: Any) -> Any: +def todict(obj: Any, spec_keys: List[str]) -> Any: + if type(obj) is Record: + 
return {key: obj[pos] for key, pos in zip(spec_keys, range(len(obj)))} if isinstance(obj, dict) or isinstance(obj, LazyDict): data = [] for k, v in obj.items(): @@ -43,9 +46,13 @@ def todict(obj: Any) -> Any: elif isinstance(obj, Enum): return obj.value elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes): - return [todict(v) for v in obj] + return [todict(v, spec_keys) for v in obj] elif hasattr(obj, "__dict__"): - return {key: todict(value) for key, value in inspect.getmembers(obj) if not callable(value) and not key.startswith("_")} + return { + key: todict(value, spec_keys) + for key, value in inspect.getmembers(obj) + if not callable(value) and not key.startswith("_") + } else: return obj @@ -80,7 +87,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: entry = test_manifest_entries[0] test_schema = table_test_all_types.schema() test_spec = table_test_all_types.spec() - wrapped_data_file_v2_debug = DataFile( + wrapped_data_file_v2_debug = DataFile.from_args( format_version=2, content=entry.data_file.content, file_path=entry.data_file.file_path, @@ -102,9 +109,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: ) wrapped_entry_v2 = copy(entry) wrapped_entry_v2.data_file = wrapped_data_file_v2_debug - wrapped_entry_v2_dict = todict(wrapped_entry_v2) - # This one should not be written - del wrapped_entry_v2_dict["data_file"]["spec_id"] + wrapped_entry_v2_dict = todict(wrapped_entry_v2, [field.name for field in test_spec.fields]) with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/test_write_manifest.avro" diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e883e38cb8..3c097ddd47 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -977,7 +977,7 @@ def project( ).to_table( tasks=[ FileScanTask( - DataFile( + DataFile.from_args( content=DataFileContent.DATA, file_path=file, file_format=FileFormat.PARQUET, @@ -1163,12 +1163,12 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - unpartitioned_file = DataFile( + unpartitioned_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, # projected value - partition=Record(partition_id=1), + partition=Record(1), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, @@ -1224,12 +1224,12 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - unpartitioned_file = DataFile( + unpartitioned_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, # projected value - partition=Record(field_2=2, field_3=3), + partition=Record(2, 3), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, @@ -1539,7 +1539,7 @@ def deletes_file(tmp_path: str, example_task: FileScanTask) -> str: def test_read_deletes(deletes_file: str, example_task: FileScanTask) -> None: - deletes = _read_deletes(LocalFileSystem(), DataFile(file_path=deletes_file, file_format=FileFormat.PARQUET)) + deletes = _read_deletes(LocalFileSystem(), DataFile.from_args(file_path=deletes_file, file_format=FileFormat.PARQUET)) assert set(deletes.keys()) == {example_task.file.file_path} assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]]) @@ -1548,7 +1548,9 @@ 
def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp metadata_location = "file://a/b/c.json" example_task_with_delete = FileScanTask( data_file=example_task.file, - delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)}, + delete_files={ + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET) + }, ) with_deletes = ArrowScan( table_metadata=TableMetadataV2( @@ -1582,8 +1584,8 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ example_task_with_delete = FileScanTask( data_file=example_task.file, delete_files={ - DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), - DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), + DataFile.from_args(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET), }, ) @@ -2142,12 +2144,12 @@ def test_partition_for_demo() -> None: ) result = _determine_partitions(partition_spec, test_schema, arrow_table) assert {table_partition.partition_key.partition for table_partition in result} == { - Record(n_legs_identity=2, year_identity=2020), - Record(n_legs_identity=100, year_identity=2021), - Record(n_legs_identity=4, year_identity=2021), - Record(n_legs_identity=4, year_identity=2022), - Record(n_legs_identity=2, year_identity=2022), - Record(n_legs_identity=5, year_identity=2019), + Record(2, 2020), + Record(100, 2021), + Record(4, 2021), + Record(4, 2022), + Record(2, 2022), + Record(5, 2019), } assert ( pa.concat_tables([table_partition.arrow_table_partition for table_partition in result]).num_rows == arrow_table.num_rows @@ -2171,7 +2173,7 @@ def test_identity_partition_on_multi_columns() -> None: (None, 4, "Kirin"), (2021, None, "Fish"), ] * 2 - expected = {Record(n_legs_identity=test_rows[i][1], year_identity=test_rows[i][0]) for i in range(len(test_rows))} + expected = {Record(test_rows[i][1], test_rows[i][0]) for i in range(len(test_rows))} partition_spec = PartitionSpec( PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="n_legs_identity"), PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="year_identity"), diff --git a/tests/io/test_pyarrow_stats.py b/tests/io/test_pyarrow_stats.py index 788891711e..0e9f69ec96 100644 --- a/tests/io/test_pyarrow_stats.py +++ b/tests/io/test_pyarrow_stats.py @@ -194,7 +194,7 @@ def test_record_count() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert datafile.record_count == 4 @@ -207,7 +207,7 @@ def test_value_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert datafile.value_counts[1] == 4 @@ -228,7 +228,7 @@ def test_column_sizes() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), 
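# A hedged sketch of the construction pattern used throughout the scan and statistics
# tests above: DataFile instances are built via the from_args factory, and partition
# values are positional. The file path below is hypothetical.
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.typedef import Record

data_file = DataFile.from_args(
    content=DataFileContent.DATA,
    file_path="file:///tmp/example.parquet",  # hypothetical local file
    file_format=FileFormat.PARQUET,
    partition=Record(1),                      # one value per partition field, in spec order
    record_count=100,
    file_size_in_bytes=1234,
)
assert data_file.file_format == FileFormat.PARQUET
assert data_file.partition[0] == 1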
parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.column_sizes) == 7 # these values are an artifact of how the write_table encodes the columns @@ -248,7 +248,7 @@ def test_null_and_nan_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.null_value_counts) == 7 assert datafile.null_value_counts[1] == 1 @@ -275,7 +275,7 @@ def test_bounds() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.lower_bounds) == 2 assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaa" @@ -319,7 +319,7 @@ def test_metrics_mode_none() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 0 assert len(datafile.null_value_counts) == 0 @@ -338,7 +338,7 @@ def test_metrics_mode_counts() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -357,7 +357,7 @@ def test_metrics_mode_full() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -382,7 +382,7 @@ def test_metrics_mode_non_default_trunc() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 7 assert len(datafile.null_value_counts) == 7 @@ -408,7 +408,7 @@ def test_column_metrics_mode() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 6 assert len(datafile.null_value_counts) == 6 @@ -508,7 +508,7 @@ def test_metrics_primitive_types() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 12 assert len(datafile.null_value_counts) == 12 @@ -606,7 +606,7 
@@ def test_metrics_invalid_upper_bound() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert len(datafile.value_counts) == 4 assert len(datafile.null_value_counts) == 4 @@ -632,7 +632,7 @@ def test_offsets() -> None: stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) assert datafile.split_offsets is not None assert len(datafile.split_offsets) == 1 @@ -707,7 +707,7 @@ def test_read_missing_statistics() -> None: parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - datafile = DataFile(**statistics.to_serialized_dict()) + datafile = DataFile.from_args(**statistics.to_serialized_dict()) # expect only "strings" column values to be reflected in the # upper_bound, lower_bound and null_value_counts props of datafile diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 69bbab527e..69ff37507b 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -389,10 +389,10 @@ def test_static_table_io_does_not_exist(metadata_location: str) -> None: def test_match_deletes_to_datafile() -> None: - data_entry = ManifestEntry( + data_entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=1, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.DATA, file_path="s3://bucket/0000.parquet", file_format=FileFormat.PARQUET, @@ -401,10 +401,10 @@ def test_match_deletes_to_datafile() -> None: file_size_in_bytes=3, ), ) - delete_entry_1 = ManifestEntry( + delete_entry_1 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=0, # Older than the data - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0001-delete.parquet", file_format=FileFormat.PARQUET, @@ -413,10 +413,10 @@ def test_match_deletes_to_datafile() -> None: file_size_in_bytes=3, ), ) - delete_entry_2 = ManifestEntry( + delete_entry_2 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0002-delete.parquet", file_format=FileFormat.PARQUET, @@ -440,10 +440,10 @@ def test_match_deletes_to_datafile() -> None: def test_match_deletes_to_datafile_duplicate_number() -> None: - data_entry = ManifestEntry( + data_entry = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=1, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.DATA, file_path="s3://bucket/0000.parquet", file_format=FileFormat.PARQUET, @@ -452,10 +452,10 @@ def test_match_deletes_to_datafile_duplicate_number() -> None: file_size_in_bytes=3, ), ) - delete_entry_1 = ManifestEntry( + delete_entry_1 = ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0001-delete.parquet", file_format=FileFormat.PARQUET, @@ -470,10 +470,10 @@ def test_match_deletes_to_datafile_duplicate_number() -> None: upper_bounds={}, ), ) - delete_entry_2 = ManifestEntry( + delete_entry_2 = 
ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, sequence_number=3, - data_file=DataFile( + data_file=DataFile.from_args( content=DataFileContent.POSITION_DELETES, file_path="s3://bucket/0002-delete.parquet", file_format=FileFormat.PARQUET, diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index edda6d3aa8..57ab3e328a 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -158,7 +158,7 @@ def test_partition_spec_to_path() -> None: spec_id=3, ) - record = Record(**{"my#str%bucket": "my+str", "other str+bucket": "( )", "my!int:bucket": 10}) # type: ignore + record = Record("my+str", "( )", 10) # Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java # behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204 diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..db5c0c6e17 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -143,7 +143,7 @@ def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No @pytest.fixture def manifest_file() -> ManifestFile: - return ManifestFile( + return ManifestFile.from_args( content=ManifestContent.DATA, manifest_length=100, added_files_count=1, @@ -160,7 +160,7 @@ def test_snapshot_summary_collector(table_schema_simple: Schema) -> None: ssc = SnapshotSummaryCollector() assert ssc.build() == {} - data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) + data_file = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) ssc.add_file(data_file, schema=table_schema_simple) assert ssc.build() == { @@ -183,8 +183,8 @@ def test_snapshot_summary_collector_with_partition() -> None: NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), ) spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field")) - data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) - data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2)) + data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1)) + data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2)) # When ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec) ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec) diff --git a/tests/test_typedef.py b/tests/test_typedef.py index 43388addca..fbbb619968 100644 --- a/tests/test_typedef.py +++ b/tests/test_typedef.py @@ -16,14 +16,7 @@ # under the License. 
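# A hedged sketch of the factory-style construction the table tests above now use;
# the s3:// path mirrors the fixtures and is illustrative only.
from pyiceberg.manifest import (
    DataFile,
    DataFileContent,
    FileFormat,
    ManifestEntry,
    ManifestEntryStatus,
)
from pyiceberg.typedef import Record

entry = ManifestEntry.from_args(
    status=ManifestEntryStatus.ADDED,
    sequence_number=1,
    data_file=DataFile.from_args(
        content=DataFileContent.DATA,
        file_path="s3://bucket/0000.parquet",
        file_format=FileFormat.PARQUET,
        partition=Record(),
        record_count=3,
        file_size_in_bytes=3,
    ),
)
assert entry.data_file.file_path == "s3://bucket/0000.parquet"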
import pytest -from pyiceberg.schema import Schema from pyiceberg.typedef import FrozenDict, KeyDefaultDict, Record -from pyiceberg.types import ( - IntegerType, - NestedField, - StringType, - StructType, -) def test_setitem_frozendict() -> None: @@ -46,44 +39,11 @@ def one(_: int) -> int: assert defaultdict[22] == 1 -def test_record_repr(table_schema_simple: Schema) -> None: - r = Record("vo", 1, True, struct=table_schema_simple.as_struct()) - assert repr(r) == "Record[foo='vo', bar=1, baz=True]" - - -def test_named_record() -> None: - r = Record(struct=StructType(NestedField(0, "id", IntegerType()), NestedField(1, "name", StringType()))) - - with pytest.raises(AttributeError): - assert r.id is None # type: ignore - - with pytest.raises(AttributeError): - assert r.name is None # type: ignore - - r[0] = 123 - r[1] = "abc" - - assert r[0] == 123 - assert r[1] == "abc" - - assert r.id == 123 # type: ignore - assert r.name == "abc" # type: ignore - - -def test_record_positional_args() -> None: - r = Record(1, "a", True) - assert repr(r) == "Record[field1=1, field2='a', field3=True]" - - def test_record_named_args() -> None: - r = Record(foo=1, bar="a", baz=True) - - assert r.foo == 1 # type: ignore - assert r.bar == "a" # type: ignore - assert r.baz is True # type: ignore + r = Record(1, "a", True) assert r[0] == 1 assert r[1] == "a" assert r[2] is True - assert repr(r) == "Record[foo=1, bar='a', baz=True]" + assert repr(r) == "Record[1, a, True]" diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 3b1fc6f013..8d793c468c 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -61,7 +61,7 @@ def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: - manifest = ManifestFile( + manifest = ManifestFile.from_args( manifest_path=generated_manifest_entry_file, manifest_length=0, partition_spec_id=0, @@ -85,7 +85,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]" + assert repr(data_file.partition) == "Record[1, 1925]" assert data_file.record_count == 19513 assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == { @@ -422,7 +422,7 @@ def test_write_manifest( == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925) + assert data_file.partition == Record(1, 1925) assert data_file.record_count == 19513 assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == {