diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index 6f2d06d79ea1d..683211206ec83 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -51,6 +51,7 @@
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/numpy_internal.h"
+#include "arrow/python/pyarrow.h"
 #include "arrow/python/python_to_arrow.h"
 #include "arrow/python/type_traits.h"
 
@@ -307,7 +308,8 @@ class PandasBlock {
     DATETIME,
     DATETIME_WITH_TZ,
     TIMEDELTA,
-    CATEGORICAL
+    CATEGORICAL,
+    EXTENSION
   };
 
   PandasBlock(const PandasOptions& options, int64_t num_rows, int num_columns)
@@ -1459,6 +1461,55 @@ class CategoricalBlock : public PandasBlock {
   bool needs_copy_;
 };
 
+class ExtensionBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
+
+  // Don't create a block array here, only the placement array
+  Status Allocate() override {
+    PyAcquireGIL lock;
+
+    npy_intp placement_dims[1] = {num_columns_};
+    PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
+
+    RETURN_IF_PYERROR();
+
+    placement_arr_.reset(placement_arr);
+
+    placement_data_ = reinterpret_cast<int64_t*>(
+        PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
+
+    return Status::OK();
+  }
+
+  Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
+               int64_t rel_placement) override {
+    PyAcquireGIL lock;
+
+    PyObject* py_array;
+    py_array = wrap_chunked_array(data);
+    py_array_.reset(py_array);
+
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
+
+  Status GetPyResult(PyObject** output) override {
+    PyObject* result = PyDict_New();
+    RETURN_IF_PYERROR();
+
+    PyDict_SetItemString(result, "py_array", py_array_.obj());
+    PyDict_SetItemString(result, "placement", placement_arr_.obj());
+
+    *output = result;
+
+    return Status::OK();
+  }
+
+ protected:
+  OwnedRefNoGIL py_array_;
+};
+
 Status MakeBlock(const PandasOptions& options, PandasBlock::type type, int64_t num_rows,
                  int num_columns, std::shared_ptr<PandasBlock>* block) {
 #define BLOCK_CASE(NAME, TYPE) \
@@ -1591,8 +1642,9 @@ static Status GetPandasBlockType(const ChunkedArray& data, const PandasOptions&
 class DataFrameBlockCreator {
  public:
   explicit DataFrameBlockCreator(const PandasOptions& options,
+                                 const std::unordered_set<std::string>& extension_columns,
                                  const std::shared_ptr<Table>& table)
-      : table_(table), options_(options) {}
+      : table_(table), options_(options), extension_columns_(extension_columns) {}
 
   Status Convert(PyObject** output) {
     column_types_.resize(table_->num_columns());
@@ -1610,7 +1662,11 @@ class DataFrameBlockCreator {
     for (int i = 0; i < table_->num_columns(); ++i) {
       std::shared_ptr<ChunkedArray> col = table_->column(i);
       PandasBlock::type output_type = PandasBlock::OBJECT;
-      RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
+      if (extension_columns_.count(table_->field(i)->name())) {
+        output_type = PandasBlock::EXTENSION;
+      } else {
+        RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
+      }
 
       int block_placement = 0;
       std::shared_ptr<PandasBlock> block;
@@ -1623,6 +1679,10 @@ class DataFrameBlockCreator {
                                                   table_->num_rows());
         RETURN_NOT_OK(block->Allocate());
         datetimetz_blocks_[i] = block;
+      } else if (output_type == PandasBlock::EXTENSION) {
+        block = std::make_shared<ExtensionBlock>(options_, table_->num_rows(), 1);
+        RETURN_NOT_OK(block->Allocate());
+        extension_blocks_[i] = block;
       } else {
         auto it = type_counts_.find(output_type);
         if (it != type_counts_.end()) {
@@ -1664,6 +1724,12 @@ class DataFrameBlockCreator {
           return Status::KeyError("No datetimetz block allocated");
         }
         *block = it->second;
+      } else if (output_type == PandasBlock::EXTENSION) {
+        auto it = this->extension_blocks_.find(i);
+        if (it == this->extension_blocks_.end()) {
+          return Status::KeyError("No extension block allocated");
+        }
+        *block = it->second;
       } else {
         auto it = this->blocks_.find(output_type);
         if (it == this->blocks_.end()) {
@@ -1714,6 +1780,7 @@ class DataFrameBlockCreator {
     RETURN_NOT_OK(AppendBlocks(blocks_, result));
     RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result));
     RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result));
+    RETURN_NOT_OK(AppendBlocks(extension_blocks_, result));
 
     *out = result;
     return Status::OK();
@@ -1732,6 +1799,7 @@ class DataFrameBlockCreator {
   std::unordered_map<int, int> type_counts_;
 
   PandasOptions options_;
+  std::unordered_set<std::string> extension_columns_;
 
   // block type -> block
   BlockMap blocks_;
@@ -1741,6 +1809,9 @@ class DataFrameBlockCreator {
 
   // column number -> datetimetz block
   BlockMap datetimetz_blocks_;
+
+  // column number -> extension block
+  BlockMap extension_blocks_;
 };
 
 class ArrowDeserializer {
@@ -2119,6 +2190,14 @@ Status ConvertTableToPandas(const PandasOptions& options,
 Status ConvertTableToPandas(const PandasOptions& options,
                             const std::unordered_set<std::string>& categorical_columns,
                             const std::shared_ptr<Table>& table, PyObject** out) {
+  return ConvertTableToPandas(options, categorical_columns,
+                              std::unordered_set<std::string>(), table, out);
+}
+
+Status ConvertTableToPandas(const PandasOptions& options,
+                            const std::unordered_set<std::string>& categorical_columns,
+                            const std::unordered_set<std::string>& extension_columns,
+                            const std::shared_ptr<Table>& table, PyObject** out) {
   std::shared_ptr<Table> current_table = table;
   if (!categorical_columns.empty()) {
     FunctionContext ctx;
@@ -2140,7 +2219,7 @@ Status ConvertTableToPandas(const PandasOptions& options,
     }
   }
 
-  DataFrameBlockCreator helper(options, current_table);
+  DataFrameBlockCreator helper(options, extension_columns, current_table);
   return helper.Convert(out);
 }
 
diff --git a/cpp/src/arrow/python/arrow_to_pandas.h b/cpp/src/arrow/python/arrow_to_pandas.h
index 3a73e189eaec4..1196a92432d14 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.h
+++ b/cpp/src/arrow/python/arrow_to_pandas.h
@@ -93,6 +93,12 @@ Status ConvertTableToPandas(const PandasOptions& options,
                             const std::unordered_set<std::string>& categorical_columns,
                             const std::shared_ptr<Table>& table, PyObject** out);
 
+ARROW_PYTHON_EXPORT
+Status ConvertTableToPandas(const PandasOptions& options,
+                            const std::unordered_set<std::string>& categorical_columns,
+                            const std::unordered_set<std::string>& extension_columns,
+                            const std::shared_ptr<Table>& table, PyObject** out);
+
 }  // namespace py
 }  // namespace arrow
 
diff --git a/cpp/src/arrow/python/pyarrow.cc b/cpp/src/arrow/python/pyarrow.cc
index 38a5630c344ca..b335c29f1ddbd 100644
--- a/cpp/src/arrow/python/pyarrow.cc
+++ b/cpp/src/arrow/python/pyarrow.cc
@@ -113,6 +113,10 @@ PyObject* wrap_array(const std::shared_ptr<Array>& array) {
   return ::pyarrow_wrap_array(array);
 }
 
+PyObject* wrap_chunked_array(const std::shared_ptr<ChunkedArray>& array) {
+  return ::pyarrow_wrap_chunked_array(array);
+}
+
 bool is_tensor(PyObject* tensor) { return ::pyarrow_is_tensor(tensor) != 0; }
 
 Status unwrap_tensor(PyObject* tensor, std::shared_ptr<Tensor>* out) {
diff --git a/cpp/src/arrow/python/pyarrow.h b/cpp/src/arrow/python/pyarrow.h
index 635dd59e864d6..82635243e5e50 100644
--- a/cpp/src/arrow/python/pyarrow.h
+++ b/cpp/src/arrow/python/pyarrow.h
@@ -66,6 +66,8 @@ ARROW_PYTHON_EXPORT PyObject* wrap_schema(const std::shared_ptr<Schema>& schema)
 ARROW_PYTHON_EXPORT bool is_array(PyObject* array);
 ARROW_PYTHON_EXPORT Status unwrap_array(PyObject* array, std::shared_ptr<Array>* out);
 ARROW_PYTHON_EXPORT PyObject* wrap_array(const std::shared_ptr<Array>& array);
+ARROW_PYTHON_EXPORT PyObject* wrap_chunked_array(
+    const std::shared_ptr<ChunkedArray>& array);
 
 ARROW_PYTHON_EXPORT bool is_tensor(PyObject* tensor);
 ARROW_PYTHON_EXPORT Status unwrap_tensor(PyObject* tensor, std::shared_ptr<Tensor>* out);
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 18881a83a1f24..21f9b5e1c6205 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1440,6 +1440,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     CStatus ConvertTableToPandas(
         const PandasOptions& options,
         const unordered_set[c_string]& categorical_columns,
+        const unordered_set[c_string]& extension_columns,
         const shared_ptr[CTable]& table,
         PyObject** out)
 
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 900711ab6bda9..1a60ba2316157 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -649,7 +649,7 @@ def _reconstruct_block(item):
     # categorical types and Timestamps-with-timezones types to the proper
     # pandas Blocks
 
-    block_arr = item['block']
+    block_arr = item.get('block', None)
     placement = item['placement']
     if 'dictionary' in item:
         cat = _pandas_api.categorical_type.from_codes(
@@ -665,6 +665,27 @@ def _reconstruct_block(item):
     elif 'object' in item:
        block = _int.make_block(builtin_pickle.loads(block_arr),
                                 placement=placement, klass=_int.ObjectBlock)
+    elif 'py_array' in item:
+        arr = item['py_array']
+        # TODO: add a mechanism to know how to create a
+        # pandas ExtensionArray given the pyarrow type;
+        # for now, hardcode creating a pandas IntegerArray as the example
+        arr = arr.chunk(0)
+        buflist = arr.buffers()
+        data = np.frombuffer(buflist[-1], dtype=arr.type.to_pandas_dtype())[
+            arr.offset:arr.offset + len(arr)]
+        bitmask = buflist[0]
+        if bitmask is not None:
+            mask = pa.BooleanArray.from_buffers(
+                pa.bool_(), len(arr), [None, bitmask])
+            mask = np.asarray(mask)
+        else:
+            mask = np.ones(len(arr), dtype=bool)
+        block_arr = _pandas_api.pd.arrays.IntegerArray(
+            data.copy(), ~mask, copy=False)
+        # create ExtensionBlock
+        block = _int.make_block(block_arr, placement=placement,
+                                klass=_int.ExtensionBlock)
     else:
         block = _int.make_block(block_arr, placement=placement)
 
@@ -681,7 +702,7 @@ def make_datetimetz(tz):
 
 
 def table_to_blockmanager(options, table, categories=None,
-                          ignore_metadata=False):
+                          extension_columns=None, ignore_metadata=False):
     from pandas.core.internals import BlockManager
 
     all_columns = []
@@ -699,7 +720,7 @@ def table_to_blockmanager(options, table, categories=None,
         index = _pandas_api.pd.RangeIndex(table.num_rows)
 
     _check_data_column_metadata_consistency(all_columns)
-    blocks = _table_to_blocks(options, table, categories)
+    blocks = _table_to_blocks(options, table, categories, extension_columns)
     columns = _deserialize_column_index(table, all_columns, column_indexes)
 
     axes = [columns, index]
@@ -967,11 +988,12 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
     return pd.MultiIndex(new_levels, labels, names=columns.names)
 
 
-def _table_to_blocks(options, block_table, categories):
+def _table_to_blocks(options, block_table, categories, extension_columns):
     # Part of table_to_blockmanager
 
     # Convert an arrow table to Block from the internal pandas API
-    result = pa.lib.table_to_blocks(options, block_table, categories)
+    result = pa.lib.table_to_blocks(options, block_table, categories,
+                                    extension_columns)
 
     # Defined above
     return [_reconstruct_block(item) for item in result]
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index e69a133990192..c3e5a61fbddb7 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -782,21 +782,25 @@ def _reconstruct_record_batch(columns, schema):
     return RecordBatch.from_arrays(columns, schema=schema)
 
 
-def table_to_blocks(options, Table table, categories):
+def table_to_blocks(options, Table table, categories, extension_columns):
     cdef:
         PyObject* result_obj
         shared_ptr[CTable] c_table = table.sp_table
         CMemoryPool* pool
         unordered_set[c_string] categorical_columns
         PandasOptions c_options = _convert_pandas_options(options)
+        unordered_set[c_string] c_extension_columns
 
     if categories is not None:
         categorical_columns = {tobytes(cat) for cat in categories}
+    if extension_columns is not None:
+        c_extension_columns = {tobytes(col) for col in extension_columns}
 
     with nogil:
         check_status(
             libarrow.ConvertTableToPandas(
-                c_options, categorical_columns, c_table, &result_obj)
+                c_options, categorical_columns, c_extension_columns, c_table,
+                &result_obj)
         )
 
     return PyObject_to_object(result_obj)
diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py
index 12c1bbf949102..d10e450075f31 100644
--- a/python/pyarrow/tests/test_pandas.py
+++ b/python/pyarrow/tests/test_pandas.py
@@ -3198,6 +3198,51 @@ def test_array_protocol():
     assert result.equals(expected2)
 
 
+# ----------------------------------------------------------------------
+# Pandas ExtensionArray support
+
+
+def _to_pandas(table, extension_columns=None):
+    # temporary test function as long as we have no public API to do this
+    from pyarrow.pandas_compat import table_to_blockmanager
+
+    options = dict(
+        pool=None,
+        strings_to_categorical=False,
+        zero_copy_only=False,
+        integer_object_nulls=False,
+        date_as_object=True,
+        use_threads=True,
+        deduplicate_objects=True)
+
+    mgr = table_to_blockmanager(
+        options, table, extension_columns=extension_columns)
+    return pd.DataFrame(mgr)
+
+
+def test_convert_to_extension_array():
+    if LooseVersion(pd.__version__) < '0.24.0':
+        pytest.skip('IntegerArray only introduced in 0.24')
+
+    import pandas.core.internals as _int
+
+    table = pa.table({'a': [1, 2, 3], 'b': [2, 3, 4]})
+
+    df = _to_pandas(table)
+    assert len(df._data.blocks) == 1
+    assert isinstance(df._data.blocks[0], _int.IntBlock)
+
+    df = _to_pandas(table, extension_columns=['b'])
+    assert isinstance(df._data.blocks[0], _int.IntBlock)
+    assert isinstance(df._data.blocks[1], _int.ExtensionBlock)
+
+    table = pa.table({'a': [1, 2, None]})
+    df = _to_pandas(table, extension_columns=['a'])
+    assert isinstance(df._data.blocks[0], _int.ExtensionBlock)
+    expected = pd.DataFrame({'a': pd.Series([1, 2, None], dtype='Int64')})
+    tm.assert_frame_equal(df, expected)
+
+
 # ----------------------------------------------------------------------
 # Legacy metadata compatibility tests
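
For reference, the hardcoded IntegerArray path in _reconstruct_block can be exercised on its own. The sketch below is not part of the patch; it assumes pandas >= 0.24 and uses illustrative variable names, but it mirrors the values-buffer and validity-bitmap handling shown above.

import numpy as np
import pandas as pd
import pyarrow as pa

# A single-chunk int64 column with a null, as an ExtensionBlock would hand over.
chunked = pa.chunked_array([[1, 2, None]], type=pa.int64())
arr = chunked.chunk(0)

# The last buffer is the values buffer; view it as int64 and slice to the
# array's window (np.frombuffer sees the whole, possibly padded, buffer).
buffers = arr.buffers()
values = np.frombuffer(buffers[-1], dtype='int64')[arr.offset:arr.offset + len(arr)]

# The first buffer is the validity bitmap; reinterpreting it as boolean *data*
# yields a "valid" mask (True == value present). A missing bitmap means all valid.
validity = buffers[0]
if validity is not None:
    valid = np.asarray(
        pa.BooleanArray.from_buffers(pa.bool_(), len(arr), [None, validity]))
else:
    valid = np.ones(len(arr), dtype=bool)

# pandas IntegerArray expects (values, mask) with mask marking *missing* slots,
# hence the inversion; copy() because np.frombuffer returns a read-only view.
int_arr = pd.arrays.IntegerArray(values.copy(), ~valid, copy=False)
print(pd.Series(int_arr))  # dtype Int64; the third entry is missing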