Skip to content

Commit 62140f8

Browse files
committed
POC: New UDF methods
1 parent ae049ae commit 62140f8

14 files changed

+585
-154
lines changed

pandas/core/apply.py

Lines changed: 219 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222
import numpy as np
2323

24-
from pandas._config import option_context
24+
from pandas._config import (
25+
get_option,
26+
option_context,
27+
)
2528

2629
from pandas._libs import lib
2730
from pandas._typing import (
@@ -82,6 +85,7 @@ def frame_apply(
8285
result_type: str | None = None,
8386
args=None,
8487
kwargs=None,
88+
renamer=None,
8589
) -> FrameApply:
8690
"""construct and return a row or column based frame apply object"""
8791
axis = obj._get_axis_number(axis)
@@ -98,6 +102,7 @@ def frame_apply(
98102
result_type=result_type,
99103
args=args,
100104
kwargs=kwargs,
105+
renamer=renamer,
101106
)
102107

103108

@@ -112,6 +117,7 @@ def __init__(
112117
result_type: str | None,
113118
args,
114119
kwargs,
120+
renamer=None,
115121
):
116122
self.obj = obj
117123
self.raw = raw
@@ -141,6 +147,7 @@ def f(x):
141147

142148
self.orig_f: AggFuncType = func
143149
self.f: AggFuncType = f
150+
self.renamer = renamer
144151

145152
@abc.abstractmethod
146153
def apply(self) -> DataFrame | Series:
@@ -164,10 +171,16 @@ def agg(self) -> DataFrame | Series | None:
164171
return self.apply_str()
165172

166173
if is_dict_like(arg):
167-
return self.agg_dict_like()
174+
if get_option("new_udf_methods"):
175+
return self.new_dict_like("agg")
176+
else:
177+
return self.agg_dict_like()
168178
elif is_list_like(arg):
169179
# we require a list, but not a 'str'
170-
return self.agg_list_like()
180+
if get_option("new_udf_methods"):
181+
return self.new_list_like("agg")
182+
else:
183+
return self.agg_list_like()
171184

172185
if callable(arg):
173186
f = com.get_cython_func(arg)
@@ -408,6 +421,70 @@ def agg_list_like(self) -> DataFrame | Series:
408421
)
409422
return concatenated.reindex(full_ordered_index, copy=False)
410423

424+
def new_list_like(self, method: str) -> DataFrame | Series:
    """
    Call ``method`` on the object once per entry of a list-like argument.

    Each entry of ``self.f`` is either a function specification or a
    ``(name, function)`` pair; the pair form supplies the output label.

    Parameters
    ----------
    method : str
        Name of the method looked up on ``self.obj`` and called with each
        entry (the callers in this module pass "agg" or "apply").

    Returns
    -------
    DataFrame or Series
        The per-entry results concatenated along the columns, or a Series
        holding the raw results when they cannot be concatenated
        (e.g. a list of scalars).

    Raises
    ------
    ValueError
        If the entries produce results of differing dimensionality, or if
        no entry produced a result.
    """
    from pandas.core.reshape.concat import concat

    obj = self.obj
    arg = cast(List[AggFuncTypeBase], self.f)

    results = []
    keys = []
    result_dim = None

    for entry in arg:
        name = None
        func = entry
        if isinstance(entry, (tuple, list)):
            # Handle (name, value) pairs
            name, func = entry

        # Only the call itself is guarded: entries raising TypeError are
        # deliberately skipped, anything else propagates.
        try:
            new_res = getattr(obj, method)(func)
        except TypeError:
            continue

        ndim = getattr(new_res, "ndim", 0)
        if result_dim is None:
            result_dim = ndim
        elif ndim != result_dim:
            raise ValueError("cannot combine transform and aggregation operations")

        results.append(new_res)

        # make sure we find a good name
        if name is None:
            name = com.get_callable_name(func) or func
        keys.append(name)

    # if we are empty
    if not results:
        raise ValueError("no results")

    try:
        concatenated = concat(results, keys=keys, axis=1, sort=False)
    except TypeError:
        # we are concatting non-NDFrame objects,
        # e.g. a list of scalars
        from pandas import Series

        # use the name only if obj is one-dimensional; other objects
        # (e.g. a DataFrame) need not expose a ``name`` attribute
        if getattr(obj, "ndim", 1) == 1:
            result_name = getattr(obj, "name", None)
        else:
            result_name = None
        return Series(results, index=keys, name=result_name)

    # Concat uses the first index to determine the final indexing order.
    # The union of a shorter first index with the other indices causes
    # the index sorting to be different from the order of the aggregating
    # functions. Reindex if this is the case.
    index_size = concatenated.index.size
    full_ordered_index = next(
        result.index for result in results if result.index.size == index_size
    )
    return concatenated.reindex(full_ordered_index, copy=False)
487+
411488
def agg_dict_like(self) -> DataFrame | Series:
412489
"""
413490
Compute aggregation in the case of a dict-like argument.
@@ -486,6 +563,86 @@ def agg_dict_like(self) -> DataFrame | Series:
486563

487564
return result
488565

566+
def new_dict_like(self, method: str) -> DataFrame | Series:
    """
    Call ``method`` once per key of a dict-like argument and combine.

    Parameters
    ----------
    method : str
        Name of the method looked up on the selected column (or on the
        one-dimensional object itself) and called with each dict value
        (the callers in this module pass "agg" or "apply").

    Returns
    -------
    DataFrame or Series
        A DataFrame of the per-key results concatenated along the columns
        when every result is a Series/DataFrame, otherwise a Series of
        scalars indexed by the dict keys.

    Raises
    ------
    ValueError
        If some results are Series/DataFrame objects and others are not.
    """
    from pandas import Index
    from pandas.core.reshape.concat import concat

    obj = self.obj
    arg = cast(AggFuncTypeDict, self.f)

    if not isinstance(obj, SelectionMixin):
        # i.e. obj is Series or DataFrame
        selected_obj = obj
        selection = None
    else:
        selected_obj = obj._selected_obj
        selection = obj._selection

    # NOTE(review): "agg" is passed here regardless of ``method`` -- confirm
    # this is intended when ``method`` is "apply".
    arg = self.normalize_dictlike_arg("agg", selected_obj, arg)

    if selected_obj.ndim == 1:
        # key only used for output
        colg = obj._gotitem(selection, ndim=1)
        results = {key: getattr(colg, method)(how) for key, how in arg.items()}
    else:
        # key used for column selection and output
        results = {
            key: getattr(obj._gotitem(key, ndim=1), method)(how)
            for key, how in arg.items()
        }
        if self.renamer is not None:
            for key, columns in self.renamer.items():
                results[key].columns = columns

    # ``results`` is always a dict at this point; compute the flags once so
    # that ``all`` and ``any`` below do not repeat the isinstance calls.
    is_ndframe = [isinstance(r, ABCNDFrame) for r in results.values()]

    # combine results
    if all(is_ndframe):
        keys_to_use = [k for k in arg.keys() if not results[k].empty]
        if not keys_to_use:
            # every result is empty: fall back to all keys
            keys_to_use = arg.keys()
        if selected_obj.ndim == 2:
            # keys are columns, so we can preserve names
            ktu = Index(keys_to_use)
            ktu._set_names(selected_obj.columns.names)
            keys_to_use = ktu
        keys = None if selected_obj.ndim == 1 else keys_to_use
        result = concat({k: results[k] for k in keys_to_use}, keys=keys, axis=1)
        if result.ndim == 1:
            result = result.to_frame()
    elif any(is_ndframe):
        # There is a mix of NDFrames and scalars
        raise ValueError(
            "cannot perform both aggregation "
            "and transformation operations "
            "simultaneously"
        )
    else:
        from pandas import Series

        # we have a dict of scalars
        # GH 36212 use name only if obj is a series
        if obj.ndim == 1:
            obj = cast("Series", obj)
            name = obj.name
        else:
            name = None

        result = Series(results, index=arg.keys(), name=name)

    return result
645+
489646
def apply_str(self) -> DataFrame | Series:
490647
"""
491648
Compute apply in case of a string.
@@ -522,6 +679,35 @@ def apply_multiple(self) -> DataFrame | Series:
522679
"""
523680
return self.obj.aggregate(self.f, self.axis, *self.args, **self.kwargs)
524681

682+
def new_apply_multiple(self) -> DataFrame | Series:
    """
    Compute apply when ``self.f`` is a list-like or dict-like.

    The work is delegated to ``new_dict_like`` or ``new_list_like`` with
    ``self.axis`` temporarily set to 0; for a non-zero axis the object is
    transposed for the duration of the call. For ``axis == 1`` the result
    is transposed back before being returned.

    Returns
    -------
    DataFrame or Series
        Whatever the delegated helper produced, transposed when ``axis == 1``.
    """
    original_obj, original_axis = self.obj, self.axis

    # Temporarily present the data so that the helpers always run on axis 0.
    self.obj = original_obj.T if original_axis != 0 else original_obj
    self.axis = 0
    try:
        handler = self.new_dict_like if is_dict_like(self.f) else self.new_list_like
        result = handler("apply")
    finally:
        # always restore the caller-visible state, even if the helper raised
        self.obj = original_obj
        self.axis = original_axis

    if original_axis == 1 and result is not None:
        result = result.T

    return result
710+
525711
def normalize_dictlike_arg(
526712
self, how: str, obj: DataFrame | Series, func: AggFuncTypeDict
527713
) -> AggFuncTypeDict:
@@ -661,7 +847,10 @@ def apply(self) -> DataFrame | Series:
661847
"""compute the results"""
662848
# dispatch to agg
663849
if is_list_like(self.f):
664-
return self.apply_multiple()
850+
if get_option("new_udf_methods"):
851+
return self.new_apply_multiple()
852+
else:
853+
return self.apply_multiple()
665854

666855
# all empty
667856
if len(self.columns) == 0 and len(self.index) == 0:
@@ -1039,7 +1228,10 @@ def apply(self) -> DataFrame | Series:
10391228

10401229
# dispatch to agg
10411230
if is_list_like(self.f):
1042-
return self.apply_multiple()
1231+
if get_option("new_udf_methods"):
1232+
return self.new_apply_multiple()
1233+
else:
1234+
return self.apply_multiple()
10431235

10441236
if isinstance(self.f, str):
10451237
# if we are a string, try to dispatch
@@ -1172,7 +1364,13 @@ def transform(self):
11721364

11731365
def reconstruct_func(
11741366
func: AggFuncType | None, **kwargs
1175-
) -> tuple[bool, AggFuncType | None, list[str] | None, list[int] | None]:
1367+
) -> tuple[
1368+
bool,
1369+
AggFuncType | None,
1370+
list[str] | None,
1371+
list[int] | None,
1372+
dict[str, list[str]] | None,
1373+
]:
11761374
"""
11771375
This is the internal function to reconstruct func given if there is relabeling
11781376
or not and also normalize the keyword to get new order of columns.
@@ -1204,14 +1402,16 @@ def reconstruct_func(
12041402
Examples
12051403
--------
12061404
>>> reconstruct_func(None, **{"foo": ("col", "min")})
1207-
(True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]))
1405+
(True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]),
1406+
defaultdict(<class 'list'>, {'col': ['foo']}))
12081407
12091408
>>> reconstruct_func("min")
1210-
(False, 'min', None, None)
1409+
(False, 'min', None, None, None)
12111410
"""
12121411
relabeling = func is None and is_multi_agg_with_relabel(**kwargs)
12131412
columns: list[str] | None = None
12141413
order: list[int] | None = None
1414+
renamer: dict[str, list[str]] | None = None
12151415

12161416
if not relabeling:
12171417
if isinstance(func, list) and len(func) > len(set(func)):
@@ -1227,9 +1427,9 @@ def reconstruct_func(
12271427
raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
12281428

12291429
if relabeling:
1230-
func, columns, order = normalize_keyword_aggregation(kwargs)
1430+
func, columns, order, renamer = normalize_keyword_aggregation(kwargs)
12311431

1232-
return relabeling, func, columns, order
1432+
return relabeling, func, columns, order, renamer
12331433

12341434

12351435
def is_multi_agg_with_relabel(**kwargs) -> bool:
@@ -1258,7 +1458,9 @@ def is_multi_agg_with_relabel(**kwargs) -> bool:
12581458
)
12591459

12601460

1261-
def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[int]]:
1461+
def normalize_keyword_aggregation(
1462+
kwargs: dict,
1463+
) -> tuple[dict, list[str], list[int], dict[str, list]]:
12621464
"""
12631465
Normalize user-provided "named aggregation" kwargs.
12641466
Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs
@@ -1280,7 +1482,8 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i
12801482
Examples
12811483
--------
12821484
>>> normalize_keyword_aggregation({"output": ("input", "sum")})
1283-
(defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]))
1485+
(defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]),
1486+
defaultdict(<class 'list'>, {'input': ['output']}))
12841487
"""
12851488
from pandas.core.indexes.base import Index
12861489

@@ -1290,11 +1493,13 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i
12901493
# May be hitting https://github.com/python/mypy/issues/5958
12911494
# saying it doesn't have an attribute __name__
12921495
aggspec: DefaultDict = defaultdict(list)
1496+
renamer: DefaultDict = defaultdict(list)
12931497
order = []
12941498
columns, pairs = list(zip(*kwargs.items()))
12951499

1296-
for column, aggfunc in pairs:
1500+
for name, (column, aggfunc) in zip(kwargs, pairs):
12971501
aggspec[column].append(aggfunc)
1502+
renamer[column].append(name)
12981503
order.append((column, com.get_callable_name(aggfunc) or aggfunc))
12991504

13001505
# uniquify aggfunc name if duplicated in order list
@@ -1314,7 +1519,7 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i
13141519
col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order)
13151520
# error: Incompatible return value type (got "Tuple[defaultdict[Any, Any],
13161521
# Any, ndarray]", expected "Tuple[Dict[Any, Any], List[str], List[int]]")
1317-
return aggspec, columns, col_idx_order # type: ignore[return-value]
1522+
return aggspec, columns, col_idx_order, renamer # type: ignore[return-value]
13181523

13191524

13201525
def _make_unique_kwarg_list(

pandas/core/config_init.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,23 @@ def use_inf_as_na_cb(key):
511511
validator=is_one_of_factory(["block", "array"]),
512512
)
513513

514+
new_udf_methods = """
515+
: boolean
516+
Whether to use the new UDF method implementations. Currently experimental.
517+
Defaults to False.
518+
"""
519+
520+
521+
with cf.config_prefix("mode"):
522+
cf.register_option(
523+
"new_udf_methods",
524+
# Get the default from an environment variable, if set, otherwise defaults
525+
# to False. This environment variable can be set for testing.
526+
os.environ.get("PANDAS_NEW_UDF_METHODS", "false").lower() == "true",
527+
new_udf_methods,
528+
validator=is_bool,
529+
)
530+
514531

515532
# user warnings
516533
chained_assignment = """

0 commit comments

Comments
 (0)