Race condition on Table.scan with limit #542

Closed
bigluck opened this issue Mar 23, 2024 · 2 comments · Fixed by #545
Comments

@bigluck
Contributor

bigluck commented Mar 23, 2024

Apache Iceberg version

0.6.0 (latest release)

Please describe the bug 🐞

I'm facing a race condition when doing a table.scan in my code. For some strange reason, the scan returns before the final table has been assembled, and the result is empty.

This is my code:

from pyiceberg.table import StaticTable
from pyiceberg.expressions import AlwaysTrue


table = StaticTable.from_metadata(
    metadata_location='s3a://my_s3_bucket/iceberg/taxi_fhvhv_7eba066f-7498-4a8a-b932-bf2dbcc938fd/meta...',
)
res = table.scan(
    selected_fields=("*",),
    row_filter=AlwaysTrue(),
    limit=10_000,
)

print('A >>', res)

a_res = res.to_arrow()
print('B >>', a_res)
print('C >>', a_res.num_rows)

Which returns:

root@45daaeef1ce1:/# python test.py
A >> <pyiceberg.table.DataScan object at 0x7fece38ec6d0>
B >> pyarrow.Table
hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us, tz=UTC]
on_scene_datetime: timestamp[us, tz=UTC]
pickup_datetime: timestamp[us, tz=UTC]
dropoff_datetime: timestamp[us, tz=UTC]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: int32
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
wav_match_flag: string
----
hvfhs_license_num: []
dispatching_base_num: []
originating_base_num: []
request_datetime: []
on_scene_datetime: []
pickup_datetime: []
dropoff_datetime: []
PULocationID: []
DOLocationID: []
trip_miles: []
...
C >> 0

I think the problem happens here:

row_counts.append(len(arrow_table))
return to_requested_schema(projected_schema, file_project_schema, arrow_table)

The code appends to the row_counts list before returning the table, but if multiple tasks are running concurrently, any task that starts executing _task_to_table after that point will return None because of this check:

def _task_to_table(
    fs: FileSystem,
    task: FileScanTask,
    bound_row_filter: BooleanExpression,
    projected_schema: Schema,
    projected_field_ids: Set[int],
    positional_deletes: Optional[List[ChunkedArray]],
    case_sensitive: bool,
    row_counts: List[int],
    limit: Optional[int] = None,
    name_mapping: Optional[NameMapping] = None,
) -> Optional[pa.Table]:
    if limit and sum(row_counts) >= limit:
        return None

I think this happens because the original task that actually holds the data is still processing the table content here:

return to_requested_schema(projected_schema, file_project_schema, arrow_table)

So what I suppose happens is that the task that returned None is processed before the real task that holds the table content; the completed_futures list then contains only the future with None, which causes the code to return an empty table:

https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1111C1-L1116C18

This is the content of the completed_futures and tables variables in the project_table function:

>>>>> completed_futures SortedKeyList([<Future at 0x7fa7e4b97fd0 state=finished returned NoneType>], key=<function project_table.<locals>.<lambda> at 0x7fa90316c4a0>)
>>>>> tables []

And by modifying the loop to add a debug print:

    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)

        # stop early if limit is satisfied
        if limit is not None and sum(row_counts) >= limit:
            print('>>>>> ', limit, sum(row_counts), future.result())
            break

I got:

>>>>>  10000 10000 None
@kevinjqliu
Contributor

To summarize the above: there's a bug in table.scan related to the execution of futures when a limit is set. The bug comes from the completion order of the futures and the shared row_counts state.

When the executor runs multiple futures, each future checks the shared row_counts state before proceeding.

The bug occurs when one future updates the shared row_counts state (L1021). Before that future returns and completes (L1023), another future checks the shared row_counts state (L953) and returns first (L954).

This leads to a correct row_counts but an incorrect completed_futures, since the future that completed is not the one that actually modified row_counts (L1112).
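
To make the interleaving concrete, here is a minimal, PyIceberg-independent sketch of the same pattern (fake_task, row_counts, and LIMIT are illustrative names, not PyIceberg APIs). The task that merely observes the limit completes first, so a consumer that stops at the first completed future never collects the table:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional

LIMIT = 10_000
row_counts: List[int] = []  # shared state, mutated by every task


def fake_task(task_id: int, work_seconds: float, start_delay: float) -> Optional[str]:
    time.sleep(start_delay)  # stagger the start so task 0 updates row_counts first
    if LIMIT and sum(row_counts) >= LIMIT:
        return None  # task 1 takes this branch and finishes almost immediately
    row_counts.append(LIMIT)  # the limit is reached here, before the table is returned
    time.sleep(work_seconds)  # stands in for the remaining to_requested_schema work
    return f"table from task {task_id}"


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(fake_task, 0, 0.5, 0.0), pool.submit(fake_task, 1, 0.0, 0.1)]
    collected = []
    for future in as_completed(futures):
        collected.append(future)
        if sum(row_counts) >= LIMIT:  # satisfied as soon as the None future completes
            break
    print([f.result() for f in collected])  # [None] -- the real table is never collected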

@kevinjqliu
Contributor

One potential solution is to have _task_to_table return its row count instead of modifying a shared list. The row counts can then be aggregated as futures complete.
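
A rough sketch of that approach, with hypothetical names (scan_one_file, scan_with_limit) standing in for the real PyIceberg internals: each worker returns its table together with its own row count, and the caller keeps the running total, so there is no shared mutable state for the workers to race on:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Tuple

import pyarrow as pa


def scan_one_file(task_id: int) -> Tuple[pa.Table, int]:
    # Stand-in for reading a single data file; returns the table and its row count.
    table = pa.table({"id": list(range(task_id * 100, task_id * 100 + 100))})
    return table, table.num_rows


def scan_with_limit(num_tasks: int, limit: Optional[int]) -> pa.Table:
    tables: List[pa.Table] = []
    total_rows = 0
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(scan_one_file, i) for i in range(num_tasks)]
        for future in as_completed(futures):
            table, row_count = future.result()
            tables.append(table)
            total_rows += row_count  # aggregated by the caller, not inside the workers
            if limit is not None and total_rows >= limit:
                break  # every counted row is guaranteed to be in `tables`
    combined = pa.concat_tables(tables)
    return combined.slice(0, limit) if limit is not None else combined


print(scan_with_limit(num_tasks=10, limit=250).num_rows)  # 250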
