From 2feb8b70e78ce60593629132b053efce269a72b1 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Tue, 25 Oct 2022 16:24:12 -0600
Subject: [PATCH 1/6] Use blockwise to extract final result for method="blockwise"

---
 flox/core.py | 43 +++++++++++++++++++++++++++----------------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index 07f6d0e69..cf01a15f6 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1370,31 +1370,42 @@ def dask_groupby_agg(
 
     # extract results from the dict
     output_chunks = reduced.chunks[: -len(axis)] + group_chunks
-    ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks)
-    layer2: dict[tuple, tuple] = {}
     agg_name = f"{name}-{token}"
-    for ochunk in itertools.product(*ochunks):
-        if method == "blockwise":
-            if len(axis) == 1:
-                inchunk = ochunk
-            else:
+    if method == "blockwise" and len(axis) == 1:
+        result = reduced.map_blocks(
+            _extract_result,
+            key=agg.name,
+            dtype=agg.dtype[agg.name],
+            chunks=output_chunks,
+            name=agg_name,
+        )
+
+    else:
+        ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks)
+        layer2: dict[tuple, tuple] = {}
+        for ochunk in itertools.product(*ochunks):
+            if method == "blockwise":
                 nblocks = tuple(len(array.chunks[ax]) for ax in axis)
                 inchunk = ochunk[:-1] + np.unravel_index(ochunk[-1], nblocks)
-        else:
-            inchunk = ochunk[:-1] + (0,) * (len(axis) - 1) + (ochunk[-1],)
+            else:
+                inchunk = ochunk[:-1] + (0,) * (len(axis) - 1) + (ochunk[-1],)
 
-        layer2[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name)
+            layer2[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name)
 
-    result = dask.array.Array(
-        HighLevelGraph.from_collections(agg_name, layer2, dependencies=[reduced]),
-        agg_name,
-        chunks=output_chunks,
-        dtype=agg.dtype[agg.name],
-    )
+        result = dask.array.Array(
+            HighLevelGraph.from_collections(agg_name, layer2, dependencies=[reduced]),
+            agg_name,
+            chunks=output_chunks,
+            dtype=agg.dtype[agg.name],
+        )
 
     return (result, groups)
 
 
+def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
+    return result_dict[key]
+
+
 def _validate_reindex(
     reindex: bool | None, func, method: T_Method, expected_groups, by_is_dask: bool
 ) -> bool:
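Patch 1 leans on the fact that every block of `reduced` is a dict keyed by aggregation name (flox's FinalResultsDict), so for a single reduced axis the block grid of `reduced` already matches the output and `map_blocks` can simply pull one key out of each block. A minimal standalone sketch of that pattern, not flox code; the helpers `to_dict` and `extract` are invented stand-ins, with `extract` mirroring the one-line `_extract_result` added above:

    import dask.array as da
    import numpy as np

    def to_dict(block):
        # stand-in for flox's per-block dict of finalized results
        return {"sum": block.sum(axis=-1, keepdims=True)}

    def extract(block, key):
        # same idea as _extract_result: index each dict-valued block
        return block[key]

    x = da.random.random((4, 6), chunks=(2, 3))
    # dict-valued blocks: chunks and meta are declared by hand since dask
    # cannot inspect the dicts to infer them
    dicts = x.map_blocks(to_dict, chunks=((2, 2), (1, 1)), dtype=x.dtype, meta=np.array([[]]))
    result = dicts.map_blocks(extract, key="sum", dtype=x.dtype, meta=np.array([[]]))
    print(result.compute().shape)  # (4, 2)

With one reduced axis no manual graph layer is needed; the multi-axis case still goes through the hand-built `operator.getitem` layer.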
From 91110e63e8eea7b013a56dbdc305ba7e4b49f191 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Tue, 25 Oct 2022 16:53:00 -0600
Subject: [PATCH 2/6] For all methods

---
 flox/core.py | 47 +++++++++++++++++------------------------
 1 file changed, 17 insertions(+), 30 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index cf01a15f6..015598185 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1195,7 +1195,6 @@ def dask_groupby_agg(
 
     import dask.array
     from dask.array.core import slices_from_chunks
-    from dask.highlevelgraph import HighLevelGraph
 
     # I think _tree_reduce expects this
     assert isinstance(axis, Sequence)
@@ -1369,35 +1368,23 @@ def dask_groupby_agg(
         raise ValueError(f"Unknown method={method}.")
 
     # extract results from the dict
-    output_chunks = reduced.chunks[: -len(axis)] + group_chunks
-    agg_name = f"{name}-{token}"
-    if method == "blockwise" and len(axis) == 1:
-        result = reduced.map_blocks(
-            _extract_result,
-            key=agg.name,
-            dtype=agg.dtype[agg.name],
-            chunks=output_chunks,
-            name=agg_name,
-        )
-
-    else:
-        ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks)
-        layer2: dict[tuple, tuple] = {}
-        for ochunk in itertools.product(*ochunks):
-            if method == "blockwise":
-                nblocks = tuple(len(array.chunks[ax]) for ax in axis)
-                inchunk = ochunk[:-1] + np.unravel_index(ochunk[-1], nblocks)
-            else:
-                inchunk = ochunk[:-1] + (0,) * (len(axis) - 1) + (ochunk[-1],)
-
-            layer2[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name)
-
-        result = dask.array.Array(
-            HighLevelGraph.from_collections(agg_name, layer2, dependencies=[reduced]),
-            agg_name,
-            chunks=output_chunks,
-            dtype=agg.dtype[agg.name],
-        )
+    adjust_chunks = {inds[ax]: lambda: 0 for ax in axis}
+    if method == "blockwise" and len(axis) > 1:
+        nblocks = tuple(len(array.chunks[ax]) for ax in axis)
+        group_chunks = np.unravel_index(group_chunks, nblocks)
+        for ax, chunks in zip(axis, group_chunks):
+            adjust_chunks[ax] = chunks
+
+    result = dask.array.blockwise(
+        _extract_result,
+        inds[: -len(axis)] + (inds[-1],),
+        reduced,
+        inds,
+        adjust_chunks=adjust_chunks,
+        dtype=agg.dtype[agg.name],
+        key=agg.name,
+        name=f"{name}-{token}",
+    )
 
     return (result, groups)

From ee6be2620d79c1fc3bbee451e204d5342fce4024 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Tue, 25 Oct 2022 17:04:30 -0600
Subject: [PATCH 3/6] bugfix

---
 flox/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flox/core.py b/flox/core.py
index 015598185..a51b6ed3f 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1368,7 +1368,7 @@ def dask_groupby_agg(
         raise ValueError(f"Unknown method={method}.")
 
     # extract results from the dict
-    adjust_chunks = {inds[ax]: lambda: 0 for ax in axis}
+    adjust_chunks = {inds[ax]: lambda c: 0 for ax in axis}
     if method == "blockwise" and len(axis) > 1:
         nblocks = tuple(len(array.chunks[ax]) for ax in axis)
         group_chunks = np.unravel_index(group_chunks, nblocks)
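The one-character "bugfix" in patch 3 exists because dask calls each `adjust_chunks` entry with the existing chunk size, so the callable must accept an argument; a zero-argument `lambda: 0` raises a TypeError as soon as the graph is built. A standalone illustration of the `adjust_chunks` contract (`shrink` is an invented toy function, not flox code):

    import dask.array as da

    def shrink(block):
        # keep only the first column of every block
        return block[:, :1]

    x = da.ones((4, 6), chunks=(2, 3))
    y = da.blockwise(
        shrink,
        "ij",
        x,
        "ij",
        dtype=x.dtype,
        # dask calls adjust_chunks["j"](old_chunk_size); `lambda: 1` would fail
        adjust_chunks={"j": lambda c: 1},
    )
    print(y.chunks)  # ((2, 2), (1, 1))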
From cb25e3886dec396be06a9731b7b880fa2864dbff Mon Sep 17 00:00:00 2001
From: dcherian
Date: Tue, 25 Oct 2022 19:35:52 -0600
Subject: [PATCH 4/6] Try return_array from _finalize_results

---
 flox/core.py | 59 ++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 43 insertions(+), 16 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index a51b6ed3f..063a8fa3a 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -803,10 +803,15 @@ def _aggregate(
     keepdims,
     fill_value: Any,
     reindex: bool,
+    return_array: bool,
 ) -> FinalResultsDict:
     """Final aggregation step of tree reduction"""
     results = combine(x_chunk, agg, axis, keepdims, is_aggregate=True)
-    return _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+    finalized = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+    if return_array:
+        return finalized[agg.name]
+    else:
+        return finalized
 
 
 def _expand_dims(results: IntermediateDict) -> IntermediateDict:
@@ -1287,6 +1292,7 @@ def dask_groupby_agg(
     group_chunks: tuple[tuple[Union[int, float], ...]] = (
         (len(expected_groups),) if expected_groups is not None else (np.nan,),
     )
+    groups_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
 
     if method in ["map-reduce", "cohorts"]:
         combine: Callable[..., IntermediateDict]
@@ -1316,16 +1322,32 @@ def dask_groupby_agg(
         reduced = tree_reduce(
             intermediate,
             combine=partial(combine, agg=agg),
-            aggregate=partial(aggregate, expected_groups=expected_groups, reindex=reindex),
+            aggregate=partial(
+                aggregate,
+                expected_groups=expected_groups,
+                reindex=reindex,
+                return_array=not groups_are_unknown,
+            ),
         )
-        if is_duck_dask_array(by_input) and expected_groups is None:
+        if groups_are_unknown:
             groups = _extract_unknown_groups(reduced, group_chunks=group_chunks, dtype=by.dtype)
+            result = dask.array.map_blocks(
+                _extract_result,
+                reduced,
+                chunks=reduced.chunks[: -len(axis)] + group_chunks,
+                drop_axis=axis[:-1],
+                dtype=agg.dtype[agg.name],
+                key=agg.name,
+                name=f"{name}-{token}",
+            )
+
         else:
             if expected_groups is None:
                 expected_groups_ = _get_expected_groups(by_input, sort=sort)
             else:
                 expected_groups_ = expected_groups
             groups = (expected_groups_.to_numpy(),)
+            result = reduced
 
     elif method == "cohorts":
         chunks_cohorts = find_group_cohorts(
@@ -1344,12 +1366,14 @@ def dask_groupby_agg(
                 tree_reduce(
                     reindexed,
                     combine=partial(combine, agg=agg, reindex=True),
-                    aggregate=partial(aggregate, expected_groups=index, reindex=True),
+                    aggregate=partial(
+                        aggregate, expected_groups=index, reindex=True, return_array=True
+                    ),
                 )
             )
             groups_.append(cohort)
 
-        reduced = dask.array.concatenate(reduced_, axis=-1)
+        result = dask.array.concatenate(reduced_, axis=-1)
         groups = (np.concatenate(groups_),)
         group_chunks = (tuple(len(cohort) for cohort in groups_),)
@@ -1375,21 +1399,24 @@ def dask_groupby_agg(
         for ax, chunks in zip(axis, group_chunks):
             adjust_chunks[ax] = chunks
 
-    result = dask.array.blockwise(
-        _extract_result,
-        inds[: -len(axis)] + (inds[-1],),
-        reduced,
-        inds,
-        adjust_chunks=adjust_chunks,
-        dtype=agg.dtype[agg.name],
-        key=agg.name,
-        name=f"{name}-{token}",
-    )
-
+    # result = dask.array.blockwise(
+    #     _extract_result,
+    #     inds[: -len(axis)] + (inds[-1],),
+    #     reduced,
+    #     inds,
+    #     adjust_chunks=adjust_chunks,
+    #     dtype=agg.dtype[agg.name],
+    #     key=agg.name,
+    #     name=f"{name}-{token}",
+    # )
    return (result, groups)
 
 
 def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
+    from dask.array.core import deepfirst
+
+    if not isinstance(result_dict, dict):
+        result_dict = deepfirst(result_dict)
     return result_dict[key]
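The `map_blocks(..., drop_axis=axis[:-1])` call in patch 4 is what forces the `deepfirst` workaround in `_extract_result`: `map_blocks` concatenates input blocks along any dropped axis before calling the function, which is meaningless for dict-valued blocks, so the function can be handed a nested list of dicts rather than a single dict. The concatenation behavior is easy to see with plain arrays (a standalone sketch; `collapse` is an invented name):

    import dask.array as da
    import numpy as np

    def collapse(block):
        print("called with shape", block.shape)
        return block.sum(axis=0)

    x = da.ones((4, 6), chunks=(2, 3))  # a 2 x 2 grid of blocks
    y = x.map_blocks(collapse, drop_axis=0, dtype=x.dtype, meta=np.array([]))
    y.compute()
    # prints "called with shape (4, 3)" twice: the two blocks along the
    # dropped axis were concatenated before the function was called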
From 15887abcd1f27b3ea6d29f5c2d82cba9bf1be39e Mon Sep 17 00:00:00 2001
From: dcherian
Date: Tue, 25 Oct 2022 19:36:15 -0600
Subject: [PATCH 5/6] Revert "Try return_array from _finalize_results"

This reverts commit cb25e3886dec396be06a9731b7b880fa2864dbff.
---
 flox/core.py | 59 ++++++++++++++--------------------------------------
 1 file changed, 16 insertions(+), 43 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index 063a8fa3a..a51b6ed3f 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -803,15 +803,10 @@ def _aggregate(
     keepdims,
     fill_value: Any,
     reindex: bool,
-    return_array: bool,
 ) -> FinalResultsDict:
     """Final aggregation step of tree reduction"""
     results = combine(x_chunk, agg, axis, keepdims, is_aggregate=True)
-    finalized = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
-    if return_array:
-        return finalized[agg.name]
-    else:
-        return finalized
+    return _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
 
 
 def _expand_dims(results: IntermediateDict) -> IntermediateDict:
@@ -1292,7 +1287,6 @@ def dask_groupby_agg(
     group_chunks: tuple[tuple[Union[int, float], ...]] = (
         (len(expected_groups),) if expected_groups is not None else (np.nan,),
     )
-    groups_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
 
     if method in ["map-reduce", "cohorts"]:
         combine: Callable[..., IntermediateDict]
@@ -1322,32 +1316,16 @@ def dask_groupby_agg(
         reduced = tree_reduce(
             intermediate,
             combine=partial(combine, agg=agg),
-            aggregate=partial(
-                aggregate,
-                expected_groups=expected_groups,
-                reindex=reindex,
-                return_array=not groups_are_unknown,
-            ),
+            aggregate=partial(aggregate, expected_groups=expected_groups, reindex=reindex),
         )
-        if groups_are_unknown:
+        if is_duck_dask_array(by_input) and expected_groups is None:
             groups = _extract_unknown_groups(reduced, group_chunks=group_chunks, dtype=by.dtype)
-            result = dask.array.map_blocks(
-                _extract_result,
-                reduced,
-                chunks=reduced.chunks[: -len(axis)] + group_chunks,
-                drop_axis=axis[:-1],
-                dtype=agg.dtype[agg.name],
-                key=agg.name,
-                name=f"{name}-{token}",
-            )
-
         else:
             if expected_groups is None:
                 expected_groups_ = _get_expected_groups(by_input, sort=sort)
             else:
                 expected_groups_ = expected_groups
             groups = (expected_groups_.to_numpy(),)
-            result = reduced
 
     elif method == "cohorts":
         chunks_cohorts = find_group_cohorts(
@@ -1366,14 +1344,12 @@ def dask_groupby_agg(
                 tree_reduce(
                     reindexed,
                     combine=partial(combine, agg=agg, reindex=True),
-                    aggregate=partial(
-                        aggregate, expected_groups=index, reindex=True, return_array=True
-                    ),
+                    aggregate=partial(aggregate, expected_groups=index, reindex=True),
                 )
             )
             groups_.append(cohort)
 
-        result = dask.array.concatenate(reduced_, axis=-1)
+        reduced = dask.array.concatenate(reduced_, axis=-1)
         groups = (np.concatenate(groups_),)
         group_chunks = (tuple(len(cohort) for cohort in groups_),)
@@ -1399,24 +1375,21 @@ def dask_groupby_agg(
         for ax, chunks in zip(axis, group_chunks):
             adjust_chunks[ax] = chunks
 
-    # result = dask.array.blockwise(
-    #     _extract_result,
-    #     inds[: -len(axis)] + (inds[-1],),
-    #     reduced,
-    #     inds,
-    #     adjust_chunks=adjust_chunks,
-    #     dtype=agg.dtype[agg.name],
-    #     key=agg.name,
-    #     name=f"{name}-{token}",
-    # )
+    result = dask.array.blockwise(
+        _extract_result,
+        inds[: -len(axis)] + (inds[-1],),
+        reduced,
+        inds,
+        adjust_chunks=adjust_chunks,
+        dtype=agg.dtype[agg.name],
+        key=agg.name,
+        name=f"{name}-{token}",
+    )
+
     return (result, groups)
 
 
 def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
-    from dask.array.core import deepfirst
-
-    if not isinstance(result_dict, dict):
-        result_dict = deepfirst(result_dict)
     return result_dict[key]
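With patch 4 reverted, extraction again goes through `dask.array.blockwise` over the dict-valued blocks. The wrinkle that patch 6 then deals with: when an index appears on an input but not among the output indices, blockwise either concatenates the blocks along it (`concatenate=True`) or passes them to the function as a list (`concatenate=False`); only the latter can work for dicts, which is why `_extract_result` can receive a list of dicts. A standalone sketch of the list-passing behavior (`first_block` is an invented name, not flox code):

    import dask.array as da

    def first_block(blocks):
        # with concatenate=False, `blocks` is a list of the blocks along "j"
        return blocks[0].sum(axis=1)

    x = da.ones((4, 6), chunks=(2, 3))  # two blocks along "j"
    y = da.blockwise(first_block, "i", x, "ij", dtype=x.dtype, concatenate=False)
    print(y.compute())  # [3. 3. 3. 3.]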
Subject: [PATCH 6/6] Fixes.

---
 flox/core.py | 54 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 43 insertions(+), 11 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index a51b6ed3f..7cbb5659e 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1267,6 +1267,9 @@ def dask_groupby_agg(
             engine=engine,
             sort=sort,
         ),
+        # output indices are the same as input indices
+        # Unlike xhistogram, we don't always know what the size of the group
+        # dimension will be unless reindex=True
         inds,
         array,
         inds,
@@ -1276,7 +1279,7 @@ def dask_groupby_agg(
         dtype=array.dtype,  # this is purely for show
         meta=array._meta,
         align_arrays=False,
-        token=f"{name}-chunk-{token}",
+        name=f"{name}-chunk-{token}",
     )
 
     if expected_groups is None:
@@ -1363,34 +1366,63 @@ def dask_groupby_agg(
         groups = (np.concatenate(groups_in_block),)
         ngroups_per_block = tuple(len(grp) for grp in groups_in_block)
         group_chunks = (ngroups_per_block,)
-
     else:
         raise ValueError(f"Unknown method={method}.")
 
-    # extract results from the dict
-    adjust_chunks = {inds[ax]: lambda c: 0 for ax in axis}
+    out_inds = inds[: -len(axis)] + (inds[-1],)
+    output_chunks = reduced.chunks[: -len(axis)] + group_chunks
     if method == "blockwise" and len(axis) > 1:
-        nblocks = tuple(len(array.chunks[ax]) for ax in axis)
-        group_chunks = np.unravel_index(group_chunks, nblocks)
-        for ax, chunks in zip(axis, group_chunks):
-            adjust_chunks[ax] = chunks
+        # The final results are available but the blocks along axes
+        # need to be reshaped to axis=-1
+        # I don't know that this is possible with blockwise
+        # All other code paths benefit from an unmaterialized Blockwise layer
+        reduced = _collapse_blocks_along_axes(reduced, axis, group_chunks)
 
+    # Can't use map_blocks because it forces concatenate=True along drop_axes,
     result = dask.array.blockwise(
         _extract_result,
-        inds[: -len(axis)] + (inds[-1],),
+        out_inds,
         reduced,
         inds,
-        adjust_chunks=adjust_chunks,
+        adjust_chunks=dict(zip(out_inds, output_chunks)),
         dtype=agg.dtype[agg.name],
         key=agg.name,
         name=f"{name}-{token}",
+        concatenate=False,
     )
 
     return (result, groups)
 
 
+def _collapse_blocks_along_axes(reduced, axis, group_chunks):
+    import dask.array
+    from dask.highlevelgraph import HighLevelGraph
+
+    nblocks = tuple(reduced.numblocks[ax] for ax in axis)
+    output_chunks = reduced.chunks[: -len(axis)] + ((1,) * (len(axis) - 1),) + group_chunks
+
+    # extract results from the dict
+    ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks)
+    layer2: dict[tuple, tuple] = {}
+    name = f"reshape-{reduced.name}"
+
+    for ochunk in itertools.product(*ochunks):
+        inchunk = ochunk[: -len(axis)] + np.unravel_index(ochunk[-1], nblocks)
+        layer2[(name, *ochunk)] = (reduced.name, *inchunk)
+
+    return dask.array.Array(
+        HighLevelGraph.from_collections(name, layer2, dependencies=[reduced]),
+        name,
+        chunks=output_chunks,
+        dtype=reduced.dtype,
+    )
+
+
 def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
-    return result_dict[key]
+    from dask.array.core import deepfirst
+
+    # deepfirst should not be needed here but sometimes we receive a list of dict?
+    return deepfirst(result_dict)[key]
 
 
 def _validate_reindex(
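`_collapse_blocks_along_axes` in patch 6 uses the hand-built graph-layer pattern: every output block is a re-keyed alias of an input block, with `np.unravel_index` mapping the flattened trailing block index back onto the original block grid, so no data moves until the aliases are consumed. The same trick in a standalone sketch (shapes and names are invented here; flox's real layer also preserves the leading, non-reduced axes):

    import dask.array as da
    import numpy as np
    from dask.highlevelgraph import HighLevelGraph

    x = da.ones((4, 4), chunks=(2, 2))  # a 2 x 2 grid of blocks
    nblocks = x.numblocks
    name = f"reshape-{x.name}"

    # output block (0, flat) aliases input block unravel_index(flat, nblocks)
    layer = {}
    for flat in range(nblocks[0] * nblocks[1]):
        inchunk = np.unravel_index(flat, nblocks)
        layer[(name, 0, flat)] = (x.name, *inchunk)

    y = da.Array(
        HighLevelGraph.from_collections(name, layer, dependencies=[x]),
        name,
        chunks=((2,), (2, 2, 2, 2)),  # one row of four (2, 2) blocks
        dtype=x.dtype,
    )
    print(y.shape, float(y.sum().compute()))  # (2, 8) 16.0

Because the layer only aliases existing keys, the reshape costs nothing at graph-construction time; the trade-off, noted in the patch's own comment, is that it materializes a low-level layer instead of staying an unmaterialized Blockwise layer.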