87 changes: 83 additions & 4 deletions cpp/src/arrow/python/arrow_to_pandas.cc
@@ -51,6 +51,7 @@
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/numpy_internal.h"
+#include "arrow/python/pyarrow.h"
#include "arrow/python/python_to_arrow.h"
#include "arrow/python/type_traits.h"

@@ -307,7 +308,8 @@ class PandasBlock {
    DATETIME,
    DATETIME_WITH_TZ,
    TIMEDELTA,
-   CATEGORICAL
+   CATEGORICAL,
+   EXTENSION
  };

  PandasBlock(const PandasOptions& options, int64_t num_rows, int num_columns)
@@ -1459,6 +1461,55 @@ class CategoricalBlock : public PandasBlock {
  bool needs_copy_;
};

+class ExtensionBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
+
+  // Don't create a block array here, only the placement array
Member: So you're not handling the extension storage anywhere? Why is this?

Author: What do you mean by "extension storage"? The goal of this ExtensionBlock is not to convert the Arrow array to a NumPy array, but to pass it through as a pyarrow array to the caller of the ConvertTableToPandas function.

What may be confusing is that this is called "ExtensionBlock": it is not necessarily for Arrow extension types, but is meant for pandas extension arrays (and those two don't necessarily map onto each other).
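For context, a minimal sketch (made-up data, current pyarrow/pandas APIs) of what the default NumPy-based path does to a nullable integer column, i.e. the conversion this pass-through lets pandas avoid:

```python
import pyarrow as pa

table = pa.table({'a': [1, 2, None]})

# Default path: every column becomes a NumPy array. A null in an integer
# column forces an upcast to float64, with NaN standing in for the null.
df = table.to_pandas()
print(df['a'].dtype)  # float64
```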

+  Status Allocate() override {
+    PyAcquireGIL lock;
+
+    npy_intp placement_dims[1] = {num_columns_};
+    PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
+
+    RETURN_IF_PYERROR();
+
+    placement_arr_.reset(placement_arr);
+
+    placement_data_ = reinterpret_cast<int64_t*>(
+        PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
+
+    return Status::OK();
+  }
+
+  Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
+               int64_t rel_placement) override {
+    PyAcquireGIL lock;
+
+    PyObject* py_array;
+    py_array = wrap_chunked_array(data);
+    py_array_.reset(py_array);
+
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
+
+  Status GetPyResult(PyObject** output) override {
Member: AFAICT this just duplicates the base class implementation. Why did you redefine it?

Author: The PyDict_SetItemString(result, "py_array", py_array_.obj()); call is different: it puts a pyarrow array in the result dict.

Member: It's somewhat of a hack, but it's a way to pass through the Arrow data so that it gets converted elsewhere.

+    PyObject* result = PyDict_New();
+    RETURN_IF_PYERROR();
+
+    PyDict_SetItemString(result, "py_array", py_array_.obj());
+    PyDict_SetItemString(result, "placement", placement_arr_.obj());
+
+    *output = result;
+
+    return Status::OK();
+  }
+
+ protected:
+  OwnedRefNoGIL py_array_;
+};
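To make the hand-off concrete, here is a rough sketch (hypothetical values, not code from this PR) of the item dict that GetPyResult produces. A regular block carries a 'block' key holding a NumPy array; an ExtensionBlock instead passes the Arrow data through untouched under 'py_array':

```python
import numpy as np
import pyarrow as pa

item = {
    # the column data, passed through as a pyarrow ChunkedArray
    'py_array': pa.chunked_array([pa.array([1, 2, None])]),
    # absolute position of this block's column in the DataFrame
    'placement': np.array([1], dtype=np.int64),
}

# pandas_compat._reconstruct_block (below) dispatches on these keys
assert 'py_array' in item and 'block' not in item
```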

Status MakeBlock(const PandasOptions& options, PandasBlock::type type, int64_t num_rows,
                 int num_columns, std::shared_ptr<PandasBlock>* block) {
#define BLOCK_CASE(NAME, TYPE) \
@@ -1591,8 +1642,9 @@ static Status GetPandasBlockType(const ChunkedArray& data, const PandasOptions&
class DataFrameBlockCreator {
 public:
  explicit DataFrameBlockCreator(const PandasOptions& options,
+                                const std::unordered_set<std::string>& extension_columns,
                                 const std::shared_ptr<Table>& table)
-      : table_(table), options_(options) {}
+      : table_(table), options_(options), extension_columns_(extension_columns) {}

  Status Convert(PyObject** output) {
    column_types_.resize(table_->num_columns());
@@ -1610,7 +1662,11 @@
    for (int i = 0; i < table_->num_columns(); ++i) {
      std::shared_ptr<ChunkedArray> col = table_->column(i);
      PandasBlock::type output_type = PandasBlock::OBJECT;
-     RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
+     if (extension_columns_.count(table_->field(i)->name())) {
Member: Hmm... I don't understand why we're using an explicit extension_columns. Shouldn't we simply detect an arrow ExtensionType?

Member: I was also confused by this. I looked at the unit test below and there are a couple of different things going on:

  • Creating pandas ExtensionArray values from built-in Arrow types
  • Converting Arrow ExtensionType data

This seems to do the former but not the latter. What is the use case for the former, mainly getting IntegerArray out?

+       output_type = PandasBlock::EXTENSION;
+     } else {
+       RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
+     }

      int block_placement = 0;
      std::shared_ptr<PandasBlock> block;
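For reference, the detection the reviewers ask about above could look roughly like this Python-level sketch (hypothetical, not part of this PR, which instead keys off the caller-supplied extension_columns set):

```python
import pyarrow as pa

def is_extension_column(field):
    # detect Arrow extension types from the schema itself,
    # rather than relying on an explicit column list
    return isinstance(field.type, pa.ExtensionType)
```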
@@ -1623,6 +1679,10 @@
                                                 table_->num_rows());
        RETURN_NOT_OK(block->Allocate());
        datetimetz_blocks_[i] = block;
+     } else if (output_type == PandasBlock::EXTENSION) {
+       block = std::make_shared<ExtensionBlock>(options_, table_->num_rows(), 1);
+       RETURN_NOT_OK(block->Allocate());
+       extension_blocks_[i] = block;
      } else {
        auto it = type_counts_.find(output_type);
        if (it != type_counts_.end()) {
@@ -1664,6 +1724,12 @@
return Status::KeyError("No datetimetz block allocated");
}
*block = it->second;
} else if (output_type == PandasBlock::EXTENSION) {
auto it = this->extension_blocks_.find(i);
if (it == this->extension_blocks_.end()) {
return Status::KeyError("No extension block allocated");
}
*block = it->second;
} else {
auto it = this->blocks_.find(output_type);
if (it == this->blocks_.end()) {
@@ -1714,6 +1780,7 @@
    RETURN_NOT_OK(AppendBlocks(blocks_, result));
    RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result));
    RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result));
+   RETURN_NOT_OK(AppendBlocks(extension_blocks_, result));

    *out = result;
    return Status::OK();
@@ -1732,6 +1799,7 @@
  std::unordered_map<int, int> type_counts_;

  PandasOptions options_;
+  std::unordered_set<std::string> extension_columns_;

  // block type -> block
  BlockMap blocks_;
@@ -1741,6 +1809,9 @@

  // column number -> datetimetz block
  BlockMap datetimetz_blocks_;
+
+  // column number -> extension block
+  BlockMap extension_blocks_;
};

class ArrowDeserializer {
@@ -2119,6 +2190,14 @@ Status ConvertTableToPandas(const PandasOptions& options,
Status ConvertTableToPandas(const PandasOptions& options,
                            const std::unordered_set<std::string>& categorical_columns,
                            const std::shared_ptr<Table>& table, PyObject** out) {
+  return ConvertTableToPandas(options, categorical_columns,
+                              std::unordered_set<std::string>(), table, out);
+}
+
+Status ConvertTableToPandas(const PandasOptions& options,
+                            const std::unordered_set<std::string>& categorical_columns,
+                            const std::unordered_set<std::string>& extension_columns,
+                            const std::shared_ptr<Table>& table, PyObject** out) {
  std::shared_ptr<Table> current_table = table;
  if (!categorical_columns.empty()) {
    FunctionContext ctx;
@@ -2140,7 +2219,7 @@ Status ConvertTableToPandas(const PandasOptions& options,
    }
  }

- DataFrameBlockCreator helper(options, current_table);
+ DataFrameBlockCreator helper(options, extension_columns, current_table);
  return helper.Convert(out);
}

6 changes: 6 additions & 0 deletions cpp/src/arrow/python/arrow_to_pandas.h
@@ -93,6 +93,12 @@ Status ConvertTableToPandas(const PandasOptions& options,
                            const std::unordered_set<std::string>& categorical_columns,
                            const std::shared_ptr<Table>& table, PyObject** out);

+ARROW_PYTHON_EXPORT
+Status ConvertTableToPandas(const PandasOptions& options,
+                            const std::unordered_set<std::string>& categorical_columns,
+                            const std::unordered_set<std::string>& extension_columns,
+                            const std::shared_ptr<Table>& table, PyObject** out);

} // namespace py
} // namespace arrow

4 changes: 4 additions & 0 deletions cpp/src/arrow/python/pyarrow.cc
@@ -113,6 +113,10 @@ PyObject* wrap_array(const std::shared_ptr<Array>& array) {
  return ::pyarrow_wrap_array(array);
}

+PyObject* wrap_chunked_array(const std::shared_ptr<ChunkedArray>& array) {
+  return ::pyarrow_wrap_chunked_array(array);
+}

bool is_tensor(PyObject* tensor) { return ::pyarrow_is_tensor(tensor) != 0; }

Status unwrap_tensor(PyObject* tensor, std::shared_ptr<Tensor>* out) {
2 changes: 2 additions & 0 deletions cpp/src/arrow/python/pyarrow.h
@@ -66,6 +66,8 @@ ARROW_PYTHON_EXPORT PyObject* wrap_schema(const std::shared_ptr<Schema>& schema)
ARROW_PYTHON_EXPORT bool is_array(PyObject* array);
ARROW_PYTHON_EXPORT Status unwrap_array(PyObject* array, std::shared_ptr<Array>* out);
ARROW_PYTHON_EXPORT PyObject* wrap_array(const std::shared_ptr<Array>& array);
+ARROW_PYTHON_EXPORT PyObject* wrap_chunked_array(
+    const std::shared_ptr<ChunkedArray>& array);

ARROW_PYTHON_EXPORT bool is_tensor(PyObject* tensor);
ARROW_PYTHON_EXPORT Status unwrap_tensor(PyObject* tensor, std::shared_ptr<Tensor>* out);
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -1440,6 +1440,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
    CStatus ConvertTableToPandas(
        const PandasOptions& options,
        const unordered_set[c_string]& categorical_columns,
+       const unordered_set[c_string]& extension_columns,
        const shared_ptr[CTable]& table,
        PyObject** out)

32 changes: 27 additions & 5 deletions python/pyarrow/pandas_compat.py
@@ -649,7 +649,7 @@ def _reconstruct_block(item):
    # categorical types and Timestamps-with-timezones types to the proper
    # pandas Blocks

-   block_arr = item['block']
+   block_arr = item.get('block', None)
    placement = item['placement']
    if 'dictionary' in item:
        cat = _pandas_api.categorical_type.from_codes(
@@ -665,6 +665,27 @@
    elif 'object' in item:
        block = _int.make_block(builtin_pickle.loads(block_arr),
                                placement=placement, klass=_int.ObjectBlock)
+   elif 'py_array' in item:
+       arr = item['py_array']
+       # TODO: have a mechanism to know how to create a
+       # pandas ExtensionArray given the pyarrow type
+       # For now, hardcoded to create a pandas IntegerArray for the example
+       arr = arr.chunk(0)
+       buflist = arr.buffers()
+       data = np.frombuffer(buflist[-1], dtype=arr.type.to_pandas_dtype())[
+           arr.offset:arr.offset + len(arr)]
+       bitmask = buflist[0]
+       if bitmask is not None:
+           mask = pa.BooleanArray.from_buffers(
+               pa.bool_(), len(arr), [None, bitmask])
+           mask = np.asarray(mask)
+       else:
+           mask = np.ones(len(arr), dtype=bool)
+       block_arr = _pandas_api.pd.arrays.IntegerArray(
+           data.copy(), ~mask, copy=False)
+       # create ExtensionBlock
+       block = _int.make_block(block_arr, placement=placement,
+                               klass=_int.ExtensionBlock)
    else:
        block = _int.make_block(block_arr, placement=placement)
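A standalone sketch of the bitmap-to-mask logic above, with made-up data: Arrow's validity bitmap marks valid slots with 1, while pandas' IntegerArray wants a mask that is True for missing slots, hence the ~mask inversion in the branch above:

```python
import numpy as np
import pandas as pd
import pyarrow as pa

arr = pa.array([1, 2, None], type=pa.int64())
buflist = arr.buffers()  # [validity bitmap, data buffer]
data = np.frombuffer(buflist[-1], dtype='int64')[arr.offset:arr.offset + len(arr)]

# validity bitmap -> boolean array that is True where a value is present
valid = np.asarray(pa.BooleanArray.from_buffers(
    pa.bool_(), len(arr), [None, buflist[0]]))

# IntegerArray wants True for *missing* values, so invert; copy the data
# since np.frombuffer returns a read-only view
int_arr = pd.arrays.IntegerArray(data.copy(), ~valid, copy=False)
print(int_arr)  # Int64 extension array: [1, 2, <NA>]
```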

@@ -681,7 +702,7 @@ def make_datetimetz(tz):


def table_to_blockmanager(options, table, categories=None,
-                         ignore_metadata=False):
+                         extension_columns=None, ignore_metadata=False):
    from pandas.core.internals import BlockManager

    all_columns = []
@@ -699,7 +720,7 @@
        index = _pandas_api.pd.RangeIndex(table.num_rows)

    _check_data_column_metadata_consistency(all_columns)
-   blocks = _table_to_blocks(options, table, categories)
+   blocks = _table_to_blocks(options, table, categories, extension_columns)
    columns = _deserialize_column_index(table, all_columns, column_indexes)

    axes = [columns, index]
@@ -967,11 +988,12 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
    return pd.MultiIndex(new_levels, labels, names=columns.names)


-def _table_to_blocks(options, block_table, categories):
+def _table_to_blocks(options, block_table, categories, extension_columns):
    # Part of table_to_blockmanager

    # Convert an arrow table to Block from the internal pandas API
-   result = pa.lib.table_to_blocks(options, block_table, categories)
+   result = pa.lib.table_to_blocks(options, block_table, categories,
+                                   extension_columns)

    # Defined above
    return [_reconstruct_block(item) for item in result]
8 changes: 6 additions & 2 deletions python/pyarrow/table.pxi
@@ -782,21 +782,25 @@ def _reconstruct_record_batch(columns, schema):
    return RecordBatch.from_arrays(columns, schema=schema)


-def table_to_blocks(options, Table table, categories):
+def table_to_blocks(options, Table table, categories, extension_columns):
    cdef:
        PyObject* result_obj
        shared_ptr[CTable] c_table = table.sp_table
        CMemoryPool* pool
        unordered_set[c_string] categorical_columns
        PandasOptions c_options = _convert_pandas_options(options)
+       unordered_set[c_string] c_extension_columns

    if categories is not None:
        categorical_columns = {tobytes(cat) for cat in categories}
+   if extension_columns is not None:
+       c_extension_columns = {tobytes(col) for col in extension_columns}

    with nogil:
        check_status(
            libarrow.ConvertTableToPandas(
-               c_options, categorical_columns, c_table, &result_obj)
+               c_options, categorical_columns, c_extension_columns, c_table,
+               &result_obj)
        )

    return PyObject_to_object(result_obj)
45 changes: 45 additions & 0 deletions python/pyarrow/tests/test_pandas.py
@@ -3198,6 +3198,51 @@ def test_array_protocol():
    assert result.equals(expected2)


+# ----------------------------------------------------------------------
+# Pandas ExtensionArray support
+
+
+def _to_pandas(table, extension_columns=None):
+    # temporary test function as long as we have no public API to do this
+    from pyarrow.pandas_compat import table_to_blockmanager
+
+    options = dict(
+        pool=None,
+        strings_to_categorical=False,
+        zero_copy_only=False,
+        integer_object_nulls=False,
+        date_as_object=True,
+        use_threads=True,
+        deduplicate_objects=True)
+
+    mgr = table_to_blockmanager(
+        options, table, extension_columns=extension_columns)
+    return pd.DataFrame(mgr)
+
+
+def test_convert_to_extension_array():
+    if LooseVersion(pd.__version__) < '0.24.0':
+        pytest.skip('IntegerArray only introduced in 0.24')
+
+    import pandas.core.internals as _int
+
+    table = pa.table({'a': [1, 2, 3], 'b': [2, 3, 4]})
+
+    df = _to_pandas(table)
+    assert len(df._data.blocks) == 1
+    assert isinstance(df._data.blocks[0], _int.IntBlock)
+
+    df = _to_pandas(table, extension_columns=['b'])
+    assert isinstance(df._data.blocks[0], _int.IntBlock)
+    assert isinstance(df._data.blocks[1], _int.ExtensionBlock)
+
+    table = pa.table({'a': [1, 2, None]})
+    df = _to_pandas(table, extension_columns=['a'])
+    assert isinstance(df._data.blocks[0], _int.ExtensionBlock)
+    expected = pd.DataFrame({'a': pd.Series([1, 2, None], dtype='Int64')})
+    tm.assert_frame_equal(df, expected)


# ----------------------------------------------------------------------
# Legacy metadata compatibility tests
