From e1b3cf72dc35666a93e7f8cc09e9c3fb6a0e6452 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sun, 25 Sep 2022 16:57:05 +0200 Subject: [PATCH 1/6] copy groupyby_reduce --- flox/core.py | 80 +++++++++++++++++++++++++++------------------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/flox/core.py b/flox/core.py index 58b89bf17..d85b0cd5e 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1356,13 +1356,13 @@ def groupby_reduce( func: str | Aggregation, expected_groups: Sequence | np.ndarray | None = None, sort: bool = True, - isbin: bool = False, - axis=None, + isbin: T_IsBins = False, + axis: T_AxissOpt = None, fill_value=None, min_count: int | None = None, split_out: int = 1, - method: str = "map-reduce", - engine: str = "numpy", + method: T_Method = "map-reduce", + engine: T_Engine = "numpy", reindex: bool | None = None, finalize_kwargs: Mapping | None = None, ) -> tuple[DaskArray, np.ndarray | DaskArray]: @@ -1467,9 +1467,9 @@ def groupby_reduce( ) reindex = _validate_reindex(reindex, func, method, expected_groups) - by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - nby = len(by) - by_is_dask = any(is_duck_dask_array(b) for b in by) + bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(bys) + by_is_dask = any(is_duck_dask_array(b) for b in bys) if method in ["split-reduce", "cohorts"] and by_is_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") @@ -1478,54 +1478,56 @@ def groupby_reduce( array = np.asarray(array) array = array.astype(int) if np.issubdtype(array.dtype, bool) else array - if isinstance(isbin, bool): - isbin = (isbin,) * len(by) + if isinstance(isbin, Sequence): + isbins = isbin + else: + isbins = (isbin,) * nby if expected_groups is None: - expected_groups = (None,) * len(by) + expected_groups = (None,) * nby - _assert_by_is_aligned(array.shape, by) + _assert_by_is_aligned(array.shape, bys) - if len(by) == 1 and not isinstance(expected_groups, tuple): + if nby == 1 and not isinstance(expected_groups, tuple): expected_groups = (np.asarray(expected_groups),) - elif len(expected_groups) != len(by): + elif len(expected_groups) != nby: raise ValueError( f"Must have same number of `expected_groups` (received {len(expected_groups)}) " - f" and variables to group by (received {len(by)})." + f" and variables to group by (received {nby})." ) # We convert to pd.Index since that lets us know if we are binning or not # (pd.IntervalIndex or not) - expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort) # TODO: could restrict this to dask-only factorize_early = (nby > 1) or ( - any(isbin) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) + any(isbins) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) ) if factorize_early: - by, final_groups, grp_shape = _factorize_multiple( - by, expected_groups, by_is_dask=by_is_dask, reindex=reindex + bys, final_groups, grp_shape = _factorize_multiple( + bys, expected_groups, by_is_dask=by_is_dask, reindex=reindex ) expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) - assert len(by) == 1 - by = by[0] + assert len(bys) == 1 + by_ = by[0] expected_groups = expected_groups[0] if axis is None: - axis = tuple(array.ndim + np.arange(-by.ndim, 0)) + axis = tuple(array.ndim + np.arange(-by_.ndim, 0)) else: axis = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore - if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by.ndim: + if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by_.ndim: raise NotImplementedError( "Must reduce along all dimensions of `by` when method != 'map-reduce'." f"Received method={method!r}" ) # TODO: make sure expected_groups is unique - if len(axis) == 1 and by.ndim > 1 and expected_groups is None: + if len(axis) == 1 and by_.ndim > 1 and expected_groups is None: if not by_is_dask: - expected_groups = _get_expected_groups(by, sort) + expected_groups = _get_expected_groups(by_, sort) else: # When we reduce along all axes, we are guaranteed to see all # groups in the final combine stage, so everything works. @@ -1538,13 +1540,13 @@ def groupby_reduce( "Please provide ``expected_groups`` when not reducing along all axes." ) - assert len(axis) <= by.ndim - if len(axis) < by.ndim: - by = _move_reduce_dims_to_end(by, -array.ndim + np.array(axis) + by.ndim) + assert len(axis) <= by_.ndim + if len(axis) < by_.ndim: + by_ = _move_reduce_dims_to_end(by_, -array.ndim + np.array(axis) + by_.ndim) array = _move_reduce_dims_to_end(array, axis) axis = tuple(array.ndim + np.arange(-len(axis), 0)) - has_dask = is_duck_dask_array(array) or is_duck_dask_array(by) + has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) # When axis is a subset of possible values; then npg will # apply it to groups that don't exist along a particular axis (for e.g.) @@ -1553,7 +1555,7 @@ def groupby_reduce( # The only way to do this consistently is mask out using min_count # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 if min_count is None: - if len(axis) < by.ndim or fill_value is not None: + if len(axis) < by_.ndim or fill_value is not None: min_count = 1 # TODO: set in xarray? @@ -1567,7 +1569,7 @@ def groupby_reduce( if not has_dask: results = _reduce_blockwise( - array, by, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs + array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs ) groups = (results["groups"],) result = results[agg.name] @@ -1587,7 +1589,7 @@ def groupby_reduce( if method in ["split-reduce", "cohorts"]: cohorts = find_group_cohorts( - by, [array.chunks[ax] for ax in axis], merge=True, method=method + by_, [array.chunks[ax] for ax in axis], merge=True, method=method ) results = [] @@ -1595,17 +1597,17 @@ def groupby_reduce( for cohort in cohorts: cohort = sorted(cohort) # equivalent of xarray.DataArray.where(mask, drop=True) - mask = np.isin(by, cohort) + mask = np.isin(by_, cohort) indexer = [np.unique(v) for v in np.nonzero(mask)] array_subset = array - for ax, idxr in zip(range(-by.ndim, 0), indexer): + for ax, idxr in zip(range(-by_.ndim, 0), indexer): array_subset = np.take(array_subset, idxr, axis=ax) numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis]) # get final result for these groups r, *g = partial_agg( array_subset, - by[np.ix_(*indexer)], + by_[np.ix_(*indexer)], expected_groups=pd.Index(cohort), # First deep copy becasue we might be doping blockwise, # which sets agg.finalize=None, then map-reduce (GH102) @@ -1618,7 +1620,9 @@ def groupby_reduce( sort=False, # if only a single block along axis, we can just work blockwise # inspired by https://github.com/dask/dask/issues/8361 - method="blockwise" if numblocks == 1 and len(axis) == by.ndim else "map-reduce", + method="blockwise" + if numblocks == 1 and len(axis) == by_.ndim + else "map-reduce", ) results.append(r) groups_.append(cohort) @@ -1628,12 +1632,12 @@ def groupby_reduce( groups = (np.hstack(groups_),) result = np.concatenate(results, axis=-1) else: - if method == "blockwise" and by.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by) + if method == "blockwise" and by_.ndim == 1: + array = rechunk_for_blockwise(array, axis=-1, labels=by_) result, *groups = partial_agg( array, - by, + by_, expected_groups=None if method == "blockwise" else expected_groups, agg=agg, reindex=reindex, From 213a172ff508ea3b60093da2b46d10c03e167409 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sun, 25 Sep 2022 16:58:02 +0200 Subject: [PATCH 2/6] add types --- flox/core.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flox/core.py b/flox/core.py index d85b0cd5e..0372521c7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -11,6 +11,7 @@ Callable, Dict, Iterable, + Literal, Mapping, Sequence, Union, @@ -35,6 +36,17 @@ if TYPE_CHECKING: import dask.array.Array as DaskArray + T_Func = Union[str, Callable] + T_Funcs = Union[T_Func, Sequence[T_Func]] + T_Axis = int + T_Axiss = tuple[T_Axis, ...] # TODO: var name grammar? + T_AxissOpt = Union[T_Axis, T_Axiss, None] + T_Dtypes = Union[np.typing.DTypeLike, Sequence[np.typing.DTypeLike], None] + T_FillValues = Union[np.typing.ArrayLike, Sequence[np.typing.ArrayLike], None] + T_Engine = Literal["flox", "numpy", "numba"] + T_Method = Literal["map-reduce", "blockwise", "cohorts", "split-reduce"] + T_IsBins = Union[bool | Sequence[bool]] + IntermediateDict = Dict[Union[str, Callable], Any] FinalResultsDict = Dict[str, Union["DaskArray", np.ndarray]] From 3b1ce1e6ec8c5a7e0bdf392cef5813a5f3eaf2ce Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Sun, 25 Sep 2022 17:07:31 +0200 Subject: [PATCH 3/6] use nby on more places --- flox/core.py | 308 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) diff --git a/flox/core.py b/flox/core.py index 0372521c7..38b17511d 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1479,6 +1479,314 @@ def groupby_reduce( ) reindex = _validate_reindex(reindex, func, method, expected_groups) + by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(by) + by_is_dask = any(is_duck_dask_array(b) for b in by) + + if method in ["split-reduce", "cohorts"] and by_is_dask: + raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") + + if not is_duck_array(array): + array = np.asarray(array) + array = array.astype(int) if np.issubdtype(array.dtype, bool) else array + + if isinstance(isbin, bool): + isbin = (isbin,) * nby + if expected_groups is None: + expected_groups = (None,) * nby + + _assert_by_is_aligned(array.shape, by) + + if nby == 1 and not isinstance(expected_groups, tuple): + expected_groups = (np.asarray(expected_groups),) + elif len(expected_groups) != nby: + raise ValueError( + f"Must have same number of `expected_groups` (received {len(expected_groups)}) " + f" and variables to group by (received {nby})." + ) + + # We convert to pd.Index since that lets us know if we are binning or not + # (pd.IntervalIndex or not) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort) + + # TODO: could restrict this to dask-only + factorize_early = (nby > 1) or ( + any(isbin) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) + ) + if factorize_early: + by, final_groups, grp_shape = _factorize_multiple( + by, expected_groups, by_is_dask=by_is_dask, reindex=reindex + ) + expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) + + assert len(by) == 1 + by = by[0] + expected_groups = expected_groups[0] + + if axis is None: + axis = tuple(array.ndim + np.arange(-by.ndim, 0)) + else: + axis = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + + if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by.ndim: + raise NotImplementedError( + "Must reduce along all dimensions of `by` when method != 'map-reduce'." + f"Received method={method!r}" + ) + + # TODO: make sure expected_groups is unique + if len(axis) == 1 and by.ndim > 1 and expected_groups is None: + if not by_is_dask: + expected_groups = _get_expected_groups(by, sort) + else: + # When we reduce along all axes, we are guaranteed to see all + # groups in the final combine stage, so everything works. + # This is not necessarily true when reducing along a subset of axes + # (of by) + # TODO: Does this depend on chunking of by? + # For e.g., we could relax this if there is only one chunk along all + # by dim != axis? + raise NotImplementedError( + "Please provide ``expected_groups`` when not reducing along all axes." + ) + + assert len(axis) <= by.ndim + if len(axis) < by.ndim: + by = _move_reduce_dims_to_end(by, -array.ndim + np.array(axis) + by.ndim) + array = _move_reduce_dims_to_end(array, axis) + axis = tuple(array.ndim + np.arange(-len(axis), 0)) + + has_dask = is_duck_dask_array(array) or is_duck_dask_array(by) + + # When axis is a subset of possible values; then npg will + # apply it to groups that don't exist along a particular axis (for e.g.) + # since these count as a group that is absent. thoo! + # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found. + # The only way to do this consistently is mask out using min_count + # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 + if min_count is None: + if len(axis) < by.ndim or fill_value is not None: + min_count = 1 + + # TODO: set in xarray? + if min_count is not None and func in ["nansum", "nanprod"] and fill_value is None: + # nansum, nanprod have fill_value=0, 1 + # overwrite than when min_count is set + fill_value = np.nan + + kwargs = dict(axis=axis, fill_value=fill_value, engine=engine) + agg = _initialize_aggregation(func, array.dtype, fill_value, min_count, finalize_kwargs) + + if not has_dask: + results = _reduce_blockwise( + array, by, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs + ) + groups = (results["groups"],) + result = results[agg.name] + + else: + if agg.chunk[0] is None and method != "blockwise": + raise NotImplementedError( + f"Aggregation {func.name!r} is only implemented for dask arrays when method='blockwise'." + f"\n\n Received: {func}" + ) + + # we always need some fill_value (see above) so choose the default if needed + if kwargs["fill_value"] is None: + kwargs["fill_value"] = agg.fill_value[agg.name] + + partial_agg = partial(dask_groupby_agg, split_out=split_out, **kwargs) + + if method in ["split-reduce", "cohorts"]: + cohorts = find_group_cohorts( + by, [array.chunks[ax] for ax in axis], merge=True, method=method + ) + + results = [] + groups_ = [] + for cohort in cohorts: + cohort = sorted(cohort) + # equivalent of xarray.DataArray.where(mask, drop=True) + mask = np.isin(by, cohort) + indexer = [np.unique(v) for v in np.nonzero(mask)] + array_subset = array + for ax, idxr in zip(range(-by.ndim, 0), indexer): + array_subset = np.take(array_subset, idxr, axis=ax) + numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis]) + + # get final result for these groups + r, *g = partial_agg( + array_subset, + by[np.ix_(*indexer)], + expected_groups=pd.Index(cohort), + # First deep copy becasue we might be doping blockwise, + # which sets agg.finalize=None, then map-reduce (GH102) + agg=copy.deepcopy(agg), + # reindex to expected_groups at the blockwise step. + # this approach avoids replacing non-cohort members with + # np.nan or some other sentinel value, and preserves dtypes + reindex=True, + # sort controls the final output order so apply that at the end + sort=False, + # if only a single block along axis, we can just work blockwise + # inspired by https://github.com/dask/dask/issues/8361 + method="blockwise" if numblocks == 1 and len(axis) == by.ndim else "map-reduce", + ) + results.append(r) + groups_.append(cohort) + + # concatenate results together, + # sort to make sure we match expected output + groups = (np.hstack(groups_),) + result = np.concatenate(results, axis=-1) + else: + if method == "blockwise" and by.ndim == 1: + array = rechunk_for_blockwise(array, axis=-1, labels=by) + + result, *groups = partial_agg( + array, + by, + expected_groups=None if method == "blockwise" else expected_groups, + agg=agg, + reindex=reindex, + method=method, + sort=sort, + ) + + if sort and method != "map-reduce": + assert len(groups) == 1 + sorted_idx = np.argsort(groups[0]) + result = result[..., sorted_idx] + groups = (groups[0][sorted_idx],) + + if factorize_early: + # nan group labels are factorized to -1, and preserved + # now we get rid of them by reindexing + # This also handles bins with no data + result = reindex_( + result, from_=groups[0], to=expected_groups, fill_value=fill_value + ).reshape(result.shape[:-1] + grp_shape) + groups = final_groups + return (result, *groups) + + +def groupby_reduce2( + array: np.ndarray | DaskArray, + *by: np.ndarray | DaskArray, + func: str | Aggregation, + expected_groups: Sequence | np.ndarray | None = None, + sort: bool = True, + isbin: T_IsBins = False, + axis: T_AxissOpt = None, + fill_value=None, + min_count: int | None = None, + split_out: int = 1, + method: T_Method = "map-reduce", + engine: T_Engine = "numpy", + reindex: bool | None = None, + finalize_kwargs: Mapping | None = None, +) -> tuple[DaskArray, np.ndarray | DaskArray]: + """ + GroupBy reductions using tree reductions for dask.array + + Parameters + ---------- + array : ndarray or DaskArray + Array to be reduced, possibly nD + by : ndarray or DaskArray + Array of labels to group over. Must be aligned with ``array`` so that + ``array.shape[-by.ndim :] == by.shape`` + func : str or Aggregation + Single function name or an Aggregation instance + expected_groups : (optional) Sequence + Expected unique labels. + isbin : bool, optional + Are ``expected_groups`` bin edges? + sort : bool, optional + Whether groups should be returned in sorted order. Only applies for dask + reductions when ``method`` is not ``"map-reduce"``. For ``"map-reduce"``, the groups + are always sorted. + axis : None or int or Sequence[int], optional + If None, reduce across all dimensions of by + Else, reduce across corresponding axes of array + Negative integers are normalized using array.ndim + fill_value : Any + Value to assign when a label in ``expected_groups`` is not present. + min_count : int, default: None + The required number of valid values to perform the operation. If + fewer than min_count non-NA values are present the result will be + NA. Only used if skipna is set to True or defaults to True for the + array's dtype. + split_out : int, optional + Number of chunks along group axis in output (last axis) + method : {"map-reduce", "blockwise", "cohorts", "split-reduce"}, optional + Strategy for reduction of dask arrays only: + * ``"map-reduce"``: + First apply the reduction blockwise on ``array``, then + combine a few newighbouring blocks, apply the reduction. + Continue until finalizing. Usually, ``func`` will need + to be an Aggregation instance for this method to work. + Common aggregations are implemented. + * ``"blockwise"``: + Only reduce using blockwise and avoid aggregating blocks + together. Useful for resampling-style reductions where group + members are always together. If `by` is 1D, `array` is automatically + rechunked so that chunk boundaries line up with group boundaries + i.e. each block contains all members of any group present + in that block. For nD `by`, you must make sure that all members of a group + are present in a single block. + * ``"cohorts"``: + Finds group labels that tend to occur together ("cohorts"), + indexes out cohorts and reduces that subset using "map-reduce", + repeat for all cohorts. This works well for many time groupings + where the group labels repeat at regular intervals like 'hour', + 'month', dayofyear' etc. Optimize chunking ``array`` for this + method by first rechunking using ``rechunk_for_cohorts`` + (for 1D ``by`` only). + * ``"split-reduce"``: + Break out each group into its own array and then ``"map-reduce"``. + This is implemented by having each group be its own cohort, + and is identical to xarray's default strategy. + engine : {"flox", "numpy", "numba"}, optional + Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: + * ``"numpy"``: + Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``. + This is the default choice because it works for most array types. + * ``"flox"``: + Use an internal implementation where the data is sorted so that + all members of a group occur sequentially, and then numpy.ufunc.reduceat + is to used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy`` + for a reduction that is not yet implemented. + * ``"numba"``: + Use the implementations in ``numpy_groupies.aggregate_numba``. + reindex : bool, optional + Whether to "reindex" the blockwise results to `expected_groups` (possibly automatically detected). + If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, + and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant + boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the + original block size. Avoid that by using method="cohorts". By default, it is turned off for argreductions. + finalize_kwargs : dict, optional + Kwargs passed to finalize the reduction such as ``ddof`` for var, std. + + Returns + ------- + result + Aggregated result + *groups + Group labels + + See Also + -------- + xarray.xarray_reduce + """ + + if engine == "flox" and _is_arg_reduction(func): + raise NotImplementedError( + "argreductions not supported for engine='flox' yet." + "Try engine='numpy' or engine='numba' instead." + ) + reindex = _validate_reindex(reindex, func, method, expected_groups) + bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) nby = len(bys) by_is_dask = any(is_duck_dask_array(b) for b in bys) From e2e570497d10e2e69f1ef7a7121671baa1197029 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 26 Sep 2022 19:38:49 +0200 Subject: [PATCH 4/6] Update core.py --- flox/core.py | 310 +-------------------------------------------------- 1 file changed, 1 insertion(+), 309 deletions(-) diff --git a/flox/core.py b/flox/core.py index 38b17511d..069faf2ef 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1479,314 +1479,6 @@ def groupby_reduce( ) reindex = _validate_reindex(reindex, func, method, expected_groups) - by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - nby = len(by) - by_is_dask = any(is_duck_dask_array(b) for b in by) - - if method in ["split-reduce", "cohorts"] and by_is_dask: - raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") - - if not is_duck_array(array): - array = np.asarray(array) - array = array.astype(int) if np.issubdtype(array.dtype, bool) else array - - if isinstance(isbin, bool): - isbin = (isbin,) * nby - if expected_groups is None: - expected_groups = (None,) * nby - - _assert_by_is_aligned(array.shape, by) - - if nby == 1 and not isinstance(expected_groups, tuple): - expected_groups = (np.asarray(expected_groups),) - elif len(expected_groups) != nby: - raise ValueError( - f"Must have same number of `expected_groups` (received {len(expected_groups)}) " - f" and variables to group by (received {nby})." - ) - - # We convert to pd.Index since that lets us know if we are binning or not - # (pd.IntervalIndex or not) - expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort) - - # TODO: could restrict this to dask-only - factorize_early = (nby > 1) or ( - any(isbin) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) - ) - if factorize_early: - by, final_groups, grp_shape = _factorize_multiple( - by, expected_groups, by_is_dask=by_is_dask, reindex=reindex - ) - expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) - - assert len(by) == 1 - by = by[0] - expected_groups = expected_groups[0] - - if axis is None: - axis = tuple(array.ndim + np.arange(-by.ndim, 0)) - else: - axis = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore - - if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by.ndim: - raise NotImplementedError( - "Must reduce along all dimensions of `by` when method != 'map-reduce'." - f"Received method={method!r}" - ) - - # TODO: make sure expected_groups is unique - if len(axis) == 1 and by.ndim > 1 and expected_groups is None: - if not by_is_dask: - expected_groups = _get_expected_groups(by, sort) - else: - # When we reduce along all axes, we are guaranteed to see all - # groups in the final combine stage, so everything works. - # This is not necessarily true when reducing along a subset of axes - # (of by) - # TODO: Does this depend on chunking of by? - # For e.g., we could relax this if there is only one chunk along all - # by dim != axis? - raise NotImplementedError( - "Please provide ``expected_groups`` when not reducing along all axes." - ) - - assert len(axis) <= by.ndim - if len(axis) < by.ndim: - by = _move_reduce_dims_to_end(by, -array.ndim + np.array(axis) + by.ndim) - array = _move_reduce_dims_to_end(array, axis) - axis = tuple(array.ndim + np.arange(-len(axis), 0)) - - has_dask = is_duck_dask_array(array) or is_duck_dask_array(by) - - # When axis is a subset of possible values; then npg will - # apply it to groups that don't exist along a particular axis (for e.g.) - # since these count as a group that is absent. thoo! - # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found. - # The only way to do this consistently is mask out using min_count - # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 - if min_count is None: - if len(axis) < by.ndim or fill_value is not None: - min_count = 1 - - # TODO: set in xarray? - if min_count is not None and func in ["nansum", "nanprod"] and fill_value is None: - # nansum, nanprod have fill_value=0, 1 - # overwrite than when min_count is set - fill_value = np.nan - - kwargs = dict(axis=axis, fill_value=fill_value, engine=engine) - agg = _initialize_aggregation(func, array.dtype, fill_value, min_count, finalize_kwargs) - - if not has_dask: - results = _reduce_blockwise( - array, by, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs - ) - groups = (results["groups"],) - result = results[agg.name] - - else: - if agg.chunk[0] is None and method != "blockwise": - raise NotImplementedError( - f"Aggregation {func.name!r} is only implemented for dask arrays when method='blockwise'." - f"\n\n Received: {func}" - ) - - # we always need some fill_value (see above) so choose the default if needed - if kwargs["fill_value"] is None: - kwargs["fill_value"] = agg.fill_value[agg.name] - - partial_agg = partial(dask_groupby_agg, split_out=split_out, **kwargs) - - if method in ["split-reduce", "cohorts"]: - cohorts = find_group_cohorts( - by, [array.chunks[ax] for ax in axis], merge=True, method=method - ) - - results = [] - groups_ = [] - for cohort in cohorts: - cohort = sorted(cohort) - # equivalent of xarray.DataArray.where(mask, drop=True) - mask = np.isin(by, cohort) - indexer = [np.unique(v) for v in np.nonzero(mask)] - array_subset = array - for ax, idxr in zip(range(-by.ndim, 0), indexer): - array_subset = np.take(array_subset, idxr, axis=ax) - numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis]) - - # get final result for these groups - r, *g = partial_agg( - array_subset, - by[np.ix_(*indexer)], - expected_groups=pd.Index(cohort), - # First deep copy becasue we might be doping blockwise, - # which sets agg.finalize=None, then map-reduce (GH102) - agg=copy.deepcopy(agg), - # reindex to expected_groups at the blockwise step. - # this approach avoids replacing non-cohort members with - # np.nan or some other sentinel value, and preserves dtypes - reindex=True, - # sort controls the final output order so apply that at the end - sort=False, - # if only a single block along axis, we can just work blockwise - # inspired by https://github.com/dask/dask/issues/8361 - method="blockwise" if numblocks == 1 and len(axis) == by.ndim else "map-reduce", - ) - results.append(r) - groups_.append(cohort) - - # concatenate results together, - # sort to make sure we match expected output - groups = (np.hstack(groups_),) - result = np.concatenate(results, axis=-1) - else: - if method == "blockwise" and by.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by) - - result, *groups = partial_agg( - array, - by, - expected_groups=None if method == "blockwise" else expected_groups, - agg=agg, - reindex=reindex, - method=method, - sort=sort, - ) - - if sort and method != "map-reduce": - assert len(groups) == 1 - sorted_idx = np.argsort(groups[0]) - result = result[..., sorted_idx] - groups = (groups[0][sorted_idx],) - - if factorize_early: - # nan group labels are factorized to -1, and preserved - # now we get rid of them by reindexing - # This also handles bins with no data - result = reindex_( - result, from_=groups[0], to=expected_groups, fill_value=fill_value - ).reshape(result.shape[:-1] + grp_shape) - groups = final_groups - return (result, *groups) - - -def groupby_reduce2( - array: np.ndarray | DaskArray, - *by: np.ndarray | DaskArray, - func: str | Aggregation, - expected_groups: Sequence | np.ndarray | None = None, - sort: bool = True, - isbin: T_IsBins = False, - axis: T_AxissOpt = None, - fill_value=None, - min_count: int | None = None, - split_out: int = 1, - method: T_Method = "map-reduce", - engine: T_Engine = "numpy", - reindex: bool | None = None, - finalize_kwargs: Mapping | None = None, -) -> tuple[DaskArray, np.ndarray | DaskArray]: - """ - GroupBy reductions using tree reductions for dask.array - - Parameters - ---------- - array : ndarray or DaskArray - Array to be reduced, possibly nD - by : ndarray or DaskArray - Array of labels to group over. Must be aligned with ``array`` so that - ``array.shape[-by.ndim :] == by.shape`` - func : str or Aggregation - Single function name or an Aggregation instance - expected_groups : (optional) Sequence - Expected unique labels. - isbin : bool, optional - Are ``expected_groups`` bin edges? - sort : bool, optional - Whether groups should be returned in sorted order. Only applies for dask - reductions when ``method`` is not ``"map-reduce"``. For ``"map-reduce"``, the groups - are always sorted. - axis : None or int or Sequence[int], optional - If None, reduce across all dimensions of by - Else, reduce across corresponding axes of array - Negative integers are normalized using array.ndim - fill_value : Any - Value to assign when a label in ``expected_groups`` is not present. - min_count : int, default: None - The required number of valid values to perform the operation. If - fewer than min_count non-NA values are present the result will be - NA. Only used if skipna is set to True or defaults to True for the - array's dtype. - split_out : int, optional - Number of chunks along group axis in output (last axis) - method : {"map-reduce", "blockwise", "cohorts", "split-reduce"}, optional - Strategy for reduction of dask arrays only: - * ``"map-reduce"``: - First apply the reduction blockwise on ``array``, then - combine a few newighbouring blocks, apply the reduction. - Continue until finalizing. Usually, ``func`` will need - to be an Aggregation instance for this method to work. - Common aggregations are implemented. - * ``"blockwise"``: - Only reduce using blockwise and avoid aggregating blocks - together. Useful for resampling-style reductions where group - members are always together. If `by` is 1D, `array` is automatically - rechunked so that chunk boundaries line up with group boundaries - i.e. each block contains all members of any group present - in that block. For nD `by`, you must make sure that all members of a group - are present in a single block. - * ``"cohorts"``: - Finds group labels that tend to occur together ("cohorts"), - indexes out cohorts and reduces that subset using "map-reduce", - repeat for all cohorts. This works well for many time groupings - where the group labels repeat at regular intervals like 'hour', - 'month', dayofyear' etc. Optimize chunking ``array`` for this - method by first rechunking using ``rechunk_for_cohorts`` - (for 1D ``by`` only). - * ``"split-reduce"``: - Break out each group into its own array and then ``"map-reduce"``. - This is implemented by having each group be its own cohort, - and is identical to xarray's default strategy. - engine : {"flox", "numpy", "numba"}, optional - Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: - * ``"numpy"``: - Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``. - This is the default choice because it works for most array types. - * ``"flox"``: - Use an internal implementation where the data is sorted so that - all members of a group occur sequentially, and then numpy.ufunc.reduceat - is to used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy`` - for a reduction that is not yet implemented. - * ``"numba"``: - Use the implementations in ``numpy_groupies.aggregate_numba``. - reindex : bool, optional - Whether to "reindex" the blockwise results to `expected_groups` (possibly automatically detected). - If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups, - and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant - boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the - original block size. Avoid that by using method="cohorts". By default, it is turned off for argreductions. - finalize_kwargs : dict, optional - Kwargs passed to finalize the reduction such as ``ddof`` for var, std. - - Returns - ------- - result - Aggregated result - *groups - Group labels - - See Also - -------- - xarray.xarray_reduce - """ - - if engine == "flox" and _is_arg_reduction(func): - raise NotImplementedError( - "argreductions not supported for engine='flox' yet." - "Try engine='numpy' or engine='numba' instead." - ) - reindex = _validate_reindex(reindex, func, method, expected_groups) - bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) nby = len(bys) by_is_dask = any(is_duck_dask_array(b) for b in bys) @@ -1830,7 +1522,7 @@ def groupby_reduce2( expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) assert len(bys) == 1 - by_ = by[0] + by_ = bys[0] expected_groups = expected_groups[0] if axis is None: From 95b736f614c1fab80975c228f0bac7d5d8b1d44e Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 27 Sep 2022 19:32:56 +0200 Subject: [PATCH 5/6] Update core.py --- flox/core.py | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/flox/core.py b/flox/core.py index 069faf2ef..34a088b3f 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1526,18 +1526,20 @@ def groupby_reduce( expected_groups = expected_groups[0] if axis is None: - axis = tuple(array.ndim + np.arange(-by_.ndim, 0)) + axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0)) else: - axis = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + # TODO: How come this function doesn't exist according to mypy? + axis_ = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + nax = len(axis_) - if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by_.ndim: + if method in ["blockwise", "cohorts", "split-reduce"] and nax != by_.ndim: raise NotImplementedError( "Must reduce along all dimensions of `by` when method != 'map-reduce'." f"Received method={method!r}" ) # TODO: make sure expected_groups is unique - if len(axis) == 1 and by_.ndim > 1 and expected_groups is None: + if nax == 1 and by_.ndim > 1 and expected_groups is None: if not by_is_dask: expected_groups = _get_expected_groups(by_, sort) else: @@ -1552,11 +1554,12 @@ def groupby_reduce( "Please provide ``expected_groups`` when not reducing along all axes." ) - assert len(axis) <= by_.ndim - if len(axis) < by_.ndim: - by_ = _move_reduce_dims_to_end(by_, -array.ndim + np.array(axis) + by_.ndim) - array = _move_reduce_dims_to_end(array, axis) - axis = tuple(array.ndim + np.arange(-len(axis), 0)) + assert nax <= by_.ndim + if nax < by_.ndim: + by_ = _move_reduce_dims_to_end(by_, tuple(-array.ndim + ax + by_.ndim for ax in axis_)) + array = _move_reduce_dims_to_end(array, axis_) + axis_ = tuple(array.ndim + np.arange(-nax, 0)) + nax = len(axis_) has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) @@ -1567,7 +1570,7 @@ def groupby_reduce( # The only way to do this consistently is mask out using min_count # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 if min_count is None: - if len(axis) < by_.ndim or fill_value is not None: + if nax < by_.ndim or fill_value is not None: min_count = 1 # TODO: set in xarray? @@ -1576,7 +1579,7 @@ def groupby_reduce( # overwrite than when min_count is set fill_value = np.nan - kwargs = dict(axis=axis, fill_value=fill_value, engine=engine) + kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) agg = _initialize_aggregation(func, array.dtype, fill_value, min_count, finalize_kwargs) if not has_dask: @@ -1587,9 +1590,12 @@ def groupby_reduce( result = results[agg.name] else: + if TYPE_CHECKING: + assert isinstance(array, DaskArray) # TODO: How else to narrow that .chunk is there? + if agg.chunk[0] is None and method != "blockwise": raise NotImplementedError( - f"Aggregation {func.name!r} is only implemented for dask arrays when method='blockwise'." + f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." f"\n\n Received: {func}" ) @@ -1601,10 +1607,10 @@ def groupby_reduce( if method in ["split-reduce", "cohorts"]: cohorts = find_group_cohorts( - by_, [array.chunks[ax] for ax in axis], merge=True, method=method + by_, [array.chunks[ax] for ax in axis_], merge=True, method=method ) - results = [] + results_ = [] groups_ = [] for cohort in cohorts: cohort = sorted(cohort) @@ -1614,7 +1620,7 @@ def groupby_reduce( array_subset = array for ax, idxr in zip(range(-by_.ndim, 0), indexer): array_subset = np.take(array_subset, idxr, axis=ax) - numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis]) + numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis_]) # get final result for these groups r, *g = partial_agg( @@ -1632,22 +1638,20 @@ def groupby_reduce( sort=False, # if only a single block along axis, we can just work blockwise # inspired by https://github.com/dask/dask/issues/8361 - method="blockwise" - if numblocks == 1 and len(axis) == by_.ndim - else "map-reduce", + method="blockwise" if numblocks == 1 and nax == by_.ndim else "map-reduce", ) - results.append(r) + results_.append(r) groups_.append(cohort) # concatenate results together, # sort to make sure we match expected output groups = (np.hstack(groups_),) - result = np.concatenate(results, axis=-1) + result = np.concatenate(results_, axis=-1) else: if method == "blockwise" and by_.ndim == 1: array = rechunk_for_blockwise(array, axis=-1, labels=by_) - result, *groups = partial_agg( + result, groups = partial_agg( array, by_, expected_groups=None if method == "blockwise" else expected_groups, From 68442504ec1bada91fd3155f7f5ab7617f006f15 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 27 Sep 2022 19:44:09 +0200 Subject: [PATCH 6/6] add dask_groupby_agg fixes --- flox/core.py | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/flox/core.py b/flox/core.py index 34a088b3f..f172970e9 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1055,14 +1055,14 @@ def dask_groupby_agg( by: DaskArray | np.ndarray, agg: Aggregation, expected_groups: pd.Index | None, - axis: Sequence = None, + axis: T_Axiss = (), split_out: int = 1, fill_value: Any = None, - method: str = "map-reduce", + method: T_Method = "map-reduce", reindex: bool = False, - engine: str = "numpy", + engine: T_Engine = "numpy", sort: bool = True, -) -> tuple[DaskArray, np.ndarray | DaskArray]: +) -> tuple[DaskArray, tuple[np.ndarray | DaskArray]]: import dask.array from dask.array.core import slices_from_chunks @@ -1164,18 +1164,21 @@ def dask_groupby_agg( else: intermediate = applied if expected_groups is None: - expected_groups = _get_expected_groups(by_input, sort=sort, raise_if_dask=False) + if is_duck_dask_array(by_input): + expected_groups = None + else: + expected_groups = _get_expected_groups(by_input, sort=sort) group_chunks = ((len(expected_groups),) if expected_groups is not None else (np.nan,),) if method == "map-reduce": # these are negative axis indices useful for concatenating the intermediates neg_axis = tuple(range(-len(axis), 0)) - combine = ( - _simple_combine - if do_simple_combine - else partial(_grouped_combine, engine=engine, neg_axis=neg_axis, sort=sort) - ) + combine: Callable[..., IntermediateDict] + if do_simple_combine: + combine = _simple_combine + else: + combine = partial(_grouped_combine, engine=engine, neg_axis=neg_axis, sort=sort) # reduced is really a dict mapping reduction name to array # and "groups" to an array of group labels @@ -1214,13 +1217,12 @@ def dask_groupby_agg( groups_in_block = tuple( np.intersect1d(by_input[slc], expected_groups) for slc in slices ) - ngroups_per_block = tuple(len(groups) for groups in groups_in_block) + ngroups_per_block = tuple(len(grp) for grp in groups_in_block) output_chunks = reduced.chunks[: -(len(axis))] + (ngroups_per_block,) else: raise ValueError(f"Unknown method={method}.") # extract results from the dict - result: dict = {} layer: dict[tuple, tuple] = {} ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks) if is_duck_dask_array(by_input) and expected_groups is None: @@ -1232,7 +1234,7 @@ def dask_groupby_agg( (reduced.name, *first_block), "groups", ) - groups = ( + groups: tuple[np.ndarray | DaskArray] = ( dask.array.Array( HighLevelGraph.from_collections(groups_name, layer, dependencies=[reduced]), groups_name, @@ -1243,12 +1245,14 @@ def dask_groupby_agg( else: if method == "map-reduce": if expected_groups is None: - expected_groups = _get_expected_groups(by_input, sort=sort) - groups = (expected_groups.to_numpy(),) + expected_groups_ = _get_expected_groups(by_input, sort=sort) + else: + expected_groups_ = expected_groups + groups = (expected_groups_.to_numpy(),) else: groups = (np.concatenate(groups_in_block),) - layer: dict[tuple, tuple] = {} # type: ignore + layer2: dict[tuple, tuple] = {} agg_name = f"{name}-{token}" for ochunk in itertools.product(*ochunks): if method == "blockwise": @@ -1259,16 +1263,16 @@ def dask_groupby_agg( inchunk = ochunk[:-1] + np.unravel_index(ochunk[-1], nblocks) else: inchunk = ochunk[:-1] + (0,) * len(axis) + (ochunk[-1],) * int(split_out > 1) - layer[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name) + layer2[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name) result = dask.array.Array( - HighLevelGraph.from_collections(agg_name, layer, dependencies=[reduced]), + HighLevelGraph.from_collections(agg_name, layer2, dependencies=[reduced]), agg_name, chunks=output_chunks, dtype=agg.dtype[agg.name], ) - return (result, *groups) + return (result, groups) def _validate_reindex(reindex: bool, func, method, expected_groups) -> bool: