diff --git a/asv_bench/asv.conf.json b/asv_bench/asv.conf.json
index d01f64dd4..cab53382a 100644
--- a/asv_bench/asv.conf.json
+++ b/asv_bench/asv.conf.json
@@ -72,6 +72,7 @@
     // followed by the pip installed packages).
     //
     "matrix": {
+        "numbagg": [""],
         "numpy_groupies": [""],
         "numpy": [""],
         "pandas": [""],
diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py
index 985658785..add77c182 100644
--- a/asv_bench/benchmarks/reduce.py
+++ b/asv_bench/benchmarks/reduce.py
@@ -1,49 +1,86 @@
 import numpy as np
-import numpy_groupies as npg
 import pandas as pd
+from asv_runner.benchmarks.mark import parameterize, skip_for_params
 
 import flox
+import flox.aggregations
 
-from . import parameterized
+N = 3000
+funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"]
+engines = ["flox", "numpy", "numbagg"]
+expected_groups = {
+    "None": None,
+    "RangeIndex": pd.RangeIndex(5),
+    "bins": pd.IntervalIndex.from_breaks([1, 2, 4]),
+}
+expected_names = tuple(expected_groups)
 
-N = 1000
-funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"]
-engines = ["flox", "numpy"]
-expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])]
+NUMBAGG_FUNCS = ["nansum", "nanmean", "nanmax", "count", "all"]
+
+numbagg_skip = [
+    (func, expected_names[0], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS
+] + [(func, expected_names[1], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS]
+
+
+def setup_jit():
+    # pre-compile jitted funcs
+    labels = np.ones((N), dtype=int)
+    array1 = np.ones((N), dtype=float)
+    array2 = np.ones((N, N), dtype=float)
+
+    if "numba" in engines:
+        for func in funcs:
+            method = getattr(flox.aggregate_npg, func)
+            method(labels, array1, engine="numba")
+    if "numbagg" in engines:
+        for func in set(NUMBAGG_FUNCS) & set(funcs):
+            flox.groupby_reduce(array1, labels, func=func, engine="numbagg")
+            flox.groupby_reduce(array2, labels, func=func, engine="numbagg")
 
 
 class ChunkReduce:
     """Time the core reduction function."""
 
+    min_run_count = 5
+    warmup_time = 1
+
     def setup(self, *args, **kwargs):
-        # pre-compile jitted funcs
-        if "numba" in engines:
-            for func in funcs:
-                npg.aggregate_numba.aggregate(
-                    np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func
-                )
         raise NotImplementedError
 
-    @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
-    def time_reduce(self, func, engine, expected_groups):
+    @skip_for_params(numbagg_skip)
+    @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
+    def time_reduce(self, func, expected_name, engine):
         flox.groupby_reduce(
             self.array,
             self.labels,
             func=func,
             engine=engine,
             axis=self.axis,
-            expected_groups=expected_groups,
+            expected_groups=expected_groups[expected_name],
         )
 
-    @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
-    def peakmem_reduce(self, func, engine, expected_groups):
+    @parameterize({"func": ["nansum", "nanmean", "nanmax", "count"], "engine": engines})
+    def time_reduce_bare(self, func, engine):
+        flox.aggregations.generic_aggregate(
+            self.labels,
+            self.array,
+            axis=-1,
+            size=5,
+            func=func,
+            engine=engine,
+            fill_value=0,
+        )
+
+    @skip_for_params(numbagg_skip)
+    @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
+    def peakmem_reduce(self, func, expected_name, engine):
         flox.groupby_reduce(
             self.array,
             self.labels,
             func=func,
             engine=engine,
             axis=self.axis,
-            expected_groups=expected_groups,
+            expected_groups=expected_groups[expected_name],
         )
@@ -52,6 +89,16 @@ def setup(self, *args, **kwargs):
         self.array = np.ones((N,))
         self.labels = np.repeat(np.arange(5), repeats=N // 5)
         self.axis = -1
+        if "numbagg" in args:
+            setup_jit()
+
+
+class ChunkReduce1DUnsorted(ChunkReduce):
+    def setup(self, *args, **kwargs):
+        self.array = np.ones((N,))
+        self.labels = np.random.permutation(np.repeat(np.arange(5), repeats=N // 5))
+        self.axis = -1
+        setup_jit()
 
 
 class ChunkReduce2D(ChunkReduce):
@@ -59,6 +106,15 @@ def setup(self, *args, **kwargs):
         self.array = np.ones((N, N))
         self.labels = np.repeat(np.arange(N // 5), repeats=5)
         self.axis = -1
+        setup_jit()
+
+
+class ChunkReduce2DUnsorted(ChunkReduce):
+    def setup(self, *args, **kwargs):
+        self.array = np.ones((N, N))
+        self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
+        self.axis = -1
+        setup_jit()
 
 
 class ChunkReduce2DAllAxes(ChunkReduce):
@@ -66,3 +122,12 @@ def setup(self, *args, **kwargs):
         self.array = np.ones((N, N))
         self.labels = np.repeat(np.arange(N // 5), repeats=5)
         self.axis = None
+        setup_jit()
+
+
+class ChunkReduce2DAllAxesUnsorted(ChunkReduce):
+    def setup(self, *args, **kwargs):
+        self.array = np.ones((N, N))
+        self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
+        self.axis = None
+        setup_jit()
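The benchmarks call `setup_jit()` during `setup` so that `numba`/`numbagg` kernels are compiled before ASV starts timing; otherwise the first timed call would include compilation. A minimal sketch of the effect outside the harness, assuming numbagg is installed (timings illustrative only):

```python
# Sketch: the first call pays numbagg's JIT compilation cost; later calls do not.
import time

import numpy as np

import flox

labels = np.repeat(np.arange(5), repeats=600)
array = np.ones((3000,))

t0 = time.perf_counter()
flox.groupby_reduce(array, labels, func="nansum", engine="numbagg")
cold = time.perf_counter() - t0  # includes kernel compilation

t0 = time.perf_counter()
flox.groupby_reduce(array, labels, func="nansum", engine="numbagg")
warm = time.perf_counter() - t0  # compiled kernel is cached

print(f"cold: {cold:.3f}s warm: {warm:.6f}s")
```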
diff --git a/ci/environment.yml b/ci/environment.yml
index 9d5aa6d01..33e1b4661 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -23,3 +23,5 @@ dependencies:
   - toolz
   - numba
   - scipy
+  - pip:
+      - numbagg>=0.3
diff --git a/ci/upstream-dev-env.yml b/ci/upstream-dev-env.yml
index 04fd7ce60..6b8e796ea 100644
--- a/ci/upstream-dev-env.yml
+++ b/ci/upstream-dev-env.yml
@@ -18,3 +18,4 @@ dependencies:
   - git+https://github.com/pandas-dev/pandas
   - git+https://github.com/dask/dask
   - git+https://github.com/ml31415/numpy-groupies
+  - git+https://github.com/numbagg/numbagg
diff --git a/docs/source/engines.md b/docs/source/engines.md
index 867979d13..68c65cab5 100644
--- a/docs/source/engines.md
+++ b/docs/source/engines.md
@@ -9,18 +9,20 @@
 1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation.
 1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was
    copied from a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844)
+1. `engine="numbagg"` uses the reductions available in [`numbagg.grouped`](https://github.com/numbagg/numbagg/blob/main/numbagg/grouped.py)
+   from the [numbagg](https://github.com/numbagg/numbagg) project.
 
 See [](arrays) for more details.
 
 ## Tradeoffs
 
-For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster.
+For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="numbagg"` is almost always faster, and `engine="flox"` *can* be faster.
 
 The reason is that `numpy_groupies` converts all groupby problems to a 1D problem, this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46).
 It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet.
 The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common to implement `np.bincount`, and not `np.add.reduceat`.
 
 ```{tip}
-Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
-Both use numba for high-performance aggregations. Contributions or discussion is very welcome!
+One other potential engine we could add is [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
+Contributions or discussion is very welcome!
 ```
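The tradeoffs documented above come down to a per-call keyword argument. A short sketch of switching engines on the same problem, assuming numbagg is installed (`nanmean` is one of the reductions it supports):

```python
# Sketch: the engine is chosen per call; array shapes here are illustrative.
import numpy as np

import flox

array = np.random.rand(12, 3000)  # nD array ...
labels = np.random.randint(0, 12, size=3000)  # ... reduced by 1D group labels

for engine in ["flox", "numpy", "numbagg"]:
    result, groups = flox.groupby_reduce(array, labels, func="nanmean", engine=engine)
```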
diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py
new file mode 100644
index 000000000..b0b06d86e
--- /dev/null
+++ b/flox/aggregate_numbagg.py
@@ -0,0 +1,100 @@
+from functools import partial
+
+import numbagg
+import numbagg.grouped
+import numpy as np
+
+
+def _numbagg_wrapper(
+    group_idx,
+    array,
+    *,
+    axis=-1,
+    func="sum",
+    size=None,
+    fill_value=None,
+    dtype=None,
+    numbagg_func=None,
+):
+    return numbagg_func(
+        array,
+        group_idx,
+        axis=axis,
+        num_labels=size,
+        # The following are unsupported
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
+def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
+    if np.issubdtype(array.dtype, np.bool_):
+        array = array.astype(np.int64)
+    return numbagg.grouped.group_nansum(
+        array,
+        group_idx,
+        axis=axis,
+        num_labels=size,
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
+def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
+    if np.issubdtype(array.dtype, np.int_):
+        array = array.astype(np.float64)
+    return numbagg.grouped.group_nanmean(
+        array,
+        group_idx,
+        axis=axis,
+        num_labels=size,
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
+def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
+    assert ddof != 0
+    if np.issubdtype(array.dtype, np.int_):
+        array = array.astype(np.float64)
+    return numbagg.grouped.group_nanvar(
+        array,
+        group_idx,
+        axis=axis,
+        num_labels=size,
+        # ddof=0,
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
+def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
+    assert ddof != 0
+    if np.issubdtype(array.dtype, np.int_):
+        array = array.astype(np.float64)
+    return numbagg.grouped.group_nanstd(
+        array,
+        group_idx,
+        axis=axis,
+        num_labels=size,
+        # ddof=0,
+        # fill_value=fill_value,
+        # dtype=dtype,
+    )
+
+
+nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares)
+nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nancount)
+nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod)
+nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst)
+nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast)
+# nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax)
+# nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin)
+nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax)
+nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin)
+any = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany)
+all = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall)
+
+# sum = nansum
+# mean = nanmean
+# sum_of_squares = nansum_of_squares
diff --git a/flox/aggregations.py b/flox/aggregations.py
index c06ef3509..ec696da53 100644
--- a/flox/aggregations.py
+++ b/flox/aggregations.py
@@ -62,12 +62,28 @@ def generic_aggregate(
         except AttributeError:
             method = get_npg_aggregation(func, engine="numpy")
 
+    elif engine == "numbagg":
+        from . import aggregate_numbagg
+
+        try:
+            if (
+                # numbagg hardcodes ddof=1
+                ("var" in func or "std" in func)
+                and kwargs.get("ddof", 0) == 0
+            ):
+                method = get_npg_aggregation(func, engine="numpy")
+
+            else:
+                method = getattr(aggregate_numbagg, func)
+        except AttributeError:
+            method = get_npg_aggregation(func, engine="numpy")
+
     elif engine in ["numpy", "numba"]:
         method = get_npg_aggregation(func, engine=engine)
 
     else:
         raise ValueError(
-            f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead."
+            f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead."
        )
 
     group_idx = np.asarray(group_idx, like=array)
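The dispatch above means `engine="numbagg"` falls back to the `numpy_groupies` implementation in two cases: for functions `aggregate_numbagg` does not define, and for `var`/`std` with `ddof=0`, since numbagg hardcodes `ddof=1`. A sketch of that behaviour; the keyword values are illustrative and `generic_aggregate` is invoked the same way the benchmarks invoke it:

```python
# Sketch of the engine="numbagg" dispatch and its numpy fallback.
import numpy as np

from flox.aggregations import generic_aggregate

group_idx = np.array([0, 0, 1, 1, 1])
array = np.array([1.0, 2.0, 3.0, 4.0, 5.0])

# nanmean maps directly onto numbagg.grouped.group_nanmean.
means = generic_aggregate(
    group_idx, array, axis=-1, size=2, func="nanmean", engine="numbagg", fill_value=0
)

# numbagg hardcodes ddof=1, so ddof=0 is routed to numpy_groupies instead.
var0 = generic_aggregate(
    group_idx, array, axis=-1, size=2, func="nanvar", engine="numbagg", fill_value=0, ddof=0
)
```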
diff --git a/flox/core.py b/flox/core.py
index 484303295..e5518b551 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -67,7 +67,7 @@
     T_AxesOpt = Union[T_Axis, T_Axes, None]
     T_Dtypes = Union[np.typing.DTypeLike, Sequence[np.typing.DTypeLike], None]
     T_FillValues = Union[np.typing.ArrayLike, Sequence[np.typing.ArrayLike], None]
-    T_Engine = Literal["flox", "numpy", "numba"]
+    T_Engine = Literal["flox", "numpy", "numba", "numbagg"]
     T_Method = Literal["map-reduce", "blockwise", "cohorts"]
     T_IsBins = Union[bool | Sequence[bool]]
 
@@ -1831,7 +1831,7 @@ def groupby_reduce(
           (for 1D ``by`` only).
         * ``"split-reduce"``:
           Same as "cohorts" and will be removed soon.
-    engine : {"flox", "numpy", "numba"}, optional
+    engine : {"flox", "numpy", "numba", "numbagg"}, optional
         Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk:
         * ``"numpy"``:
           Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``.
@@ -1843,6 +1843,9 @@
           for a reduction that is not yet implemented.
         * ``"numba"``:
           Use the implementations in ``numpy_groupies.aggregate_numba``.
+        * ``"numbagg"``:
+          Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy``
+          for a reduction that is not yet implemented.
     reindex : bool, optional
         Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected).
         If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups,
@@ -1870,6 +1873,14 @@
             "Try engine='numpy' or engine='numba' instead."
         )
 
+    if engine == "numbagg" and dtype is not None:
+        raise NotImplementedError(
+            "numbagg does not support the `dtype` kwarg. Either cast your "
+            "input arguments to `dtype` or use a different `engine`: "
+            "'flox' or 'numpy' or 'numba'. "
+            "See https://github.com/numbagg/numbagg/issues/121."
+        )
+
     if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs):
         raise ValueError("Please pass `q` for quantile calculations.")
 
@@ -1878,6 +1889,23 @@
     by_is_dask = tuple(is_duck_dask_array(b) for b in bys)
     any_by_dask = any(by_is_dask)
 
+    if (
+        engine == "numbagg"
+        and _is_arg_reduction(func)
+        and (any_by_dask or is_duck_dask_array(array))
+    ):
+        # There is only one test that fails, but I can't figure
+        # out why without deep debugging.
+        # Just disable for now.
+        # test_groupby_reduce_axis_subset_against_numpy
+        # for array is 3D dask, by is 3D dask, axis=2
+        # We are falling back to numpy for the arg reduction,
+        # so presumably something is going wrong
+        raise NotImplementedError(
+            "argreductions not supported for engine='numbagg' yet. "
+            "Try engine='numpy' or engine='numba' instead."
+        )
+
     if method in ["split-reduce", "cohorts"] and any_by_dask:
         raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.")
 
@@ -2019,7 +2047,7 @@
     if agg.chunk[0] is None and method != "blockwise":
         raise NotImplementedError(
             f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'."
-            f"\n\n Received: {func}"
+            f" Received method={method!r}"
         )
 
     if method in ["blockwise", "cohorts"] and nax != by_.ndim:
diff --git a/flox/xarray.py b/flox/xarray.py
index eb35da387..7e8c4f2b1 100644
--- a/flox/xarray.py
+++ b/flox/xarray.py
@@ -148,6 +148,9 @@ def xarray_reduce(
           for a reduction that is not yet implemented.
         * ``"numba"``:
           Use the implementations in ``numpy_groupies.aggregate_numba``.
+        * ``"numbagg"``:
+          Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy``
+          for a reduction that is not yet implemented.
     keep_attrs : bool, optional
         Preserve attrs?
     skipna : bool, optional
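Since `engine="numbagg"` now raises for the `dtype` kwarg, the workaround the error message suggests is to cast the input before the call. A sketch:

```python
# Sketch: cast inputs yourself instead of passing dtype= with engine="numbagg".
import numpy as np

import flox

labels = np.array([0, 0, 1, 1])
array = np.array([1, 2, 3, 4], dtype="int32")

# flox.groupby_reduce(array, labels, func="nanmean", engine="numbagg", dtype="float64")
# raises NotImplementedError; casting up front is equivalent here:
result, groups = flox.groupby_reduce(
    array.astype("float64"), labels, func="nanmean", engine="numbagg"
)
```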
diff --git a/pyproject.toml b/pyproject.toml
index e387657d2..5ca8e0317 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,7 +31,7 @@ repository = "https://github.com/xarray-contrib/flox.git"
 changelog = "https://github.com/xarray-contrib/flox/releases"
 
 [project.optional-dependencies]
-all = ["cachey", "dask", "numba", "xarray"]
+all = ["cachey", "dask", "numba", "numbagg", "xarray"]
 test = ["netCDF4"]
 
 [build-system]
@@ -109,11 +109,13 @@ warn_unused_ignores = true
 
 [[tool.mypy.overrides]]
 module=[
+    "asv_runner.*",
     "cachey",
     "cftime",
     "dask.*",
     "importlib_metadata",
     "numba",
+    "numbagg.*",
     "numpy_groupies.*",
     "matplotlib.*",
     "pandas",
diff --git a/tests/__init__.py b/tests/__init__.py
index f1c8ec6bf..f46319172 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -47,6 +47,7 @@ def LooseVersion(vstring):
 
 has_dask, requires_dask = _importorskip("dask")
 has_numba, requires_numba = _importorskip("numba")
+has_numbagg, requires_numbagg = _importorskip("numbagg")
 has_scipy, requires_scipy = _importorskip("scipy")
 has_xarray, requires_xarray = _importorskip("xarray")
diff --git a/tests/conftest.py b/tests/conftest.py
index eb5971784..504564b5a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,16 @@
 import pytest
 
-from . import requires_numba
+from . import requires_numba, requires_numbagg
 
 
 @pytest.fixture(
-    scope="module", params=["flox", "numpy", pytest.param("numba", marks=requires_numba)]
+    scope="module",
+    params=[
+        "flox",
+        "numpy",
+        pytest.param("numba", marks=requires_numba),
+        pytest.param("numbagg", marks=requires_numbagg),
+    ],
 )
 def engine(request):
     return request.param
diff --git a/tests/test_core.py b/tests/test_core.py
index 440431304..09ee50902 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -225,7 +225,9 @@ def gen_array_by(size, func):
 @pytest.mark.parametrize("add_nan_by", [True, False])
 @pytest.mark.parametrize("func", ALL_FUNCS)
 def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
-    if ("arg" in func and engine == "flox") or (func in BLOCKWISE_FUNCS and chunks != -1):
+    if ("arg" in func and engine in ["flox", "numbagg"]) or (
+        func in BLOCKWISE_FUNCS and chunks != -1
+    ):
         pytest.skip()
 
     array, by = gen_array_by(size, func)
@@ -424,7 +426,7 @@ def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtyp
     if func in ["first", "last"] or func in BLOCKWISE_FUNCS:
         pytest.skip()
 
-    if "arg" in func and (engine == "flox" or reindex):
+    if "arg" in func and (engine in ["flox", "numbagg"] or reindex):
         pytest.skip()
 
     rng = np.random.default_rng(12345)
@@ -576,7 +578,7 @@ def test_first_last_disallowed_dask(func):
     "axis", [None, (0, 1, 2), (0, 1), (0, 2), (1, 2), 0, 1, 2, (0,), (1,), (2,)]
 )
 def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine):
-    if ("arg" in func and engine == "flox") or func in BLOCKWISE_FUNCS:
+    if ("arg" in func and engine in ["flox", "numbagg"]) or func in BLOCKWISE_FUNCS:
         pytest.skip()
 
     if not isinstance(axis, int):
@@ -893,6 +895,9 @@ def test_fill_value_behaviour(func, chunks, fill_value, engine):
 @pytest.mark.parametrize("func", ["mean", "sum"])
 @pytest.mark.parametrize("dtype", ["float32", "float64", "int32", "int64"])
 def test_dtype_preservation(dtype, func, engine):
+    if engine == "numbagg":
+        # https://github.com/numbagg/numbagg/issues/121
+        pytest.skip()
     if func == "sum" or (func == "mean" and "float" in dtype):
         expected = np.dtype(dtype)
     elif func == "mean" and "int" in dtype:
@@ -931,7 +936,7 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype):
 @pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce"])
 def test_cohorts_nd_by(func, method, axis, engine):
     if (
-        ("arg" in func and (axis is None or engine == "flox"))
+        ("arg" in func and (axis is None or engine in ["flox", "numbagg"]))
         or (method != "blockwise" and func in BLOCKWISE_FUNCS)
         or (axis is None and ("first" in func or "last" in func))
     ):
@@ -1270,6 +1275,9 @@ def grouped_median(group_idx, array, *, axis=-1, size=None, fill_value=None, dty
 @pytest.mark.parametrize("func", ALL_FUNCS)
 @pytest.mark.parametrize("dtype", [np.float32, np.float64])
 def test_dtype(func, dtype, engine):
+    if engine == "numbagg":
+        # https://github.com/numbagg/numbagg/issues/121
+        pytest.skip()
     if "arg" in func or func in ["any", "all"]:
         pytest.skip()
 
diff --git a/tests/test_xarray.py b/tests/test_xarray.py
index 8f006e5f3..116937052 100644
--- a/tests/test_xarray.py
+++ b/tests/test_xarray.py
@@ -466,6 +466,10 @@ def test_alignment_error():
 @pytest.mark.parametrize("dtype", [np.float32, np.float64])
 @pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False))
 def test_dtype(add_nan, chunk, dtype, dtype_out, engine):
+    if engine == "numbagg":
+        # https://github.com/numbagg/numbagg/issues/121
+        pytest.skip()
+
     xp = dask.array if chunk else np
     data = xp.linspace(0, 1, 48, dtype=dtype).reshape((4, 12))
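With the `conftest.py` change, every test that uses the module-scoped `engine` fixture now also runs with `engine="numbagg"` whenever numbagg is importable. A quick manual consistency check in the same spirit, assuming numbagg is installed:

```python
# Sketch: numbagg results should match the numpy engine for supported funcs.
import numpy as np

import flox

labels = np.random.randint(0, 5, size=1000)
array = np.random.rand(1000)

expected, _ = flox.groupby_reduce(array, labels, func="nansum", engine="numpy")
actual, _ = flox.groupby_reduce(array, labels, func="nansum", engine="numbagg")
np.testing.assert_allclose(actual, expected)
```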