Skip to content

Add engine="numbagg" #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 49 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e864221
Add engine="numbagg"
dcherian Feb 4, 2022
0d76a44
Fix.
dcherian Feb 4, 2022
45134f0
fix CI
dcherian Feb 4, 2022
d54025c
Add nanlen
dcherian Feb 4, 2022
e0d98cb
fix env
dcherian Feb 4, 2022
551a719
Add numbagg
dcherian Feb 4, 2022
fa5ed42
Merge branch 'main' into numbagg
dcherian Feb 4, 2022
46c008e
Bettter numbagg benchmarks?
dcherian Feb 7, 2022
05dca1c
Merge branch 'main' into numbagg
dcherian Feb 16, 2022
acfd0ec
Merge branch 'main' into numbagg
dcherian Apr 15, 2022
e4ca066
Merge branch 'main' into numbagg
dcherian Jul 13, 2022
5bc6939
Update ci/environment.yml
dcherian Jul 13, 2022
fea47a5
Merge branch 'main' into numbagg
dcherian Oct 17, 2022
4c78f8e
Merge branch 'main' into numbagg
dcherian Sep 28, 2023
4be07f0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 28, 2023
0e35a9c
cleanup
dcherian Sep 28, 2023
1ca4ed8
Merge branch 'main' into numbagg
dcherian Oct 2, 2023
1f5ab2b
Error on dtype specified
dcherian Oct 2, 2023
ff43c04
Don't shadow sum, mean, sum_of_squares
dcherian Oct 2, 2023
875c182
Merge remote-tracking branch 'upstream/main' into numbagg
dcherian Oct 2, 2023
2df375f
more skip
dcherian Oct 2, 2023
293583a
Fix backup npg aggregations
dcherian Oct 2, 2023
ac6211d
xfail nanmean bool
dcherian Oct 2, 2023
631219f
ignore numbagg for mypy
dcherian Oct 2, 2023
f00873e
Add to upstream-dev CI
dcherian Oct 2, 2023
5b5c587
Add to optional dependencies
dcherian Oct 2, 2023
9ace31d
Fix bool reductions
dcherian Oct 2, 2023
c2b378e
fix mypy ignore
dcherian Oct 2, 2023
432d97a
Merge branch 'main' into numbagg
dcherian Oct 2, 2023
1410b77
reintroduce engines
dcherian Oct 2, 2023
6715587
Update docstring
dcherian Oct 3, 2023
e92f483
Update docs.
dcherian Oct 3, 2023
76317f0
Support more aggregations
dcherian Oct 3, 2023
38d54c1
More aggregations
dcherian Oct 3, 2023
ab0dc3f
back to nancount
dcherian Oct 3, 2023
fbfcfce
Add any, all
dcherian Oct 3, 2023
0c8268a
promote in nanstd too
dcherian Oct 3, 2023
aa856ff
Add ddof in anticipation of https://github.com/numbagg/numbagg/issues…
dcherian Oct 3, 2023
d8348ef
Add more benchmarks
dcherian Oct 4, 2023
67e5c92
reorder benchmark table
dcherian Oct 4, 2023
1a02428
Fix numba compilation setup?
dcherian Oct 4, 2023
4eb3a48
More benchmarks
dcherian Oct 4, 2023
a2639e4
Rework benchmarks
dcherian Oct 5, 2023
f2b40ef
small docstring update
dcherian Oct 5, 2023
53f9340
ignore asv typing
dcherian Oct 5, 2023
e1eda24
fix type ignoring
dcherian Oct 5, 2023
412f31f
Merge remote-tracking branch 'upstream/main' into numbagg
dcherian Oct 5, 2023
a899b5a
Guard against numbagg failures
dcherian Oct 7, 2023
492fb04
Use released numbagg
dcherian Oct 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions asv_bench/asv.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
// followed by the pip installed packages).
//
"matrix": {
"numbagg": [""],
"numpy_groupies": [""],
"numpy": [""],
"pandas": [""],
Expand Down
101 changes: 83 additions & 18 deletions asv_bench/benchmarks/reduce.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,86 @@
import numpy as np
import numpy_groupies as npg
import pandas as pd
from asv_runner.benchmarks.mark import parameterize, skip_for_params

import flox
import flox.aggregations

from . import parameterized
N = 3000
funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"]
engines = ["flox", "numpy", "numbagg"]
expected_groups = {
"None": None,
"RangeIndex": pd.RangeIndex(5),
"bins": pd.IntervalIndex.from_breaks([1, 2, 4]),
}
expected_names = tuple(expected_groups)

N = 1000
funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"]
engines = ["flox", "numpy"]
expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])]
NUMBAGG_FUNCS = ["nansum", "nanmean", "nanmax", "count", "all"]

numbagg_skip = [
(func, expected_names[0], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS
] + [(func, expected_names[1], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS]


def setup_jit():
# pre-compile jitted funcs
labels = np.ones((N), dtype=int)
array1 = np.ones((N), dtype=float)
array2 = np.ones((N, N), dtype=float)

if "numba" in engines:
for func in funcs:
method = getattr(flox.aggregate_npg, func)
method(labels, array1, engine="numba")
if "numbagg" in engines:
for func in set(NUMBAGG_FUNCS) & set(funcs):
flox.groupby_reduce(array1, labels, func=func, engine="numbagg")
flox.groupby_reduce(array2, labels, func=func, engine="numbagg")


class ChunkReduce:
"""Time the core reduction function."""

min_run_count = 5
warmup_time = 1

def setup(self, *args, **kwargs):
# pre-compile jitted funcs
if "numba" in engines:
for func in funcs:
npg.aggregate_numba.aggregate(
np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func
)
raise NotImplementedError

@parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
def time_reduce(self, func, engine, expected_groups):
@skip_for_params(numbagg_skip)
@parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
def time_reduce(self, func, expected_name, engine):
flox.groupby_reduce(
self.array,
self.labels,
func=func,
engine=engine,
axis=self.axis,
expected_groups=expected_groups,
expected_groups=expected_groups[expected_name],
)

@parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
def peakmem_reduce(self, func, engine, expected_groups):
@parameterize({"func": ["nansum", "nanmean", "nanmax", "count"], "engine": engines})
def time_reduce_bare(self, func, engine):
flox.aggregations.generic_aggregate(
self.labels,
self.array,
axis=-1,
size=5,
func=func,
engine=engine,
fill_value=0,
)

@skip_for_params(numbagg_skip)
@parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
def peakmem_reduce(self, func, expected_name, engine):
flox.groupby_reduce(
self.array,
self.labels,
func=func,
engine=engine,
axis=self.axis,
expected_groups=expected_groups,
expected_groups=expected_groups[expected_name],
)


Expand All @@ -52,17 +89,45 @@ def setup(self, *args, **kwargs):
self.array = np.ones((N,))
self.labels = np.repeat(np.arange(5), repeats=N // 5)
self.axis = -1
if "numbagg" in args:
setup_jit()


class ChunkReduce1DUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N,))
self.labels = np.random.permutation(np.repeat(np.arange(5), repeats=N // 5))
self.axis = -1
setup_jit()


class ChunkReduce2D(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.repeat(np.arange(N // 5), repeats=5)
self.axis = -1
setup_jit()


class ChunkReduce2DUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
self.axis = -1
setup_jit()


class ChunkReduce2DAllAxes(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.repeat(np.arange(N // 5), repeats=5)
self.axis = None
setup_jit()


class ChunkReduce2DAllAxesUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
self.axis = None
setup_jit()
2 changes: 2 additions & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ dependencies:
- toolz
- numba
- scipy
- pip:
- numbagg>=0.3
1 change: 1 addition & 0 deletions ci/upstream-dev-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ dependencies:
- git+https://github.com/pandas-dev/pandas
- git+https://github.com/dask/dask
- git+https://github.com/ml31415/numpy-groupies
- git+https://github.com/numbagg/numbagg
8 changes: 5 additions & 3 deletions docs/source/engines.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation.
1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was copied from
a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844)
1. `engine="numbagg"` uses the reductions available in [`numbagg.grouped`](https://github.com/numbagg/numbagg/blob/main/numbagg/grouped.py)
from the [numbagg](https://github.com/numbagg/numbagg) project.

See [](arrays) for more details.

## Tradeoffs

For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster.
For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="numbagg"` is almost always faster, and `engine="flox"` *can* be faster.

The reason is that `numpy_groupies` converts all groupby problems to a 1D problem, this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46).
It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet.
The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common to implement `np.bincount`, and not `np.add.reduceat`.

```{tip}
Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
Both use numba for high-performance aggregations. Contributions or discussion is very welcome!
One other potential engine we could add is [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
Contributions or discussion is very welcome!
```
100 changes: 100 additions & 0 deletions flox/aggregate_numbagg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from functools import partial

import numbagg
import numbagg.grouped
import numpy as np


def _numbagg_wrapper(
group_idx,
array,
*,
axis=-1,
func="sum",
size=None,
fill_value=None,
dtype=None,
numbagg_func=None,
):
return numbagg_func(
array,
group_idx,
axis=axis,
num_labels=size,
# The following are unsupported
# fill_value=fill_value,
# dtype=dtype,
)


def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
if np.issubdtype(array.dtype, np.bool_):
array = array.astype(np.in64)
return numbagg.grouped.group_nansum(
array,
group_idx,
axis=axis,
num_labels=size,
# fill_value=fill_value,
# dtype=dtype,
)


def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanmean(
array,
group_idx,
axis=axis,
num_labels=size,
# fill_value=fill_value,
# dtype=dtype,
)


def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
assert ddof != 0
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanvar(
array,
group_idx,
axis=axis,
num_labels=size,
# ddof=0,
# fill_value=fill_value,
# dtype=dtype,
)


def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
assert ddof != 0
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanstd(
array,
group_idx,
axis=axis,
num_labels=size,
# ddof=0,
# fill_value=fill_value,
# dtype=dtype,
)


nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares)
nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nancount)
nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod)
nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst)
nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast)
# nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax)
# nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin)
nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax)
nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin)
any = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany)
all = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall)

# sum = nansum
# mean = nanmean
# sum_of_squares = nansum_of_squares
18 changes: 17 additions & 1 deletion flox/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,28 @@ def generic_aggregate(
except AttributeError:
method = get_npg_aggregation(func, engine="numpy")

elif engine == "numbagg":
from . import aggregate_numbagg

try:
if (
# numabgg hardcodes ddof=1
("var" in func or "std" in func)
and kwargs.get("ddof", 0) == 0
):
method = get_npg_aggregation(func, engine="numpy")

else:
method = getattr(aggregate_numbagg, func)
except AttributeError:
method = get_npg_aggregation(func, engine="numpy")

elif engine in ["numpy", "numba"]:
method = get_npg_aggregation(func, engine=engine)

else:
raise ValueError(
f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead."
f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead."
)

group_idx = np.asarray(group_idx, like=array)
Expand Down
Loading