Skip to content

Commit 96483e6

Browse files
committed
[PROPOSAL] Add self_destruct support to toPandas
1 parent c336ddf commit 96483e6

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

python/pyspark/sql/pandas/conversion.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """

     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, self_destruct=False):
         """
         Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
@@ -103,10 +103,18 @@ def toPandas(self):
             batches = self.toDF(*tmp_column_names)._collect_as_arrow()
             if len(batches) > 0:
                 table = pyarrow.Table.from_batches(batches)
+                del batches
                 # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                 # values, but we should use datetime.date to match the behavior with when
                 # Arrow optimization is disabled.
-                pdf = table.to_pandas(date_as_object=True)
+                pandas_options = {'date_as_object': True}
+                if self_destruct:
+                    pandas_options.update({
+                        'self_destruct': True,
+                        'split_blocks': True,
+                        'use_threads': False,
+                    })
+                pdf = table.to_pandas(**pandas_options)
                 # Rename back to the original column names.
                 pdf.columns = self.columns
                 for field in self.schema:

python/pyspark/sql/pandas/serializers.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
-            yield batch
+            split_batch = pa.RecordBatch.from_arrays([
+                pa.concat_arrays([array]) for array in batch
+            ], schema=batch.schema)
+            yield split_batch

     def __repr__(self):
         return "ArrowStreamSerializer"

0 commit comments

Comments (0)