From e864221f3036ebc044e4f4440e76275e4ad02383 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 3 Feb 2022 21:34:14 -0700 Subject: [PATCH 01/39] Add engine="numbagg" --- flox/aggregate_numbagg.py | 66 +++++++++++++++++++++++++++++++++++++++ flox/aggregations.py | 9 +++++- tests/__init__.py | 2 +- 3 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 flox/aggregate_numbagg.py diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py new file mode 100644 index 000000000..993107a3e --- /dev/null +++ b/flox/aggregate_numbagg.py @@ -0,0 +1,66 @@ +from numbagg.grouped import group_nanmean, group_nansum + + +def nansum_of_squares( + group_idx, array, *, axis=-1, func="sum", size=None, fill_value=None, dtype=None +): + + return group_nansum( + array**2, + group_idx, + axis=axis, + func=func, + # size=size, + # fill_value=fill_value, + # dtype=dtype, + ) + + +def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + # npg takes out NaNs before calling np.bincount + # This means that all NaN groups are equivalent to absent groups + # This behaviour does not work for xarray + + return group_nansum( + array, + group_idx, + axis=axis, + # size=size, + # fill_value=fill_value, + # dtype=dtype, + ) + + +def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + # npg takes out NaNs before calling np.bincount + # This means that all NaN groups are equivalent to absent groups + # This behaviour does not work for xarray + + return group_nanmean( + array, + group_idx, + axis=axis, + # size=size, + # fill_value=fill_value, + # dtype=dtype, + ) + + +sum = nansum +mean = nanmean +sum_of_squares = nansum_of_squares + +# def nanprod(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): +# # npg takes out NaNs before calling np.bincount +# # This means that all NaN groups are equivalent to absent groups +# # This behaviour does not work for xarray + +# return npg.aggregate_numpy.aggregate( +# group_idx, +# np.where(np.isnan(array), 1, array), +# axis=axis, +# func="prod", +# size=size, +# fill_value=fill_value, +# dtype=dtype, +# ) diff --git a/flox/aggregations.py b/flox/aggregations.py index 8f389e376..435cee838 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -17,6 +17,13 @@ def generic_aggregate( method = getattr(aggregate_flox, func) except AttributeError: method = partial(npg.aggregate_numpy.aggregate, func=func) + elif engine == "numbagg": + from . import aggregate_numbagg + + try: + method = getattr(aggregate_numbagg, func) + except AttributeError: + method = partial(npg.aggregate_numpy.aggregate, func=func) elif engine == "numpy": try: # TODO: fix numba here @@ -30,7 +37,7 @@ def generic_aggregate( method = partial(npg.aggregate_nb, func=func) else: raise ValueError( - f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead." + f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead." ) return method( diff --git a/tests/__init__.py b/tests/__init__.py index 04a040bf8..427317133 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -93,7 +93,7 @@ def assert_equal(a, b): np.testing.assert_allclose(a, b, equal_nan=True) -@pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) +@pytest.fixture(scope="module", params=["numbagg"]) def engine(request): if request.param == "numba": try: From 0d76a44fd8492aaf31a37f2d4689a244dc341fee Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 3 Feb 2022 21:37:37 -0700 Subject: [PATCH 02/39] Fix. --- flox/aggregate_numbagg.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 993107a3e..b9880e7bb 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -9,8 +9,7 @@ def nansum_of_squares( array**2, group_idx, axis=axis, - func=func, - # size=size, + num_labels=size, # fill_value=fill_value, # dtype=dtype, ) @@ -20,12 +19,11 @@ def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) # npg takes out NaNs before calling np.bincount # This means that all NaN groups are equivalent to absent groups # This behaviour does not work for xarray - return group_nansum( array, group_idx, axis=axis, - # size=size, + num_labels=size, # fill_value=fill_value, # dtype=dtype, ) @@ -40,7 +38,7 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None array, group_idx, axis=axis, - # size=size, + num_labels=size, # fill_value=fill_value, # dtype=dtype, ) From 45134f0ef88fb0d3bc99a54ece31e398e7839ad2 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 3 Feb 2022 21:57:37 -0700 Subject: [PATCH 03/39] fix CI --- ci/environment.yml | 2 ++ tests/__init__.py | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/ci/environment.yml b/ci/environment.yml index cfb44d257..b9d92f8ef 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -17,3 +17,5 @@ dependencies: - pooch - toolz - numba + - pip: + - numbagg diff --git a/tests/__init__.py b/tests/__init__.py index 427317133..4bd2c6b5b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -100,4 +100,9 @@ def engine(request): import numba except ImportError: pytest.xfail() + if request.param == "numbagg": + try: + import numbagg + except ImportError: + pytest.xfail() return request.param From d54025c55697404a55ea54e7ce4361712f465dd0 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 4 Feb 2022 10:04:59 -0700 Subject: [PATCH 04/39] Add nanlen --- flox/aggregate_numbagg.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index b9880e7bb..a192d404c 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -1,3 +1,4 @@ +import numpy as np from numbagg.grouped import group_nanmean, group_nansum @@ -16,9 +17,6 @@ def nansum_of_squares( def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): - # npg takes out NaNs before calling np.bincount - # This means that all NaN groups are equivalent to absent groups - # This behaviour does not work for xarray return group_nansum( array, group_idx, @@ -30,10 +28,6 @@ def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): - # npg takes out NaNs before calling np.bincount - # This means that all NaN groups are equivalent to absent groups - # This behaviour does not work for xarray - return group_nanmean( array, group_idx, @@ -44,15 +38,22 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None ) +def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + return group_nansum( + (~np.isnan(array)).astype(int), + group_idx, + axis=axis, + num_labels=size, + # fill_value=fill_value, + # dtype=dtype, + ) + + sum = nansum mean = nanmean sum_of_squares = nansum_of_squares # def nanprod(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): -# # npg takes out NaNs before calling np.bincount -# # This means that all NaN groups are equivalent to absent groups -# # This behaviour does not work for xarray - # return npg.aggregate_numpy.aggregate( # group_idx, # np.where(np.isnan(array), 1, array), From e0d98cb45e5dde7855b0e9710f9bd73a7a379593 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 4 Feb 2022 10:27:14 -0700 Subject: [PATCH 05/39] fix env --- ci/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/environment.yml b/ci/environment.yml index b9d92f8ef..e724bbef4 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -18,4 +18,4 @@ dependencies: - toolz - numba - pip: - - numbagg + - git+https://github.com/shoyer/numbagg From 551a719c7ec49a06e29d9a87388cfb43b5a92b2d Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 4 Feb 2022 10:27:39 -0700 Subject: [PATCH 06/39] Add numbagg --- asv.conf.json | 1 + benchmarks/reduce.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/asv.conf.json b/asv.conf.json index 5dc60322b..858aad476 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -72,6 +72,7 @@ // followed by the pip installed packages). // "matrix": { + "numbagg": [""], "numpy_groupies": [""], "numpy": [""], "pandas": [""], diff --git a/benchmarks/reduce.py b/benchmarks/reduce.py index fd5d65714..27164c5ae 100644 --- a/benchmarks/reduce.py +++ b/benchmarks/reduce.py @@ -7,7 +7,7 @@ N = 1000 funcs = ["sum", "nansum", "mean", "nanmean", "argmax", "max"] -engines = ["flox", "numpy"] +engines = ["flox", "numpy", "numbagg"] class ChunkReduce: From 46c008e3901ea42d0b6fb0c5c90cac5c8c8a8f77 Mon Sep 17 00:00:00 2001 From: dcherian Date: Mon, 7 Feb 2022 13:04:59 -0700 Subject: [PATCH 07/39] Bettter numbagg benchmarks? --- benchmarks/reduce.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/benchmarks/reduce.py b/benchmarks/reduce.py index 27164c5ae..4d7ec0299 100644 --- a/benchmarks/reduce.py +++ b/benchmarks/reduce.py @@ -20,6 +20,15 @@ def setup(self, *args, **kwargs): npg.aggregate_numba.aggregate( np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func ) + if "numbagg" in engines: + for func in funcs: + try: + method = getattr(flox.aggregate_numbagg, func) + method(np.ones((10,), dtype=int), np.ones((10,), dtype=int)) + method(np.ones((10,), dtype=int), np.ones((10, 10), dtype=int)) + except AttributeError: + pass + raise NotImplementedError @parameterized("func, engine", [funcs, engines]) From 5bc69397b28766f573e74b63fc4c6f251328a59b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 12 Jul 2022 21:58:39 -0600 Subject: [PATCH 08/39] Update ci/environment.yml --- ci/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/environment.yml b/ci/environment.yml index eab44a48e..748bc8b86 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -18,4 +18,4 @@ dependencies: - toolz - numba - pip: - - git+https://github.com/shoyer/numbagg + - git+https://github.com/numbagg/numbagg From 4be07f017a2ac8edf296754cbb91464d19697009 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Sep 2023 21:27:40 +0000 Subject: [PATCH 09/39] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/aggregate_numbagg.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index a192d404c..b7da0fc9c 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -5,7 +5,6 @@ def nansum_of_squares( group_idx, array, *, axis=-1, func="sum", size=None, fill_value=None, dtype=None ): - return group_nansum( array**2, group_idx, From 0e35a9c63c072f2d1b0544d197501f64c0b6420b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 28 Sep 2023 16:04:26 -0600 Subject: [PATCH 10/39] cleanup --- flox/aggregate_numbagg.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index b7da0fc9c..448b07067 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -51,14 +51,3 @@ def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) sum = nansum mean = nanmean sum_of_squares = nansum_of_squares - -# def nanprod(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): -# return npg.aggregate_numpy.aggregate( -# group_idx, -# np.where(np.isnan(array), 1, array), -# axis=axis, -# func="prod", -# size=size, -# fill_value=fill_value, -# dtype=dtype, -# ) From 1f5ab2b932be1a208aaa3973652841525fe186bc Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 10:28:18 -0600 Subject: [PATCH 11/39] Error on dtype specified --- flox/core.py | 8 ++++++++ tests/test_core.py | 3 +++ 2 files changed, 11 insertions(+) diff --git a/flox/core.py b/flox/core.py index f8f700f99..4c2df3a4e 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1857,6 +1857,14 @@ def groupby_reduce( "Try engine='numpy' or engine='numba' instead." ) + if engine == "numbagg" and dtype is not None: + raise NotImplementedError( + "numbagg does not support the `dtype` kwarg. Either cast your " + "input arguments to `dtype` or use a different `engine`: " + "'flox' or 'numpy' or 'numba'. " + "See https://github.com/numbagg/numbagg/issues/121." + ) + bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) nby = len(bys) by_is_dask = tuple(is_duck_dask_array(b) for b in bys) diff --git a/tests/test_core.py b/tests/test_core.py index 83b823b07..2a500bb86 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -854,6 +854,9 @@ def test_fill_value_behaviour(func, chunks, fill_value, engine): @pytest.mark.parametrize("func", ["mean", "sum"]) @pytest.mark.parametrize("dtype", ["float32", "float64", "int32", "int64"]) def test_dtype_preservation(dtype, func, engine): + if engine == "numbagg": + # https://github.com/numbagg/numbagg/issues/121 + pytest.skip() if func == "sum" or (func == "mean" and "float" in dtype): expected = np.dtype(dtype) elif func == "mean" and "int" in dtype: From ff43c041eccc0c8b52f187eb41adf1cc81864d9d Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 15:27:24 -0600 Subject: [PATCH 12/39] Don't shadow sum, mean, sum_of_squares --- flox/aggregate_numbagg.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 448b07067..3f807ec5d 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -48,6 +48,6 @@ def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) ) -sum = nansum -mean = nanmean -sum_of_squares = nansum_of_squares +# sum = nansum +# mean = nanmean +# sum_of_squares = nansum_of_squares From 2df375fcdb833cb53bfff304c687b0a5402c2032 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 15:51:04 -0600 Subject: [PATCH 13/39] more skip --- tests/test_core.py | 3 +++ tests/test_xarray.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index a1aff9014..7c2ae68c3 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1243,6 +1243,9 @@ def grouped_median(group_idx, array, *, axis=-1, size=None, fill_value=None, dty @pytest.mark.parametrize("func", ALL_FUNCS) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) def test_dtype(func, dtype, engine): + if engine == "numbagg": + # https://github.com/numbagg/numbagg/issues/121 + pytest.skip() if "arg" in func or func in ["any", "all"]: pytest.skip() arr = np.ones((4, 12), dtype=dtype) diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 8f006e5f3..116937052 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -466,6 +466,10 @@ def test_alignment_error(): @pytest.mark.parametrize("dtype", [np.float32, np.float64]) @pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_dtype(add_nan, chunk, dtype, dtype_out, engine): + if engine == "numbagg": + # https://github.com/numbagg/numbagg/issues/121 + pytest.skip() + xp = dask.array if chunk else np data = xp.linspace(0, 1, 48, dtype=dtype).reshape((4, 12)) From 293583abfab346ac1827cc357c29cc9a81d73eea Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 15:51:30 -0600 Subject: [PATCH 14/39] Fix backup npg aggregations --- flox/aggregations.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 21ac9925b..7b90b00ab 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING, Any, Callable, TypedDict import numpy as np -import numpy_groupies as npg from numpy.typing import DTypeLike from . import aggregate_flox, aggregate_npg, xrutils @@ -35,6 +34,16 @@ class AggDtype(TypedDict): intermediate: tuple[np.dtype | type[np.intp], ...] +def get_npg_aggregation(func, *, engine): + try: + method_ = getattr(aggregate_npg, func) + method = partial(method_, engine=engine) + except AttributeError: + aggregate = aggregate_npg._get_aggregate(engine).aggregate + method = partial(aggregate, func=func) + return method + + def generic_aggregate( group_idx, array, @@ -51,7 +60,7 @@ def generic_aggregate( try: method = getattr(aggregate_flox, func) except AttributeError: - method = partial(npg.aggregate_numpy.aggregate, func=func) + method = get_npg_aggregation(func, engine="numpy") elif engine == "numbagg": from . import aggregate_numbagg @@ -59,15 +68,11 @@ def generic_aggregate( try: method = getattr(aggregate_numbagg, func) except AttributeError: - method = partial(npg.aggregate_numpy.aggregate, func=func) + method = get_npg_aggregation(func, engine="numpy") elif engine in ["numpy", "numba"]: - try: - method_ = getattr(aggregate_npg, func) - method = partial(method_, engine=engine) - except AttributeError: - aggregate = aggregate_npg._get_aggregate(engine).aggregate - method = partial(aggregate, func=func) + method = get_npg_aggregation(func, engine=engine) + else: raise ValueError( f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead." From ac6211d2952862ce9cdc6cb660780a8d68b10252 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:00:52 -0600 Subject: [PATCH 15/39] xfail nanmean bool --- tests/test_core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index 7c2ae68c3..ab32424d5 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -991,6 +991,8 @@ def test_datetime_binning(): @pytest.mark.parametrize("func", ALL_FUNCS) def test_bool_reductions(func, engine): + if func == "nanmean" and engine == "numbagg": + pytest.xfail(reason="https://github.com/numbagg/numbagg/issues/131") if "arg" in func and engine == "flox": pytest.skip() groups = np.array([1, 1, 1]) From 631219ffd39b672e323264f65d99ffb815c7f442 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:12:46 -0600 Subject: [PATCH 16/39] ignore numbagg for mypy --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index fb27ee761..9ca664c57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,6 +115,7 @@ module=[ "dask.*", "importlib_metadata", "numba", + "numbagg", "numpy_groupies.*", "matplotlib.*", "pandas", From f00873e970f1ca918995f12f9c0301acf2c25d10 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:13:26 -0600 Subject: [PATCH 17/39] Add to upstream-dev CI --- ci/upstream-dev-env.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/upstream-dev-env.yml b/ci/upstream-dev-env.yml index 04fd7ce60..6b8e796ea 100644 --- a/ci/upstream-dev-env.yml +++ b/ci/upstream-dev-env.yml @@ -18,3 +18,4 @@ dependencies: - git+https://github.com/pandas-dev/pandas - git+https://github.com/dask/dask - git+https://github.com/ml31415/numpy-groupies + - git+https://github.com/numbagg/numbagg From 5b5c5874456244faa2286e141b666c308e9147bd Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:14:09 -0600 Subject: [PATCH 18/39] Add to optional dependencies --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9ca664c57..b1bc6d449 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ repository = "https://github.com/xarray-contrib/flox.git" changelog = "https://github.com/xarray-contrib/flox/releases" [project.optional-dependencies] -all = ["cachey", "dask", "numba", "xarray"] +all = ["cachey", "dask", "numba", "numbagg", "xarray"] test = ["netCDF4"] [build-system] From 9ace31dc2879c3c8bf4e62ca889550fca0330cb2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:18:14 -0600 Subject: [PATCH 19/39] Fix bool reductions --- flox/aggregate_numbagg.py | 4 ++++ tests/test_core.py | 2 -- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 3f807ec5d..507c6111b 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -16,6 +16,8 @@ def nansum_of_squares( def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + if np.issubdtype(array.dtype, np.bool_): + array = array.astype(np.in64) return group_nansum( array, group_idx, @@ -27,6 +29,8 @@ def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + if np.issubdtype(array.dtype, np.int_): + array = array.astype(np.float64) return group_nanmean( array, group_idx, diff --git a/tests/test_core.py b/tests/test_core.py index ab32424d5..7c2ae68c3 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -991,8 +991,6 @@ def test_datetime_binning(): @pytest.mark.parametrize("func", ALL_FUNCS) def test_bool_reductions(func, engine): - if func == "nanmean" and engine == "numbagg": - pytest.xfail(reason="https://github.com/numbagg/numbagg/issues/131") if "arg" in func and engine == "flox": pytest.skip() groups = np.array([1, 1, 1]) From c2b378ec52d960818db47e713874541e49185f4d Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 16:26:05 -0600 Subject: [PATCH 20/39] fix mypy ignore --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b1bc6d449..6e5dff4ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,7 +115,7 @@ module=[ "dask.*", "importlib_metadata", "numba", - "numbagg", + "numbagg.*", "numpy_groupies.*", "matplotlib.*", "pandas", From 1410b77c5d8369358d6665ce607784bd26f7e9c3 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 17:58:31 -0600 Subject: [PATCH 21/39] reintroduce engines --- tests/conftest.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 04009e010..504564b5a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,15 @@ import pytest -from . import requires_numbagg +from . import requires_numba, requires_numbagg @pytest.fixture( scope="module", params=[ - # "flox", - # "numpy", - # pytest.param("numba", marks=requires_numba), - pytest.param("numbagg", marks=requires_numbagg) + "flox", + "numpy", + pytest.param("numba", marks=requires_numba), + pytest.param("numbagg", marks=requires_numbagg), ], ) def engine(request): From 67155879c7df1081e8d3f89bed5bb3c3a1763a9f Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 18:00:32 -0600 Subject: [PATCH 22/39] Update docstring --- flox/core.py | 6 ++++-- flox/xarray.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 6e58386f7..e6741b45e 100644 --- a/flox/core.py +++ b/flox/core.py @@ -67,7 +67,7 @@ T_AxesOpt = Union[T_Axis, T_Axes, None] T_Dtypes = Union[np.typing.DTypeLike, Sequence[np.typing.DTypeLike], None] T_FillValues = Union[np.typing.ArrayLike, Sequence[np.typing.ArrayLike], None] - T_Engine = Literal["flox", "numpy", "numba"] + T_Engine = Literal["flox", "numpy", "numba", "numbagg"] T_Method = Literal["map-reduce", "blockwise", "cohorts"] T_IsBins = Union[bool | Sequence[bool]] @@ -1816,7 +1816,7 @@ def groupby_reduce( (for 1D ``by`` only). * ``"split-reduce"``: Same as "cohorts" and will be removed soon. - engine : {"flox", "numpy", "numba"}, optional + engine : {"flox", "numpy", "numba", "numbagg"}, optional Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: * ``"numpy"``: Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``. @@ -1828,6 +1828,8 @@ def groupby_reduce( for a reduction that is not yet implemented. * ``"numba"``: Use the implementations in ``numpy_groupies.aggregate_numba``. + * ``"numbagg"``: + Use the reductions supported by ``numbagg.grouped``. reindex : bool, optional Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected). If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, diff --git a/flox/xarray.py b/flox/xarray.py index c85ad7113..11a99c5dd 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -145,6 +145,8 @@ def xarray_reduce( for a reduction that is not yet implemented. * ``"numba"``: Use the implementations in ``numpy_groupies.aggregate_numba``. + * ``"numbagg"``: + Use the reductions supported by ``numbagg.grouped``. keep_attrs : bool, optional Preserve attrs? skipna : bool, optional From e92f48303fd5596d38b3bbe616181d8f4b130806 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 18:12:12 -0600 Subject: [PATCH 23/39] Update docs. --- docs/source/engines.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/source/engines.md b/docs/source/engines.md index 867979d13..68c65cab5 100644 --- a/docs/source/engines.md +++ b/docs/source/engines.md @@ -9,18 +9,20 @@ 1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation. 1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was copied from a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844) +1. `engine="numbagg"` uses the reductions available in [`numbagg.grouped`](https://github.com/numbagg/numbagg/blob/main/numbagg/grouped.py) + from the [numbagg](https://github.com/numbagg/numbagg) project. See [](arrays) for more details. ## Tradeoffs -For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster. +For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="numbagg"` is almost always faster, and `engine="flox"` *can* be faster. The reason is that `numpy_groupies` converts all groupby problems to a 1D problem, this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46). It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet. The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common to implement `np.bincount`, and not `np.add.reduceat`. ```{tip} -Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142). -Both use numba for high-performance aggregations. Contributions or discussion is very welcome! +One other potential engine we could add is [`datashader`](https://github.com/xarray-contrib/flox/issues/142). +Contributions or discussion is very welcome! ``` From 76317f0d2958328ab4d86e54c927d3d16f77483e Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 09:43:06 -0600 Subject: [PATCH 24/39] Support more aggregations --- flox/aggregate_numbagg.py | 43 +++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 507c6111b..f833f32d6 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -1,15 +1,27 @@ +from functools import partial + +import numbagg +import numbagg.grouped import numpy as np -from numbagg.grouped import group_nanmean, group_nansum -def nansum_of_squares( - group_idx, array, *, axis=-1, func="sum", size=None, fill_value=None, dtype=None +def _numbagg_wrapper( + group_idx, + array, + *, + axis=-1, + func="sum", + size=None, + fill_value=None, + dtype=None, + numbagg_func=None, ): - return group_nansum( - array**2, + return numbagg_func( + array, group_idx, axis=axis, num_labels=size, + # The following are unsupported # fill_value=fill_value, # dtype=dtype, ) @@ -18,7 +30,7 @@ def nansum_of_squares( def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): if np.issubdtype(array.dtype, np.bool_): array = array.astype(np.in64) - return group_nansum( + return numbagg.grouped.group_nansum( array, group_idx, axis=axis, @@ -31,7 +43,7 @@ def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): if np.issubdtype(array.dtype, np.int_): array = array.astype(np.float64) - return group_nanmean( + return numbagg.grouped.group_nanmean( array, group_idx, axis=axis, @@ -41,16 +53,13 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None ) -def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): - return group_nansum( - (~np.isnan(array)).astype(int), - group_idx, - axis=axis, - num_labels=size, - # fill_value=fill_value, - # dtype=dtype, - ) - +nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares) +nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_count) +nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod) +nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst) +nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast) +nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) +nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) # sum = nansum # mean = nanmean From 38d54c10db5e2358d836fe839974c5d17d955a56 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 14:40:50 -0600 Subject: [PATCH 25/39] More aggregations --- flox/aggregate_numbagg.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index f833f32d6..a12c244a3 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -60,6 +60,10 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast) nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) +nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax) +nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin) +# nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) +nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) # sum = nansum # mean = nanmean From ab0dc3f455a873466693e5e3a787676fdbecb56b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 14:44:08 -0600 Subject: [PATCH 26/39] back to nancount --- flox/aggregate_numbagg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index a12c244a3..4af2d81e7 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -54,7 +54,7 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares) -nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_count) +nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nancount) nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod) nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst) nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast) From fbfcfcefdefdd1bf54e101c6829de6818a1384d8 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 17:05:45 -0600 Subject: [PATCH 27/39] Add any, all --- flox/aggregate_numbagg.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 4af2d81e7..7d6c027b2 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -62,6 +62,8 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax) nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin) +any_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany) +all_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall) # nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) From 0c8268ad2dfe6d2594fd55a6937a1097960ce5b4 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 17:06:30 -0600 Subject: [PATCH 28/39] promote in nanstd too --- flox/aggregate_numbagg.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 7d6c027b2..71c72c6fd 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -53,6 +53,19 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None ) +def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): + if np.issubdtype(array.dtype, np.int_): + array = array.astype(np.float64) + return numbagg.grouped.group_nanstd( + array, + group_idx, + axis=axis, + num_labels=size, + # fill_value=fill_value, + # dtype=dtype, + ) + + nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares) nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nancount) nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod) @@ -65,7 +78,7 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None any_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany) all_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall) # nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) -nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) +# nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) # sum = nansum # mean = nanmean From aa856ff793e2a40b8da0654b647f04b80b8e5696 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 17:08:25 -0600 Subject: [PATCH 29/39] Add ddof in anticipation of https://github.com/numbagg/numbagg/issues/138 --- flox/aggregate_numbagg.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 71c72c6fd..997209b3a 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -53,6 +53,20 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None ) +def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0): + if np.issubdtype(array.dtype, np.int_): + array = array.astype(np.float64) + return numbagg.grouped.group_nanvar( + array, + group_idx, + axis=axis, + num_labels=size, + ddof=0, + # fill_value=fill_value, + # dtype=dtype, + ) + + def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): if np.issubdtype(array.dtype, np.int_): array = array.astype(np.float64) @@ -61,6 +75,7 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) group_idx, axis=axis, num_labels=size, + ddof=0, # fill_value=fill_value, # dtype=dtype, ) From d8348efa53dbb98c23a4b968b0d5ca34656de326 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 22:04:26 -0600 Subject: [PATCH 30/39] Add more benchmarks --- asv_bench/benchmarks/reduce.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 89c58e0bf..3dad1b8b8 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -63,6 +63,13 @@ def setup(self, *args, **kwargs): self.axis = -1 +class ChunkReduce1DUnsorted(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N,)) + self.labels = np.random.permutation(np.repeat(np.arange(5), repeats=N // 5)) + self.axis = -1 + + class ChunkReduce2D(ChunkReduce): def setup(self, *args, **kwargs): self.array = np.ones((N, N)) @@ -70,8 +77,22 @@ def setup(self, *args, **kwargs): self.axis = -1 +class ChunkReduce2DUnsorted(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) + self.axis = -1 + + class ChunkReduce2DAllAxes(ChunkReduce): def setup(self, *args, **kwargs): self.array = np.ones((N, N)) self.labels = np.repeat(np.arange(N // 5), repeats=5) self.axis = None + + +class ChunkReduce2DAllAxesUnsorted(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) + self.axis = None From 67e5c92a3ec3c83d4e96917a046d1a79e375189c Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 22:05:48 -0600 Subject: [PATCH 31/39] reorder benchmark table --- asv_bench/benchmarks/reduce.py | 48 +++++++++++++++++----------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 3dad1b8b8..90a9f45f2 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -33,8 +33,8 @@ def setup(self, *args, **kwargs): raise NotImplementedError - @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) - def time_reduce(self, func, engine, expected_groups): + @parameterized("func, expected_groups, engine", [funcs, expected_groups, engines]) + def time_reduce(self, func, expected_groups, engine): flox.groupby_reduce( self.array, self.labels, @@ -44,8 +44,8 @@ def time_reduce(self, func, engine, expected_groups): expected_groups=expected_groups, ) - @parameterized("func, engine, expected_groups", [funcs, engines, expected_groups]) - def peakmem_reduce(self, func, engine, expected_groups): + @parameterized("func, expected_groups, engine", [funcs, expected_groups, engines]) + def peakmem_reduce(self, func, expected_groups, engine): flox.groupby_reduce( self.array, self.labels, @@ -70,29 +70,29 @@ def setup(self, *args, **kwargs): self.axis = -1 -class ChunkReduce2D(ChunkReduce): - def setup(self, *args, **kwargs): - self.array = np.ones((N, N)) - self.labels = np.repeat(np.arange(N // 5), repeats=5) - self.axis = -1 +# class ChunkReduce2D(ChunkReduce): +# def setup(self, *args, **kwargs): +# self.array = np.ones((N, N)) +# self.labels = np.repeat(np.arange(N // 5), repeats=5) +# self.axis = -1 -class ChunkReduce2DUnsorted(ChunkReduce): - def setup(self, *args, **kwargs): - self.array = np.ones((N, N)) - self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) - self.axis = -1 +# class ChunkReduce2DUnsorted(ChunkReduce): +# def setup(self, *args, **kwargs): +# self.array = np.ones((N, N)) +# self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) +# self.axis = -1 -class ChunkReduce2DAllAxes(ChunkReduce): - def setup(self, *args, **kwargs): - self.array = np.ones((N, N)) - self.labels = np.repeat(np.arange(N // 5), repeats=5) - self.axis = None +# class ChunkReduce2DAllAxes(ChunkReduce): +# def setup(self, *args, **kwargs): +# self.array = np.ones((N, N)) +# self.labels = np.repeat(np.arange(N // 5), repeats=5) +# self.axis = None -class ChunkReduce2DAllAxesUnsorted(ChunkReduce): - def setup(self, *args, **kwargs): - self.array = np.ones((N, N)) - self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) - self.axis = None +# class ChunkReduce2DAllAxesUnsorted(ChunkReduce): +# def setup(self, *args, **kwargs): +# self.array = np.ones((N, N)) +# self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) +# self.axis = None From 1a02428b0dc8b4fa415c47e0c5492dc8fbd2a980 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 3 Oct 2023 22:57:49 -0600 Subject: [PATCH 32/39] Fix numba compilation setup? --- asv_bench/benchmarks/reduce.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 90a9f45f2..8b94ee19d 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -20,14 +20,14 @@ def setup(self, *args, **kwargs): if "numba" in engines: for func in funcs: npg.aggregate_numba.aggregate( - np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func + np.ones((100,), dtype=int), np.ones((100,)), func=func ) if "numbagg" in engines: for func in funcs: try: method = getattr(flox.aggregate_numbagg, func) - method(np.ones((10,), dtype=int), np.ones((10,), dtype=int)) - method(np.ones((10,), dtype=int), np.ones((10, 10), dtype=int)) + method(np.ones((10,), dtype=int), np.ones((10,))) + method(np.ones((10,), dtype=int), np.ones((10, 10))) except AttributeError: pass From 4eb3a483570c596f45c1b906fc287cbf95640d17 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 09:09:03 -0600 Subject: [PATCH 33/39] More benchmarks --- asv_bench/benchmarks/reduce.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 8b94ee19d..33df45ad5 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -6,8 +6,8 @@ from . import parameterized -N = 1000 -funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"] +N = 3000 +funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "nanvar", "count", "all"] engines = ["flox", "numpy", "numbagg"] expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])] From a2639e45a83c8d948a5da0bc7b39f46bc67dabc2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 21:41:10 -0600 Subject: [PATCH 34/39] Rework benchmarks --- asv_bench/benchmarks/reduce.py | 127 +++++++++++++++++++++------------ flox/aggregate_numbagg.py | 6 +- 2 files changed, 84 insertions(+), 49 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 33df45ad5..add77c182 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -1,58 +1,86 @@ import numpy as np -import numpy_groupies as npg import pandas as pd +from asv_runner.benchmarks.mark import parameterize, skip_for_params import flox - -from . import parameterized +import flox.aggregations N = 3000 -funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "nanvar", "count", "all"] +funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"] engines = ["flox", "numpy", "numbagg"] -expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])] +expected_groups = { + "None": None, + "RangeIndex": pd.RangeIndex(5), + "bins": pd.IntervalIndex.from_breaks([1, 2, 4]), +} +expected_names = tuple(expected_groups) + +NUMBAGG_FUNCS = ["nansum", "nanmean", "nanmax", "count", "all"] + +numbagg_skip = [ + (func, expected_names[0], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS +] + [(func, expected_names[1], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS] + + +def setup_jit(): + # pre-compile jitted funcs + labels = np.ones((N), dtype=int) + array1 = np.ones((N), dtype=float) + array2 = np.ones((N, N), dtype=float) + + if "numba" in engines: + for func in funcs: + method = getattr(flox.aggregate_npg, func) + method(labels, array1, engine="numba") + if "numbagg" in engines: + for func in set(NUMBAGG_FUNCS) & set(funcs): + flox.groupby_reduce(array1, labels, func=func, engine="numbagg") + flox.groupby_reduce(array2, labels, func=func, engine="numbagg") class ChunkReduce: """Time the core reduction function.""" - def setup(self, *args, **kwargs): - # pre-compile jitted funcs - if "numba" in engines: - for func in funcs: - npg.aggregate_numba.aggregate( - np.ones((100,), dtype=int), np.ones((100,)), func=func - ) - if "numbagg" in engines: - for func in funcs: - try: - method = getattr(flox.aggregate_numbagg, func) - method(np.ones((10,), dtype=int), np.ones((10,))) - method(np.ones((10,), dtype=int), np.ones((10, 10))) - except AttributeError: - pass + min_run_count = 5 + warmup_time = 1 + def setup(self, *args, **kwargs): raise NotImplementedError - @parameterized("func, expected_groups, engine", [funcs, expected_groups, engines]) - def time_reduce(self, func, expected_groups, engine): + @skip_for_params(numbagg_skip) + @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines}) + def time_reduce(self, func, expected_name, engine): flox.groupby_reduce( self.array, self.labels, func=func, engine=engine, axis=self.axis, - expected_groups=expected_groups, + expected_groups=expected_groups[expected_name], + ) + + @parameterize({"func": ["nansum", "nanmean", "nanmax", "count"], "engine": engines}) + def time_reduce_bare(self, func, engine): + flox.aggregations.generic_aggregate( + self.labels, + self.array, + axis=-1, + size=5, + func=func, + engine=engine, + fill_value=0, ) - @parameterized("func, expected_groups, engine", [funcs, expected_groups, engines]) - def peakmem_reduce(self, func, expected_groups, engine): + @skip_for_params(numbagg_skip) + @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines}) + def peakmem_reduce(self, func, expected_name, engine): flox.groupby_reduce( self.array, self.labels, func=func, engine=engine, axis=self.axis, - expected_groups=expected_groups, + expected_groups=expected_groups[expected_name], ) @@ -61,6 +89,8 @@ def setup(self, *args, **kwargs): self.array = np.ones((N,)) self.labels = np.repeat(np.arange(5), repeats=N // 5) self.axis = -1 + if "numbagg" in args: + setup_jit() class ChunkReduce1DUnsorted(ChunkReduce): @@ -68,31 +98,36 @@ def setup(self, *args, **kwargs): self.array = np.ones((N,)) self.labels = np.random.permutation(np.repeat(np.arange(5), repeats=N // 5)) self.axis = -1 + setup_jit() -# class ChunkReduce2D(ChunkReduce): -# def setup(self, *args, **kwargs): -# self.array = np.ones((N, N)) -# self.labels = np.repeat(np.arange(N // 5), repeats=5) -# self.axis = -1 +class ChunkReduce2D(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.repeat(np.arange(N // 5), repeats=5) + self.axis = -1 + setup_jit() -# class ChunkReduce2DUnsorted(ChunkReduce): -# def setup(self, *args, **kwargs): -# self.array = np.ones((N, N)) -# self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) -# self.axis = -1 +class ChunkReduce2DUnsorted(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) + self.axis = -1 + setup_jit() -# class ChunkReduce2DAllAxes(ChunkReduce): -# def setup(self, *args, **kwargs): -# self.array = np.ones((N, N)) -# self.labels = np.repeat(np.arange(N // 5), repeats=5) -# self.axis = None +class ChunkReduce2DAllAxes(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.repeat(np.arange(N // 5), repeats=5) + self.axis = None + setup_jit() -# class ChunkReduce2DAllAxesUnsorted(ChunkReduce): -# def setup(self, *args, **kwargs): -# self.array = np.ones((N, N)) -# self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) -# self.axis = None +class ChunkReduce2DAllAxesUnsorted(ChunkReduce): + def setup(self, *args, **kwargs): + self.array = np.ones((N, N)) + self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5)) + self.axis = None + setup_jit() diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 997209b3a..43fe6a847 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -90,9 +90,9 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax) nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin) -any_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany) -all_ = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall) -# nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) +any = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany) +all = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall) +# nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanvar) # nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) # sum = nansum From f2b40ef8e7511adee660539d2a424838f910d15b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 21:44:35 -0600 Subject: [PATCH 35/39] small docstring update --- flox/core.py | 3 ++- flox/xarray.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index e6741b45e..a5e07d37d 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1829,7 +1829,8 @@ def groupby_reduce( * ``"numba"``: Use the implementations in ``numpy_groupies.aggregate_numba``. * ``"numbagg"``: - Use the reductions supported by ``numbagg.grouped``. + Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy`` + for a reduction that is not yet implemented. reindex : bool, optional Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected). If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, diff --git a/flox/xarray.py b/flox/xarray.py index 11a99c5dd..bde5cc3f2 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -146,7 +146,8 @@ def xarray_reduce( * ``"numba"``: Use the implementations in ``numpy_groupies.aggregate_numba``. * ``"numbagg"``: - Use the reductions supported by ``numbagg.grouped``. + Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy`` + for a reduction that is not yet implemented. keep_attrs : bool, optional Preserve attrs? skipna : bool, optional From 53f934016c0e1cba482285308c626cce2834e6c1 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 21:44:42 -0600 Subject: [PATCH 36/39] ignore asv typing --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index ebd8247e1..a5df4b50e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,6 +109,7 @@ warn_unused_ignores = true [[tool.mypy.overrides]] module=[ + "asv_core.*", "cachey", "cftime", "dask.*", From e1eda248bff84a37a6422fe520138b3fe26eb999 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 21:56:57 -0600 Subject: [PATCH 37/39] fix type ignoring --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a5df4b50e..364011007 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,7 +109,7 @@ warn_unused_ignores = true [[tool.mypy.overrides]] module=[ - "asv_core.*", + "asv_runner.*", "cachey", "cftime", "dask.*", From a899b5a37900c2b86222b3821cbd812f26bbbdf5 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 6 Oct 2023 20:49:23 -0600 Subject: [PATCH 38/39] Guard against numbagg failures --- flox/aggregate_numbagg.py | 14 +++++++------- flox/aggregations.py | 10 +++++++++- flox/core.py | 19 ++++++++++++++++++- tests/test_core.py | 10 ++++++---- 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/flox/aggregate_numbagg.py b/flox/aggregate_numbagg.py index 43fe6a847..b0b06d86e 100644 --- a/flox/aggregate_numbagg.py +++ b/flox/aggregate_numbagg.py @@ -54,6 +54,7 @@ def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0): + assert ddof != 0 if np.issubdtype(array.dtype, np.int_): array = array.astype(np.float64) return numbagg.grouped.group_nanvar( @@ -61,13 +62,14 @@ def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, group_idx, axis=axis, num_labels=size, - ddof=0, + # ddof=0, # fill_value=fill_value, # dtype=dtype, ) -def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None): +def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0): + assert ddof != 0 if np.issubdtype(array.dtype, np.int_): array = array.astype(np.float64) return numbagg.grouped.group_nanstd( @@ -75,7 +77,7 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) group_idx, axis=axis, num_labels=size, - ddof=0, + # ddof=0, # fill_value=fill_value, # dtype=dtype, ) @@ -86,14 +88,12 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None) nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod) nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst) nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast) -nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) -nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) +# nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax) +# nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin) nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax) nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin) any = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany) all = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall) -# nanvar = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanvar) -# nanstd = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanstd) # sum = nansum # mean = nanmean diff --git a/flox/aggregations.py b/flox/aggregations.py index d2ccf4c64..ec696da53 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -66,7 +66,15 @@ def generic_aggregate( from . import aggregate_numbagg try: - method = getattr(aggregate_numbagg, func) + if ( + # numabgg hardcodes ddof=1 + ("var" in func or "std" in func) + and kwargs.get("ddof", 0) == 0 + ): + method = get_npg_aggregation(func, engine="numpy") + + else: + method = getattr(aggregate_numbagg, func) except AttributeError: method = get_npg_aggregation(func, engine="numpy") diff --git a/flox/core.py b/flox/core.py index 01f031240..e5518b551 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1889,6 +1889,23 @@ def groupby_reduce( by_is_dask = tuple(is_duck_dask_array(b) for b in bys) any_by_dask = any(by_is_dask) + if ( + engine == "numbagg" + and _is_arg_reduction(func) + and (any_by_dask or is_duck_dask_array(array)) + ): + # There is only one test that fails, but I can't figure + # out why without deep debugging. + # just disable for now. + # test_groupby_reduce_axis_subset_against_numpy + # for array is 3D dask, by is 3D dask, axis=2 + # We are falling back to numpy for the arg reduction, + # so presumably something is going wrong + raise NotImplementedError( + "argreductions not supported for engine='numbagg' yet." + "Try engine='numpy' or engine='numba' instead." + ) + if method in ["split-reduce", "cohorts"] and any_by_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") @@ -2030,7 +2047,7 @@ def groupby_reduce( if agg.chunk[0] is None and method != "blockwise": raise NotImplementedError( f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." - f"\n\n Received: {func}" + f"Received method={method!r}" ) if method in ["blockwise", "cohorts"] and nax != by_.ndim: diff --git a/tests/test_core.py b/tests/test_core.py index ca8656088..09ee50902 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -225,7 +225,9 @@ def gen_array_by(size, func): @pytest.mark.parametrize("add_nan_by", [True, False]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): - if ("arg" in func and engine == "flox") or (func in BLOCKWISE_FUNCS and chunks != -1): + if ("arg" in func and engine in ["flox", "numbagg"]) or ( + func in BLOCKWISE_FUNCS and chunks != -1 + ): pytest.skip() array, by = gen_array_by(size, func) @@ -424,7 +426,7 @@ def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtyp if func in ["first", "last"] or func in BLOCKWISE_FUNCS: pytest.skip() - if "arg" in func and (engine == "flox" or reindex): + if "arg" in func and (engine in ["flox", "numbagg"] or reindex): pytest.skip() rng = np.random.default_rng(12345) @@ -576,7 +578,7 @@ def test_first_last_disallowed_dask(func): "axis", [None, (0, 1, 2), (0, 1), (0, 2), (1, 2), 0, 1, 2, (0,), (1,), (2,)] ) def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): - if ("arg" in func and engine == "flox") or func in BLOCKWISE_FUNCS: + if ("arg" in func and engine in ["flox", "numbagg"]) or func in BLOCKWISE_FUNCS: pytest.skip() if not isinstance(axis, int): @@ -934,7 +936,7 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype): @pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce"]) def test_cohorts_nd_by(func, method, axis, engine): if ( - ("arg" in func and (axis is None or engine == "flox")) + ("arg" in func and (axis is None or engine in ["flox", "numbagg"])) or (method != "blockwise" and func in BLOCKWISE_FUNCS) or (axis is None and ("first" in func or "last" in func)) ): From 492fb04f672a2b51d9e579ecbf9759d0b952a1aa Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 7 Oct 2023 08:14:36 -0600 Subject: [PATCH 39/39] Use released numbagg --- ci/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/environment.yml b/ci/environment.yml index c565ac3f5..33e1b4661 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -24,4 +24,4 @@ dependencies: - numba - scipy - pip: - - git+https://github.com/numbagg/numbagg + - numbagg>=0.3