[SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas #29818
Conversation
cc @WeichenXu123 who already took a close look.
Following up here - any other comments? Does this look like a desirable feature, and how do we want to configure it?
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
        """

    @since(1.3)
-   def toPandas(self):
+   def toPandas(self, self_destruct=False):
Let's name it `selfDestruct` to keep the naming consistent.
Done.
pandas_options = {'date_as_object': True}
if self_destruct:
    pandas_options.update({
        'self_destruct': True,
Would you mind leaving a comment in the code about this set of parameter configurations?
Done.
yield batch
split_batch = pa.RecordBatch.from_arrays([
    pa.concat_arrays([array]) for array in batch
], schema=batch.schema)
Sorry for asking a question without taking a close look but would you mind elaborating why we should do this?
I added a comment, but please let me know if it's unclear.
This looks a little strange to me too. Is `concat_arrays` actually doing anything here, and if so, wouldn't it do it for the case that `selfDestruct` is False too?
cc @BryanCutler FYI.
ok to test
Test build #129766 has finished for PR 29818 at commit
Test build #129768 has finished for PR 29818 at commit
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test status success
Thanks for doing this @lidavidm, I agree it could be useful in cases of working with large DataFrames. The last I saw was that the `self_destruct` option is experimental. Do you know if or when this might change? I'm a little unsure about adding experimental features, especially if it could lead to issues with the resulting Pandas DataFrame.
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
        """

    @since(1.3)
-   def toPandas(self):
+   def toPandas(self, selfDestruct=False):
Would this be better as an SQL config? Since this class is a mixin, I'm not sure the user would see this option from the public API?
I'll change it.
@@ -103,10 +103,22 @@ def toPandas(self):
    batches = self.toDF(*tmp_column_names)._collect_as_arrow()
    if len(batches) > 0:
        table = pyarrow.Table.from_batches(batches)
        del batches
Does this have any bearing on the buffers self-destructing? Is it taking into account how many references there are before destructing?
Yes - we don't want to hold on to any other references to the buffers, we want the Table to be the only owner. I'll clarify this part here.
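For illustration, a minimal standalone sketch of that ownership point, using only pyarrow (the record batch here is built locally; in Spark the batches come from `_collect_as_arrow()`):

```python
import pyarrow as pa

# Built locally for illustration; in Spark these come from _collect_as_arrow().
batch = pa.RecordBatch.from_arrays([pa.array(list(range(1000)))], names=["x"])
batches = [batch]

table = pa.Table.from_batches(batches)
# Drop the extra references so the Table is the sole owner of the buffers;
# otherwise self_destruct cannot actually release them during conversion.
del batch, batches

pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
```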
# use_threads - convert one column at a time
pandas_options.update({
    'self_destruct': True,
    'split_blocks': True,
Is this necessary to set with `self_destruct`? It might lead to Pandas doing more memory allocation later, I believe.
Not quite necessary, but good to have; otherwise we may end up allocating a large block if there are a lot of columns of the same type, defeating the point. Pandas may reconsolidate later, but that's part of the issue of using Pandas.
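For reference, a minimal sketch of the option set under discussion, mirroring the `to_pandas` keywords named in the thread (the table is built locally for illustration):

```python
import pyarrow as pa

table = pa.table({"x": list(range(1000))})
self_destruct = True  # would come from the SQL conf in Spark

pandas_options = {'date_as_object': True}
if self_destruct:
    pandas_options.update({
        'self_destruct': True,   # release each Arrow column once it has been converted
        'split_blocks': True,    # one pandas block per column, avoiding one big consolidated block
        'use_threads': False,    # convert one column at a time
    })
pdf = table.to_pandas(**pandas_options)
```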
I can't reply to this inline somehow, but
Ah and for
I can follow up on the experimental status, but AIUI, it's going to just be a long tail of "this Pandas operation didn't expect an immutable backing array" that we would need to flush out anyways over time. We can leave it turned off by default. Also, I see the PyArrow optimization in Spark is itself experimental anyways.
Running the demo again gives these two plots. While the memory usage looks identical, in the no-self-destruct case Python gets OOMKilled, while it does not get OOMKilled in the other case. The reason the memory usage looks so similar is that jemalloc doesn't immediately return unused memory to the OS; rather, when under memory pressure, it signals a background thread to start cleaning up memory. Because of this delay, memory still gets consumed, but at a slower pace.
[Plots: memory usage with and without self-destruct]
Kubernetes integration test starting
Kubernetes integration test status success
Test build #129910 has finished for PR 29818 at commit
"for columnar data transfers in PySpark. " + | ||
"This reduces memory usage at the cost of some CPU time. " + | ||
"This optimization applies to: pyspark.sql.DataFrame.toPandas") | ||
.version("3.0.0") |
Let's change it to 3.1.0.
Done, thanks.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131452 has finished for PR 29818 at commit
It doesn't seem to me we really need this refactoring. Will your unit test still pass if you only call `toPandas()`?
with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
    # We hold on to the table reference here, so if self destruct didn't work, then
    # there would be 2 copies of the data (one in Arrow, one in Pandas), both
    # tracked by the Arrow allocator
Oh, I missed this comment that you need the Table reference. Otherwise it could be freed and then we lose track of memory allocated, right? If that's the case, what about just calling `_collect_as_arrow(_force_split_batches=True)` here and performing batches -> table -> pandas directly?
Yes - we need the table reference. If self destruct worked, then it holds effectively 0 memory; if it didn't work, then it holds a copy of the dataframe.
If we call to_pandas inside the test itself, wouldn't we then be testing Arrow directly, instead of testing how Arrow is used inside Spark?
Not really, it's not ideal, but it would still test that splitting and reallocating the columns in `_collect_as_arrow()` will allow for self-destruction, which is most of the additions here. The rest is just calling `to_pandas()` with the right options. I would recommend testing something like this:
with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
    # get initial pa.total_allocated_bytes()
    batches = _collect_as_arrow(_force_split_batches=True)
    table = ...  # convert batches to table as done in toPandas()
    pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
    # get after pa.total_allocated_bytes() and check difference is expected
    # Call with the full code path and compare resulting DataFrame for good measure
    assert_frames_equal(df.toPandas(), pdf)
That should be sufficient testing, wdyt?
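For context, `pa.total_allocated_bytes()` reports the bytes currently held by Arrow's default memory pool, which is what the before/after comparison in the sketch above relies on. A standalone illustration (pyarrow only, not the actual Spark test):

```python
import pyarrow as pa

table = pa.table({"x": list(range(1_000_000))})

before = pa.total_allocated_bytes()
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
after = pa.total_allocated_bytes()

# The difference bounds how much memory the Arrow pool retained across the
# conversion; the Spark test asserts it stays close to one copy of the data.
print("retained by the Arrow pool:", after - before, "bytes")
```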
Makes sense to me - I'll try to update this soon. Thanks for being patient with me here.
Also looks like the 3.1 branch was cut already, so I need to update the flags here to reference 3.2.
Force-pushed from b15a307 to 0d9c88a
Hmm, I'm not sure what's with javadoc generation failing on all the CI tests...
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #133023 has finished for PR 29818 at commit
Sorry for the delay @lidavidm. I think the code changes look good now, just a few issues with the testing remain. We need to cover:
- Test that memory stays within a certain bound (which you have)
- Test the complete code path with the new config enabled
- Verify the 2 results above are equal to the result of `df.toPandas()` with the config disabled
@@ -121,6 +137,8 @@ def toPandas(self):
    elif isinstance(field.dataType, MapType):
        pdf[field.name] = \
            _convert_map_items_to_dict(pdf[field.name])
    # Return both the DataFrame and table for unit testing
    # Note that table reference is invalid if self_destruct enabled
This can be removed now
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
        pdf_arrow = df.toPandas()
        assert_frame_equal(pdf_arrow, pdf)

    def test_pandas_self_destruct(self):
        import pyarrow as pa
        rows = 2 ** 4
we should test memory usage with more rows than this
How about `2 ** 10`?
# Test the self_destruct behavior by testing _collect_as_arrow directly
# Note the sql_conf flags here don't have any real effect as
# we aren't going through df.toPandas
with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
I don't think this conf has any effect within the enclosing code block.
    # Should be around 1x the data size (table should not hold on to any memory)
    self.assertGreaterEqual(difference, 0.9 * expected_bytes)
    self.assertLessEqual(difference, 1.1 * expected_bytes)
with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
This block isn't quite what I was suggesting. We should compare the Pandas DataFrame resulting from `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` set to False against `pdf_split`. Also, we will need to call `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` set to True and compare that as well.
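A rough sketch of that comparison (assumes an existing SparkSession named `spark`; the DataFrame and variable names are illustrative):

```python
from pandas.testing import assert_frame_equal

conf_key = "spark.sql.execution.arrow.pyspark.selfDestruct.enabled"
df = spark.range(1000).selectExpr("id", "id * 2 AS doubled")

spark.conf.set(conf_key, "false")
pdf_plain = df.toPandas()

spark.conf.set(conf_key, "true")
pdf_self_destruct = df.toPandas()

# Both code paths should produce identical Pandas DataFrames.
assert_frame_equal(pdf_plain, pdf_self_destruct)
```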
if not isinstance(batch_or_indices, pa.RecordBatch):
    results.append(batch_or_indices)
else:
    split_batch = pa.RecordBatch.from_arrays([
        pa.concat_arrays([array])
        for array in batch_or_indices
    ], schema=batch_or_indices.schema)
    results.append(split_batch)

Suggested change:

if isinstance(batch_or_indices, pa.RecordBatch):
    batch_or_indices = pa.RecordBatch.from_arrays([
        pa.concat_arrays([array])
        for array in batch_or_indices
    ], schema=batch_or_indices.schema)
results.append(batch_or_indices)
Kubernetes integration test starting
Test build #134628 has finished for PR 29818 at commit
LGTM, I'm going to run tests once more for good measure and I'll merge after passing.
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135086 has finished for PR 29818 at commit
Thanks for being patient with me through this very long review!
Merged to master, apologies for all of the delays and thanks @lidavidm for all the work! We should add this to the documentation too, if you could do that it would be great.
### What changes were proposed in this pull request?
As a followup for #29818, document caveats of using the Arrow selfDestruct option in toPandas, which include:
- toPandas() may be slower;
- the resulting dataframe may not support some Pandas operations due to immutable backing arrays.

### Why are the changes needed?
This will hopefully reduce user confusion as with SPARK-34463.

### Does this PR introduce _any_ user-facing change?
Yes - documentation is updated and a config setting description is updated to clearly indicate the config is experimental.

### How was this patch tested?
This is a documentation-only change.

Closes #31738 from lidavidm/spark-34463.

Authored-by: David Li <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…ure for `toPandas`

### What changes were proposed in this pull request?
Implement Arrow `self_destruct` of `toPandas` for memory savings. Now the Spark configuration `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` can be used to enable PyArrow’s `self_destruct` feature in Spark Connect, which can save memory when creating a Pandas DataFrame via `toPandas` by freeing Arrow-allocated memory while building the Pandas DataFrame.

### Why are the changes needed?
Reach parity with vanilla PySpark. The PR is a mirror of #29818 for Spark Connect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #42079 from xinrong-meng/self_destruct.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
Creating a Pandas dataframe via Apache Arrow currently can use twice as much memory as the final result, because during the conversion, both Pandas and Arrow retain a copy of the data. Arrow has a "self-destruct" mode now (Arrow >= 0.16) to avoid this, by freeing each column after conversion. This PR integrates support for this in toPandas, handling a couple of edge cases:
self_destruct has no effect unless the memory is allocated appropriately, which is handled in the Arrow serializer here. Essentially, the issue is that self_destruct frees memory column-by-column, but a record batch read from the Arrow IPC stream backs all of its columns with slices of a single row-oriented allocation.
In this scenario, Arrow will drop references to all of column 0's chunks, but no memory will actually be freed, as the chunks were just slices of an underlying allocation. The PR therefore copies each column into its own allocation, so that each column's memory can actually be released as soon as it has been converted.
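A minimal sketch of that per-column copy, using pyarrow directly (the record batch here is built locally rather than read from the IPC stream):

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])], names=["x", "y"])

# pa.concat_arrays over a single-element list copies that array into a fresh
# allocation, so each column owns its buffers outright and can be released
# independently once self_destruct frees it.
split_batch = pa.RecordBatch.from_arrays(
    [pa.concat_arrays([column]) for column in batch.columns],
    schema=batch.schema)
```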
The optimization is disabled by default, and can be enabled with the Spark SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" set to "true". We can't always apply this optimization because it's more likely to generate a dataframe with immutable buffers, which Pandas doesn't always handle well, and because it is slower overall (since it only converts one column at a time instead of in parallel).
Why are the changes needed?
This lets us load larger datasets - in particular, with N bytes of memory, before we could never load a dataset bigger than N/2 bytes; now the overhead is more like N/1.25 or so.
Does this PR introduce any user-facing change?
Yes - it adds a new SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled"
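For example, the option can be enabled like so (a sketch assuming an existing SparkSession named `spark`):

```python
# Requires the Arrow-based conversion path to be enabled as well.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

pdf = spark.range(10_000_000).toPandas()
```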
How was this patch tested?
See the mailing list - it was tested with Python memory_profiler. Unit tests added to check memory within certain bounds and correctness with the option enabled.
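A sketch of the kind of measurement mentioned, using memory_profiler's `memory_usage` helper (the workload and the `spark` session are assumptions, not the actual benchmark from the mailing list):

```python
from memory_profiler import memory_usage

def convert():
    return spark.range(10_000_000).toPandas()

# memory_usage samples the process RSS (in MiB) while `convert` runs.
samples = memory_usage((convert, (), {}), interval=0.1)
print("peak RSS during toPandas:", max(samples), "MiB")
```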