
Commit 59742e0

Make Record purely position based (#1768)
This aligns the implementation with Java. We had the keyword arguments there mostly for the tests, but they should not be used, and it seems that's already the case :'( I was undecided whether the cost of this PR (all the changes) is worth it, but I keep seeing PRs that use Record in a bad way (example: #1743), which can lead to very subtle bugs where a field's position changes based on the ordering of the dict. Blocked by Eventual-Inc/Daft#3917
1 parent: 2921843 · commit: 59742e0

25 files changed (+548 −523 lines)
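A minimal sketch of the behavior change described in the commit message, assuming Record is importable from pyiceberg.typedef; the field names and values are made up for illustration:

from pyiceberg.typedef import Record

# Before this commit, Record could be built from keyword arguments, so the
# positions silently depended on the ordering of the kwargs/dict, e.g.:
#     Record(**{"city": "Amsterdam", "population": 921_402})
# After this commit, Record is purely positional: the caller passes values in
# the order defined by the schema or partition spec.
record = Record("Amsterdam", 921_402)
assert record[0] == "Amsterdam"  # access by position via StructProtocol
assert record[1] == 921_402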

pyiceberg/avro/file.py

Lines changed: 12 additions & 5 deletions
@@ -74,10 +74,17 @@
 
 
 class AvroFileHeader(Record):
-    __slots__ = ("magic", "meta", "sync")
-    magic: bytes
-    meta: Dict[str, str]
-    sync: bytes
+    @property
+    def magic(self) -> bytes:
+        return self._data[0]
+
+    @property
+    def meta(self) -> Dict[str, str]:
+        return self._data[1]
+
+    @property
+    def sync(self) -> bytes:
+        return self._data[2]
 
     def compression_codec(self) -> Optional[Type[Codec]]:
         """Get the file's compression codec algorithm from the file's metadata.
@@ -271,7 +278,7 @@ def __exit__(
     def _write_header(self) -> None:
         json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
         meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
-        header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
+        header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
         construct_writer(META_SCHEMA).write(self.encoder, header)
 
     def write_block(self, objects: List[D]) -> None:
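A hedged usage sketch of the AvroFileHeader change above: construction is now positional, but callers still read the same named properties. MAGIC and AvroFileHeader are taken from pyiceberg.avro.file as shown in the hunks; the metadata dict here is illustrative.

import os

from pyiceberg.avro.file import MAGIC, AvroFileHeader

sync_bytes = os.urandom(16)  # Avro sync markers are 16 random bytes
header = AvroFileHeader(MAGIC, {"avro.codec": "null"}, sync_bytes)

assert header.magic == MAGIC                   # backed by self._data[0]
assert header.meta == {"avro.codec": "null"}   # backed by self._data[1]
assert header.sync == sync_bytes               # backed by self._data[2]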

pyiceberg/avro/reader.py

Lines changed: 15 additions & 14 deletions
@@ -312,7 +312,14 @@ def skip(self, decoder: BinaryDecoder) -> None:
 
 
 class StructReader(Reader):
-    __slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash")
+    __slots__ = (
+        "field_readers",
+        "create_struct",
+        "struct",
+        "_field_reader_functions",
+        "_hash",
+        "_max_pos",
+    )
     field_readers: Tuple[Tuple[Optional[int], Reader], ...]
     create_struct: Callable[..., StructProtocol]
     struct: StructType
@@ -326,34 +333,28 @@ def __init__(
     ) -> None:
         self.field_readers = field_readers
         self.create_struct = create_struct
+        # TODO: Implement struct-reuse
         self.struct = struct
 
-        try:
-            # Try initializing the struct, first with the struct keyword argument
-            created_struct = self.create_struct(struct=self.struct)
-            self._create_with_keyword = True
-        except TypeError as e:
-            if "'struct' is an invalid keyword argument for" in str(e):
-                created_struct = self.create_struct()
-                self._create_with_keyword = False
-            else:
-                raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e
-
-        if not isinstance(created_struct, StructProtocol):
+        if not isinstance(self.create_struct(), StructProtocol):
             raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")
 
         reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = []
+        max_pos = -1
         for pos, field in field_readers:
             if pos is not None:
                 reading_callbacks.append((pos, field.read))
+                max_pos = max(max_pos, pos)
             else:
                 reading_callbacks.append((None, field.skip))
 
         self._field_reader_functions = tuple(reading_callbacks)
         self._hash = hash(self._field_reader_functions)
+        self._max_pos = 1 + max_pos
 
     def read(self, decoder: BinaryDecoder) -> StructProtocol:
-        struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct()
+        # TODO: Implement struct-reuse
+        struct = self.create_struct(*[None] * self._max_pos)
         for pos, field_reader in self._field_reader_functions:
            if pos is not None:
                struct[pos] = field_reader(decoder)  # later: pass reuse in here
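A simplified sketch of what the rewritten StructReader.read does, assuming the struct factory is the purely positional Record; reader_by_position and the toy read functions stand in for the (position, read-function) pairs built in __init__ and do not take a decoder:

from pyiceberg.typedef import Record

# _max_pos is 1 + the highest projected position, so the struct is pre-sized
# with None placeholders and only the projected positions are filled in.
max_pos = 3
struct = Record(*[None] * max_pos)          # Record(None, None, None)
reader_by_position = [(0, lambda: 42), (None, None), (2, lambda: "x")]
for pos, read_fn in reader_by_position:
    if pos is not None:
        struct[pos] = read_fn()
assert struct[1] is None                    # skipped field keeps its placeholder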

pyiceberg/io/pyarrow.py

Lines changed: 3 additions & 3 deletions
@@ -2271,7 +2271,7 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
-        return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
+        return Record(*[self._partition_value(field, schema) for field in partition_spec.fields])
 
     def to_serialized_dict(self) -> Dict[str, Any]:
         lower_bounds = {}
@@ -2449,7 +2449,7 @@ def write_parquet(task: WriteTask) -> DataFile:
             stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
             parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
         )
-        data_file = DataFile(
+        data_file = DataFile.from_args(
             content=DataFileContent.DATA,
             file_path=file_path,
             file_format=FileFormat.PARQUET,
@@ -2540,7 +2540,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
             stats_columns=compute_statistics_plan(schema, table_metadata.properties),
             parquet_column_mapping=parquet_path_to_id_mapping(schema),
         )
-        data_file = DataFile(
+        data_file = DataFile.from_args(
             content=DataFileContent.DATA,
             file_path=file_path,
             file_format=FileFormat.PARQUET,
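Two caller-side notes on the hunks above: DataFile is now built through DataFile.from_args(...) instead of keyword construction of the Record subclass, and partition Records must be filled in partition-spec order rather than dict order. A small hedged sketch of the latter, where partition_record and values_by_name are hypothetical helpers:

from typing import Any, Dict

from pyiceberg.partitioning import PartitionSpec
from pyiceberg.typedef import Record

def partition_record(spec: PartitionSpec, values_by_name: Dict[str, Any]) -> Record:
    # Order the values by the spec's field order, never by the dict's ordering.
    return Record(*[values_by_name[field.name] for field in spec.fields])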
