Skip to content

Commit 0c708a0

Browse files
Fokkosungwy
authored andcommitted
Reuse commit-uuid as the write-uuid (apache#437)
* Reuse commit-uuid as the write-uuid * Fix conflicts * Cleanup * cleanup
1 parent f5fd479 commit 0c708a0

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

pyiceberg/table/__init__.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,7 +1022,7 @@ def append(self, df: pa.Table) -> None:
10221022
with self.update_snapshot().fast_append() as update_snapshot:
10231023
# skip writing data files if the dataframe is empty
10241024
if df.shape[0] > 0:
1025-
data_files = _dataframe_to_data_files(self, df=df)
1025+
data_files = _dataframe_to_data_files(self, write_uuid=update_snapshot.commit_uuid, df=df)
10261026
for data_file in data_files:
10271027
update_snapshot.append_data_file(data_file)
10281028

@@ -1052,7 +1052,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
10521052
with self.update_snapshot().overwrite() as update_snapshot:
10531053
# skip writing data files if the dataframe is empty
10541054
if df.shape[0] > 0:
1055-
data_files = _dataframe_to_data_files(self, df=df)
1055+
data_files = _dataframe_to_data_files(self, write_uuid=update_snapshot.commit_uuid, df=df)
10561056
for data_file in data_files:
10571057
update_snapshot.append_data_file(data_file)
10581058

@@ -2349,7 +2349,9 @@ def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int,
23492349
return f'{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro'
23502350

23512351

2352-
def _dataframe_to_data_files(table: Table, df: pa.Table, file_schema: Optional[Schema] = None) -> Iterable[DataFile]:
2352+
def _dataframe_to_data_files(
2353+
table: Table, df: pa.Table, write_uuid: Optional[uuid.UUID] = None, file_schema: Optional[Schema] = None
2354+
) -> Iterable[DataFile]:
23532355
"""Convert a PyArrow table into a DataFile.
23542356
23552357
Returns:
@@ -2360,31 +2362,37 @@ def _dataframe_to_data_files(table: Table, df: pa.Table, file_schema: Optional[S
23602362
if len(table.spec().fields) > 0:
23612363
raise ValueError("Cannot write to partitioned tables")
23622364

2363-
write_uuid = uuid.uuid4()
23642365
counter = itertools.count(0)
2366+
write_uuid = write_uuid or uuid.uuid4()
23652367

23662368
# This is an iter, so we don't have to materialize everything every time
23672369
# This will be more relevant when we start doing partitioned writes
23682370
yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]), file_schema=file_schema)
23692371

23702372

23712373
class _MergingSnapshotProducer:
2374+
commit_uuid: uuid.UUID
23722375
_operation: Operation
23732376
_table: Table
23742377
_snapshot_id: int
23752378
_parent_snapshot_id: Optional[int]
23762379
_added_data_files: List[DataFile]
2377-
_commit_uuid: uuid.UUID
23782380
_transaction: Optional[Transaction]
23792381

2380-
def __init__(self, operation: Operation, table: Table, transaction: Optional[Transaction] = None) -> None:
2382+
def __init__(
2383+
self,
2384+
operation: Operation,
2385+
table: Table,
2386+
commit_uuid: Optional[uuid.UUID] = None,
2387+
transaction: Optional[Transaction] = None,
2388+
) -> None:
2389+
self.commit_uuid = commit_uuid or uuid.uuid4()
23812390
self._operation = operation
23822391
self._table = table
23832392
self._snapshot_id = table.new_snapshot_id()
23842393
# Since we only support the main branch for now
23852394
self._parent_snapshot_id = snapshot.snapshot_id if (snapshot := self._table.current_snapshot()) else None
23862395
self._added_data_files = []
2387-
self._commit_uuid = uuid.uuid4()
23882396
self._transaction = transaction
23892397

23902398
def __enter__(self) -> _MergingSnapshotProducer:
@@ -2408,7 +2416,7 @@ def _existing_manifests(self) -> List[ManifestFile]: ...
24082416
def _manifests(self) -> List[ManifestFile]:
24092417
def _write_added_manifest() -> List[ManifestFile]:
24102418
if self._added_data_files:
2411-
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self._commit_uuid)
2419+
output_file_location = _new_manifest_path(location=self._table.location(), num=0, commit_uuid=self.commit_uuid)
24122420
with write_manifest(
24132421
format_version=self._table.format_version,
24142422
spec=self._table.spec(),
@@ -2434,7 +2442,8 @@ def _write_delete_manifest() -> List[ManifestFile]:
24342442
# Check if we need to mark the files as deleted
24352443
deleted_entries = self._deleted_entries()
24362444
if len(deleted_entries) > 0:
2437-
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self._commit_uuid)
2445+
output_file_location = _new_manifest_path(location=self._table.location(), num=1, commit_uuid=self.commit_uuid)
2446+
24382447
with write_manifest(
24392448
format_version=self._table.format_version,
24402449
spec=self._table.spec(),
@@ -2477,7 +2486,7 @@ def commit(self) -> Snapshot:
24772486
summary = self._summary()
24782487

24792488
manifest_list_file_path = _generate_manifest_list_path(
2480-
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self._commit_uuid
2489+
location=self._table.location(), snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self.commit_uuid
24812490
)
24822491
with write_manifest_list(
24832492
format_version=self._table.metadata.format_version,

0 commit comments

Comments
 (0)