[SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas #29818

Closed · wants to merge 1 commit

48 changes: 44 additions & 4 deletions python/pyspark/sql/pandas/conversion.py
@@ -105,13 +105,29 @@ def toPandas(self):
import pyarrow
# Rename columns to avoid duplicated column names.
tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
batches = self.toDF(*tmp_column_names)._collect_as_arrow()
self_destruct = self.sql_ctx._conf.arrowPySparkSelfDestructEnabled()
batches = self.toDF(*tmp_column_names)._collect_as_arrow(
split_batches=self_destruct)
if len(batches) > 0:
table = pyarrow.Table.from_batches(batches)
# Ensure only the table has a reference to the batches, so that
# self_destruct (if enabled) is effective
del batches
Member: does this have any bearing on the buffers self destructing? is it taking into account how many reference counts there are before destructing?

Member Author: Yes - we don't want to hold on to any other references to the buffers, we want the Table to be the only owner. I'll clarify this part here.
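To make the ownership point above concrete, here is a minimal standalone sketch (not part of the diff, assuming only that pyarrow is installed): self_destruct can only release a column's buffers during conversion if the Table is their sole owner, so the extra `batches` reference has to be dropped first.

```python
import pyarrow as pa

# Build a table from record batches, mirroring what toPandas does with the
# batches collected from the JVM.
batches = [pa.RecordBatch.from_arrays(
    [pa.array([float(i) for i in range(1_000_000)])], ["x"])]
table = pa.Table.from_batches(batches)
del batches  # drop our reference so the Table is the only owner of the buffers

# With self_destruct, the Table drops its reference to each column as it is
# converted, letting the buffers be freed early; the Table must not be used again.
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
```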

# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior when
# Arrow optimization is disabled.
pdf = table.to_pandas(date_as_object=True)
pandas_options = {'date_as_object': True}
if self_destruct:
# Configure PyArrow to use as little memory as possible:
# self_destruct - free columns as they are converted
# split_blocks - create a separate Pandas block for each column
# use_threads - convert one column at a time
pandas_options.update({
'self_destruct': True,
Member: Would you mind leaving a comment in the code about this set of parameter configurations?

Member Author: Done.

'split_blocks': True,
Member: Is this necessary to set with self_destruct? It might lead to Pandas doing more memory allocation later, I believe.

Member Author: Not strictly necessary, but good to have; otherwise we may end up allocating one large block if there are a lot of columns of the same type, defeating the point. Pandas may reconsolidate later, but that's part of the issue of using Pandas.

'use_threads': False,
})
pdf = table.to_pandas(**pandas_options)
# Rename back to the original column names.
pdf.columns = self.columns
for field in self.schema:
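As an aside on the split_blocks exchange above, the sketch below (not part of the diff; the `nblocks` check uses a private pandas attribute and is for illustration only) shows the block-layout difference: without split_blocks, PyArrow consolidates same-typed columns into one wide block, which means one large allocation up front.

```python
import pyarrow as pa

table = pa.table({"a": [1.0, 2.0], "b": [3.0, 4.0], "c": [5.0, 6.0]})
consolidated = table.to_pandas(split_blocks=False)
split = table.to_pandas(split_blocks=True)

# `_mgr` is pandas-internal (older versions call it `_data`); used here only to
# make the resulting block layout visible.
print(consolidated._mgr.nblocks)  # typically 1: all three float64 columns in one block
print(split._mgr.nblocks)         # typically 3: one block per column
```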
@@ -225,11 +241,16 @@ def _to_corrected_pandas_type(dt):
else:
return None

def _collect_as_arrow(self):
def _collect_as_arrow(self, split_batches=False):
"""
Returns all records as a list of ArrowRecordBatches; pyarrow must be installed
and available on driver and worker Python environments.
This is an experimental feature.

:param split_batches: split batches such that each column is in its own allocation, so
that the selfDestruct optimization is effective; default False.

.. note:: Experimental.
"""
from pyspark.sql.dataframe import DataFrame

@@ -240,7 +261,26 @@ def _collect_as_arrow(self):

# Collect list of un-ordered batches where last element is a list of correct order indices
try:
results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
if split_batches:
# When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
# each column in each record batch is contained in its own allocation.
# Otherwise, selfDestruct does nothing; it frees each column as it's
# converted, but each column will actually be a list of slices of record
# batches, and so no memory is actually freed until all columns are
# converted.
import pyarrow as pa
results = []
for batch_or_indices in batch_stream:
if isinstance(batch_or_indices, pa.RecordBatch):
batch_or_indices = pa.RecordBatch.from_arrays([
# This call actually reallocates the array
pa.concat_arrays([array])
for array in batch_or_indices
], schema=batch_or_indices.schema)
results.append(batch_or_indices)
else:
results = list(batch_stream)
finally:
# Join serving thread and raise any exceptions from collectAsArrowToPython
jsocket_auth_server.getResult()
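A small aside on the split_batches comment above (a standalone sketch, not part of the diff): the columns of a deserialized record batch are typically slices over shared buffers, so freeing one column in isolation releases nothing; pa.concat_arrays re-materializes an array into its own allocation, which is what the branch above relies on.

```python
import pyarrow as pa

parent = pa.array(list(range(1024)))
child = parent.slice(0, 512)
# The slice shares the parent's data buffer rather than owning its own allocation,
# so dropping `child` alone frees no memory:
print(child.buffers()[1].address == parent.buffers()[1].address)  # True

# concat_arrays copies the values into a freshly allocated array, giving the
# column an allocation it owns outright.
independent = pa.concat_arrays([child])
```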
33 changes: 32 additions & 1 deletion python/pyspark/sql/tests/test_arrow.py
@@ -25,7 +25,7 @@

from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import rand, udf
from pyspark.sql.types import StructType, StringType, IntegerType, LongType, \
FloatType, DoubleType, DecimalType, DateType, TimestampType, BinaryType, StructField, \
ArrayType, NullType
@@ -196,6 +196,37 @@ def test_pandas_round_trip(self):
pdf_arrow = df.toPandas()
assert_frame_equal(pdf_arrow, pdf)

def test_pandas_self_destruct(self):
import pyarrow as pa
rows = 2 ** 10
cols = 4
expected_bytes = rows * cols * 8
df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
# Test the self_destruct behavior by testing _collect_as_arrow directly
allocation_before = pa.total_allocated_bytes()
batches = df._collect_as_arrow(split_batches=True)
table = pa.Table.from_batches(batches)
del batches
pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
allocation_after = pa.total_allocated_bytes()
difference = allocation_after - allocation_before
# Should be around 1x the data size (table should not hold on to any memory)
self.assertGreaterEqual(difference, 0.9 * expected_bytes)
self.assertLessEqual(difference, 1.1 * expected_bytes)

with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
Member: This block isn't quite what I was suggesting. We should compare the Pandas DataFrame resulting from df.toPandas() with spark.sql.execution.arrow.pyspark.selfDestruct.enabled set to False against pdf_split. We will also need to call df.toPandas() with spark.sql.execution.arrow.pyspark.selfDestruct.enabled set to True and compare that as well.

no_self_destruct_pdf = df.toPandas()
# Note that while memory usage is 2x the data size here (both table and pdf hold on to
# memory), in this case Arrow still only tracks 1x worth of memory (since the
# batches are not allocated by Arrow in this case), so we can't make any
# assertions here

with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
self_destruct_pdf = df.toPandas()

assert_frame_equal(pdf_split, no_self_destruct_pdf)
assert_frame_equal(pdf_split, self_destruct_pdf)

def test_filtered_frame(self):
df = self.spark.range(3).toDF("i")
pdf = df.filter("i < 0").toPandas()
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2023,6 +2023,17 @@ object SQLConf {
.version("3.0.0")
.fallbackConf(ARROW_EXECUTION_ENABLED)

val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
.doc("When true, make use of Apache Arrow's self-destruct and split-blocks options " +
"for columnar data transfers in PySpark, when converting from Arrow to Pandas. " +
"This reduces memory usage at the cost of some CPU time. " +
"This optimization applies to: pyspark.sql.DataFrame.toPandas " +
"when 'spark.sql.execution.arrow.pyspark.enabled' is set.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)
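For reference, a hedged end-to-end usage sketch of this flag from the PySpark side (not part of the diff; it assumes pyarrow is available on the driver):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

df = spark.range(10_000_000).select((rand() * 100).alias("x"))
# toPandas now converts via Arrow with self_destruct/split_blocks, freeing each
# column's Arrow buffers as it is converted rather than holding the full table
# and the resulting DataFrame in memory at once.
pdf = df.toPandas()
```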

val PYSPARK_JVM_STACKTRACE_ENABLED =
buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
.doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
@@ -3577,6 +3588,8 @@ class SQLConf extends Serializable with Logging {

def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)

def arrowPySparkSelfDestructEnabled: Boolean = getConf(ARROW_PYSPARK_SELF_DESTRUCT_ENABLED)

def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)

def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)