
[Bug] Error in overwrite(): pyarrow.lib.ArrowInvalid: offset overflow with large dataset (~3M rows) #1491


Closed
sundaresanr opened this issue Jan 6, 2025 · 9 comments · Fixed by #1555


@sundaresanr

Apache Iceberg version

0.8.1 (latest release)

Please describe the bug 🐞

Encountered the following error while calling overwrite() on a dataset with over 3 million rows (~1GB parquet file size; ~6 GB pyarrow table size):

pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

Backtrace:

txn.overwrite(pat, overwrite_filter=overwrite_filter)
  File ".../site-packages/pyiceberg/table/__init__.py", line 470, in overwrite
    for data_file in data_files:
  File ".../site-packages/pyiceberg/io/pyarrow.py", line 2636, in _dataframe_to_data_files
    partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/pyiceberg/io/pyarrow.py", line 2726, in _determine_partitions
    arrow_table = arrow_table.take(sort_indices)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 2133, in pyarrow.lib._Tabular.take
  File ".../site-packages/pyarrow/compute.py", line 487, in take
    return call_function('take', [data, indices], options, memory_pool)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_compute.pyx", line 590, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 385, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

Seems to be related to:

apache/arrow#25822
apache/arrow#33049

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@sundaresanr sundaresanr changed the title [Bug] Error in overwrite(): pyarrow.lib.ArrowInvalid: offset overflow with large dataset (~3M rows, ~1GB) [Bug] Error in overwrite(): pyarrow.lib.ArrowInvalid: offset overflow with large dataset (~3M rows) Jan 7, 2025
@kevinjqliu
Contributor

Thanks for reporting this issue @sundaresanr. Do you have an example so I can reproduce it?

@kevinjqliu
Contributor

I've recreated this example from apache/arrow#33049 and was able to append successfully.

import numpy as np
import pyarrow as pa

from pyiceberg.catalog.sql import SqlCatalog

# Create a large string
x = str(np.random.randint(low=0, high=1000, size=(30000,)).tolist())
t = pa.chunked_array([x] * 20_000)
# Create pa.table
arrow_table = pa.table({"a": t})

warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db", warehouse=f"file://{warehouse_path}",
)
catalog.create_namespace_if_not_exists("default")
try:
    catalog.drop_table("default.locations")
except:
    pass
table = catalog.create_table("default.locations", arrow_table.schema)
table.append(arrow_table)

@sundaresanr
Author

@kevinjqliu The issue can be reproduced with a partitioned table.

As you can see in the backtrace, the issue is in the call to _determine_partitions(), which calls arrow_table.take().

You can reproduce the issue with this code (partition spec added):

import numpy as np
import pyarrow as pa

from pyiceberg import transforms
from pyiceberg.catalog.sql import SqlCatalog

x = np.random.randint(low=0, high=1000, size=(30_000,)).tolist()
ta = pa.chunked_array([x] * 10_000)
y = ["fixed_string"] * 30_000
tb = pa.chunked_array([y] * 10_000)
# Create pa.table
arrow_table = pa.table({"a": ta, "b": tb})

warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db", warehouse=f"file://{warehouse_path}",
)
catalog.create_namespace_if_not_exists("default")
try:
    catalog.drop_table("default.locations")
except:
    pass
table = catalog.create_table("default.locations", arrow_table.schema)
with table.update_spec() as update_spec:
    update_spec.add_field("b", transforms.IdentityTransform(), "pb")
table.append(arrow_table)

@bigluck
Contributor

bigluck commented Jan 15, 2025

We're facing the same problem with pyiceberg 0.7.1 :(

@Fokko
Contributor

Fokko commented Jan 17, 2025

I can take a stab at this if nobody objects.

@bigluck Why are you still at 0.7.1? 😆

@bigluck
Contributor

bigluck commented Jan 17, 2025

@Fokko I'm getting older :D
BTW I think there's an easy fix. This comment is pretty interesting:

apache/arrow#33049 (comment)

Yes, it essentially is. We use 32-bit offsets for the default string and binary types. This means you always have to be careful when generating the arrays to chunk them if they get too large. Or otherwise use the LargeString variant, which uses 64-bit offsets. Polars, for example, takes the latter approach; they always use LargeString.

We tested it and it works. In our case we cast all the string columns to large_string before passing the data to pyiceberg.

Since pyiceberg already casts all strings to large_string during a scan, I think it's fine for pyiceberg to do the same before writing the data to the lake (before passing the data to Arrow).

It should be a no-op (this code does not cover all the cases):

import pyarrow as pa
import pyarrow.types as pt


def upcast_to_large_types(old_schema: pa.Schema) -> pa.Schema:
    """Upcast string/binary fields to their 64-bit-offset large_* variants."""
    fields = []
    for field in old_schema:
        if pt.is_string(field.type):
            fields.append(pa.field(field.name, pa.large_string()))
        elif pt.is_binary(field.type):
            fields.append(pa.field(field.name, pa.large_binary()))
        else:
            fields.append(field)

    return pa.schema(fields)
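As a rough sketch (not part of the original comment), the helper above could be applied right before handing the table to pyiceberg; `arrow_table` here is any PyArrow table, and the commented-out `append` refers to a pyiceberg table as in the repros above:

```python
import pyarrow as pa

# Cast string/binary columns to their large_* (64-bit offset) variants so
# that no single concatenated array can exceed the 32-bit offset range.
arrow_table = pa.table({"a": pa.array(["x", "y"]), "b": pa.array([b"1", b"2"])})
large_table = arrow_table.cast(upcast_to_large_types(arrow_table.schema))

# table.append(large_table)  # `table` being a pyiceberg Table, as in the repros above
```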

@Fokko
Contributor

Fokko commented Jan 17, 2025

@bigluck I suspected something like that, thanks for testing it out. One of the issues is that we combine the arrays into a single one, which can easily go over the maximum offset. Let me see if there is a way to do this without having to bump everything to large_.
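For illustration, a hedged diagnostic sketch (the helper is hypothetical, not a pyiceberg or Arrow API): it flags string/binary columns whose total value bytes already exceed the 32-bit offset range, i.e. columns that can overflow once their chunks are concatenated into a single array:

```python
import pyarrow as pa
import pyarrow.compute as pc

INT32_MAX = 2**31 - 1  # largest byte offset a 32-bit string/binary array can address


def string_columns_at_risk(table: pa.Table) -> list:
    """Hypothetical helper: list string/binary columns whose total value bytes
    exceed the 32-bit offset range, so concatenating their chunks into one
    array (e.g. via take() on a sorted index) could overflow."""
    risky = []
    for name in table.column_names:
        col = table[name]
        if pa.types.is_string(col.type) or pa.types.is_binary(col.type):
            total_bytes = pc.sum(pc.binary_length(col)).as_py() or 0
            if total_bytes > INT32_MAX:
                risky.append(name)
    return risky
```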

Fokko added a commit to Fokko/iceberg-python that referenced this issue Jan 20, 2025
This was already being discussed back here:

apache#208 (comment)

This PR changes from doing a sort, and then a single pass over the
table, to an approach where we determine the unique partition tuples
and then filter on them one by one.

Fixes apache#1491

This is because the sort caused buffers to be joined, which would overflow
in Arrow. I think this is an issue on the Arrow side, and it should
automatically break the data up into smaller buffers. The `combine_chunks`
method does this correctly.

Now:

```
0.42877754200890195
Run 1 took: 0.2507691659993725
Run 2 took: 0.24833179199777078
Run 3 took: 0.24401691700040828
Run 4 took: 0.2419595829996979
Average runtime of 0.28 seconds
```

Before:

```
Run 0 took: 1.0768639159941813
Run 1 took: 0.8784021250030492
Run 2 took: 0.8486490420036716
Run 3 took: 0.8614017910003895
Run 4 took: 0.8497851670108503
Average runtime of 0.9 seconds
```

So it comes with a nice speedup as well :)
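A minimal sketch of the idea in plain PyArrow, not the actual pyiceberg implementation (table and column names are illustrative): determine the distinct partition tuples, then slice the table with a per-tuple filter, so string buffers are never concatenated into one oversized array.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Illustrative table, partitioned by column "b"
arrow_table = pa.table({"a": [1, 2, 3, 4], "b": ["x", "y", "x", "y"]})
partition_cols = ["b"]

# Determine the unique partition tuples ...
unique_keys = arrow_table.select(partition_cols).group_by(partition_cols).aggregate([])

# ... then filter once per tuple instead of sort + take(), which would
# concatenate the string chunks and can overflow their 32-bit offsets.
for row in unique_keys.to_pylist():
    mask = None
    for col, value in row.items():
        cond = pc.equal(arrow_table[col], pa.scalar(value))
        mask = cond if mask is None else pc.and_(mask, cond)
    partition_table = arrow_table.filter(mask)
    print(row, partition_table.num_rows)
```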
@Fokko
Contributor

Fokko commented Jan 20, 2025

@bigluck I got a solution that fixes the issue: #1539 LMKWYT

@bigluck
Contributor

bigluck commented Jan 21, 2025

@Fokko I can't test the fix in prod with our data, but #1539 works locally over some synthetic data ;)

Thanks so much!

Fokko added a commit that referenced this issue Jan 23, 2025
Second attempt of #1539

This was already being discussed back here:
#208 (comment)

This PR changes from doing a sort, and then a single pass over the table,
to an approach where we determine the unique partition tuples and filter on
them individually.

Fixes #1491

This is because the sort caused buffers to be joined, which would overflow in
Arrow. I think this is an issue on the Arrow side, and it should
automatically break the data up into smaller buffers. The `combine_chunks` method
does this correctly.

Now:

```
0.42877754200890195
Run 1 took: 0.2507691659993725
Run 2 took: 0.24833179199777078
Run 3 took: 0.24401691700040828
Run 4 took: 0.2419595829996979
Average runtime of 0.28 seconds
```

Before:

```
Run 0 took: 1.0768639159941813
Run 1 took: 0.8784021250030492
Run 2 took: 0.8486490420036716
Run 3 took: 0.8614017910003895
Run 4 took: 0.8497851670108503
Average runtime of 0.9 seconds
```

So it comes with a nice speedup as well :)

---------

Co-authored-by: Kevin Liu <[email protected]>
Fokko added a commit that referenced this issue Apr 16, 2025

# Rationale for this change

Found out I broke this myself after doing a `git bisect`:

```
36d383d is the first bad commit
commit 36d383d
Author: Fokko Driesprong <[email protected]>
Date:   Thu Jan 23 07:50:54 2025 +0100

    PyArrow: Avoid buffer-overflow by avoid doing a sort (#1555)
    
 pyiceberg/io/pyarrow.py                    |  129 ++-
 pyiceberg/partitioning.py                  |   39 +-
 pyiceberg/table/__init__.py                |    6 +-
 pyproject.toml                             |    1 +
 tests/benchmark/test_benchmark.py          |   72 ++
 tests/integration/test_partitioning_key.py | 1299 ++++++++++++++--------------
 tests/table/test_locations.py              |    2 +-
 7 files changed, 805 insertions(+), 743 deletions(-)
 create mode 100644 tests/benchmark/test_benchmark.py
```

Closes #1917


Fokko added a commit that referenced this issue Apr 17, 2025