
Commit 78b3345

xinrong-meng authored and HyukjinKwon committed
[SPARK-44486][PYTHON][CONNECT] Implement PyArrow self_destruct feature for toPandas
### What changes were proposed in this pull request?

Implement Arrow `self_destruct` of `toPandas` for memory savings.

Now the Spark configuration `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` can be used to enable PyArrow's `self_destruct` feature in Spark Connect, which can save memory when creating a Pandas DataFrame via `toPandas` by freeing Arrow-allocated memory while building the Pandas DataFrame.

### Why are the changes needed?

To reach parity with vanilla PySpark. The PR is a mirror of #29818 for Spark Connect.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #42079 from xinrong-meng/self_destruct.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
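For illustration only (not part of this commit), a minimal sketch of how the new configuration could be used from a Spark Connect client; the server address and the query are assumptions:

```python
from pyspark.sql import SparkSession

# Connect to a Spark Connect server (address is an assumption for this sketch).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Opt in to PyArrow's self_destruct conversion for toPandas.
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

# Arrow-allocated memory is released column by column while the
# Pandas DataFrame is being built, instead of holding two full copies.
pdf = spark.range(1_000_000).selectExpr("id", "id * 2 AS doubled").toPandas()
```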
1 parent 58e5d86 commit 78b3345

File tree

2 files changed (+55, -7 lines):

- python/pyspark/sql/connect/client/core.py
- python/pyspark/sql/tests/connect/test_parity_arrow.py


python/pyspark/sql/connect/client/core.py

Lines changed: 43 additions & 4 deletions
```diff
@@ -757,14 +757,33 @@ def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":
         logger.info(f"Executing plan {self._proto_to_string(plan)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
-        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
+        (self_destruct_conf,) = self.get_config_with_defaults(
+            ("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "false"),
+        )
+        self_destruct = cast(str, self_destruct_conf).lower() == "true"
+        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
+            req, self_destruct=self_destruct
+        )
         assert table is not None

         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)

         # Rename columns to avoid duplicated column names.
-        pdf = table.rename_columns([f"col_{i}" for i in range(table.num_columns)]).to_pandas()
+        renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+        if self_destruct:
+            # Configure PyArrow to use as little memory as possible:
+            # self_destruct - free columns as they are converted
+            # split_blocks - create a separate Pandas block for each column
+            # use_threads - convert one column at a time
+            pandas_options = {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+            pdf = renamed_table.to_pandas(**pandas_options)
+        else:
+            pdf = renamed_table.to_pandas()
         pdf.columns = schema.names

         if len(pdf.columns) > 0:
```
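The `pandas_options` dict above corresponds to keyword arguments of `pyarrow.Table.to_pandas`. A standalone sketch of the same conversion settings, assuming an in-memory sample table rather than data fetched from a server:

```python
import pyarrow as pa

# Small stand-in for the Arrow table assembled from the fetched batches.
table = pa.table({"col_0": list(range(10)), "col_1": [float(i) for i in range(10)]})

# self_destruct frees each column's Arrow buffers as it is converted;
# split_blocks keeps each column in its own Pandas block so it can be
# released independently; use_threads=False converts one column at a time.
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)

# After a self_destruct conversion, the source table should not be reused.
del table
```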
```diff
@@ -1108,7 +1127,7 @@ def _execute_and_fetch_as_iterator(
             self._handle_error(error)

     def _execute_and_fetch(
-        self, req: pb2.ExecutePlanRequest
+        self, req: pb2.ExecutePlanRequest, self_destruct: bool = False
     ) -> Tuple[
         Optional["pa.Table"],
         Optional[StructType],
```
```diff
@@ -1144,7 +1163,27 @@
         )

         if len(batches) > 0:
-            table = pa.Table.from_batches(batches=batches)
+            if self_destruct:
+                results = []
+                for batch in batches:
+                    # self_destruct frees memory column-wise, but Arrow record batches are
+                    # oriented row-wise, so copies each column into its own allocation
+                    batch = pa.RecordBatch.from_arrays(
+                        [
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch
+                        ],
+                        schema=batch.schema,
+                    )
+                    results.append(batch)
+                table = pa.Table.from_batches(batches=results)
+                # Ensure only the table has a reference to the batches, so that
+                # self_destruct (if enabled) is effective
+                del results
+                del batches
+            else:
+                table = pa.Table.from_batches(batches=batches)
             return table, schema, metrics, observed_metrics, properties
         else:
             return None, schema, metrics, observed_metrics, properties
```
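A note on the per-batch rewrite above: `self_destruct` releases memory one column at a time, but the batches streamed from the server may keep their columns in shared allocations, so each column is copied into its own allocation first. A standalone sketch of that idea with an illustrative batch (not the client's actual data path):

```python
import pyarrow as pa

# Stand-in for one record batch received from the server.
batch = pa.RecordBatch.from_arrays(
    [pa.array(list(range(5))), pa.array([1.0, 2.0, 3.0, 4.0, 5.0])],
    names=["id", "value"],
)

# pa.concat_arrays on a single-element list copies the array into a fresh
# allocation, so every column ends up owning its own buffers.
rebuilt = pa.RecordBatch.from_arrays(
    [pa.concat_arrays([column]) for column in batch.columns],
    schema=batch.schema,
)

# A table built from such batches can then drop columns one by one during
# to_pandas(self_destruct=True, ...).
table = pa.Table.from_batches([rebuilt])
```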

python/pyspark/sql/tests/connect/test_parity_arrow.py

Lines changed: 12 additions & 3 deletions
```diff
@@ -19,11 +19,13 @@
 from distutils.version import LooseVersion

 import pandas as pd
+
 from pyspark.sql.tests.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils


-class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTestUtils):
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_createDataFrame_empty_partition(self):
         super().test_createDataFrame_empty_partition()
@@ -56,9 +58,16 @@ def test_no_partition_frame(self):
     def test_no_partition_toPandas(self):
         super().test_no_partition_toPandas()

-    @unittest.skip("The test uses internal APIs.")
     def test_pandas_self_destruct(self):
-        super().test_pandas_self_destruct()
+        df = self.spark.range(100).select("id", "id", "id")
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+
+        self.assert_eq(self_destruct_pdf, no_self_destruct_pdf)

     def test_propagates_spark_exception(self):
         self.check_propagates_spark_exception()
```
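The skipped upstream test inspected PyArrow's allocator through internal Spark APIs; the parity test above instead checks that results match under both settings. Outside Spark, the memory effect itself can be observed with `pyarrow.total_allocated_bytes()`; a rough sketch with illustrative sizes:

```python
import pyarrow as pa

table = pa.table({"col": list(range(1_000_000))})

before = pa.total_allocated_bytes()
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
after = pa.total_allocated_bytes()

# With self_destruct, Arrow buffers are handed back during the conversion,
# so allocated bytes should not grow by a second full copy of the data.
print(f"Arrow bytes before: {before}, after: {after}")
```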
