From 5c4c7a361b5912e5ddd4e58460511793f2bcd1f4 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 16:35:19 -0600
Subject: [PATCH 1/8] Faster unique

---
 flox/core.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index b67541ddb..7118d067c 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -106,7 +106,7 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray:
 def _get_optimal_chunks_for_groups(chunks, labels):
     chunkidx = np.cumsum(chunks) - 1
     # what are the groups at chunk boundaries
-    labels_at_chunk_bounds = np.unique(labels[chunkidx])
+    labels_at_chunk_bounds = _unique(labels[chunkidx])
     # what's the last index of all groups
     last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last")
     # what's the last index of groups at the chunk boundaries.
@@ -136,6 +136,12 @@ def _get_optimal_chunks_for_groups(chunks, labels):
     return tuple(newchunks)
 
 
+def _unique(a):
+    """Much faster to use pandas unique and sort the results.
+    np.unique sorts before uniquifying and is slow."""
+    return np.sort(pd.unique(a))
+
+
 @memoize
 def find_group_cohorts(labels, chunks, merge: bool = True):
     """
@@ -892,7 +898,7 @@ def _grouped_combine(
         # when there's only a single axis of reduction, we can just concatenate later,
         # reindexing is unnecessary
         # I bet we can minimize the amount of reindexing for mD reductions too, but it's complicated
-        unique_groups = np.unique(tuple(flatten(deepmap(listify_groups, x_chunk))))
+        unique_groups = _unique(tuple(flatten(deepmap(listify_groups, x_chunk))))
         unique_groups = unique_groups[~isnull(unique_groups)]
         if len(unique_groups) == 0:
             unique_groups = [np.nan]
@@ -1065,7 +1071,7 @@ def subset_to_blocks(
     unraveled = np.unravel_index(flatblocks, blkshape)
     normalized: list[Union[int, np.ndarray, slice]] = []
     for ax, idx in enumerate(unraveled):
-        i = np.unique(idx).squeeze()
+        i = _unique(idx).squeeze()
         if i.ndim == 0:
             normalized.append(i.item())
         else:
@@ -1310,7 +1316,7 @@ def dask_groupby_agg(
         # along the reduced axis
         slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis))
         if expected_groups is None:
-            groups_in_block = tuple(np.unique(by_input[slc]) for slc in slices)
+            groups_in_block = tuple(_unique(by_input[slc]) for slc in slices)
         else:
             # For cohorts, we could be indexing a block with groups that
             # are not in the cohort (usually for nD `by`)

From a46078d05c9d9b704ea2591626cc513e7e461b1d Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 16:37:05 -0600
Subject: [PATCH 2/8] SeriesGroupBy.unique to get blocks for each label.

---
 flox/core.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index 7118d067c..0e2b73ac9 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -186,14 +186,11 @@ def find_group_cohorts(labels, chunks, merge: bool = True):
         blocks[idx] = np.full(tuple(block.shape[ax] for ax in axis), idx)
     which_chunk = np.block(blocks.reshape(shape).tolist()).reshape(-1)
 
-    # We always drop NaN; np.unique also considers every NaN to be different so
-    # it's really important we get rid of them.
     raveled = labels.reshape(-1)
-    unique_labels = np.unique(raveled[~isnull(raveled)])
     # these are chunks where a label is present
-    label_chunks = {lab: tuple(np.unique(which_chunk[raveled == lab])) for lab in unique_labels}
+    label_chunks = pd.Series(which_chunk).groupby(raveled).unique()
     # These invert the label_chunks mapping so we know which labels occur together.
-    chunks_cohorts = tlz.groupby(label_chunks.get, label_chunks.keys())
+    chunks_cohorts = tlz.groupby(lambda x: tuple(label_chunks.get(x)), label_chunks.keys())
 
     if merge:
         # First sort by number of chunks occupied by cohort
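A note on patch 2: it replaces one full-array boolean mask (and np.unique call) per label with a single pandas groupby pass, and it leans on pandas dropping NaN group keys by default, which is why the explicit isnull filtering and its comment could be deleted. Below is a minimal sketch of the idea with toy inputs; the real raveled and which_chunk arrays are built inside find_group_cohorts, so these values are invented for illustration only.

    import numpy as np
    import pandas as pd

    # Toy stand-ins: flattened group labels, and the dask chunk that owns
    # each element. NaN marks a missing label.
    raveled = np.array([0, 0, 1, 1, 2, 2, np.nan, 2])
    which_chunk = np.array([0, 0, 0, 1, 1, 1, 1, 2])

    # Old approach: one boolean scan of the whole array per label.
    labels = np.unique(raveled[~np.isnan(raveled)])
    old = {lab: tuple(np.unique(which_chunk[raveled == lab])) for lab in labels}

    # New approach: a single groupby pass. NaN keys are dropped automatically,
    # and .unique() returns each group's values in first-appearance order.
    new = pd.Series(which_chunk).groupby(raveled).unique()

    assert all(tuple(np.sort(new[lab])) == old[lab] for lab in labels)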
From d73eb49e5be9cb0434a860c16439bbdf75b56b43 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 19:59:11 -0600
Subject: [PATCH 3/8] Add cohorts benchmark

---
 asv_bench/benchmarks/cohorts.py | 47 +++++++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)
 create mode 100644 asv_bench/benchmarks/cohorts.py

diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py
new file mode 100644
index 000000000..b68d4c360
--- /dev/null
+++ b/asv_bench/benchmarks/cohorts.py
@@ -0,0 +1,47 @@
+import dask
+import numpy as np
+import pandas as pd
+
+import flox
+
+
+class Cohorts:
+    """Time the core reduction function."""
+
+    def setup(self, *args, **kwargs):
+        raise NotImplementedError
+
+    def time_find_group_cohorts(self):
+        flox.core.find_group_cohorts(self.by, self.array.chunks)
+        flox.cache.cache.clear()
+
+    def time_graph_construct(self):
+        flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts")
+
+
+class NWMMidwest(Cohorts):
+    """2D labels, irregular w.r.t. chunk size.
+    Mimics National Water Model, Midwest county groupby."""
+
+    def setup(self, *args, **kwargs):
+        x = np.repeat(np.arange(30), 150)
+        y = np.repeat(np.arange(30), 60)
+        self.by = x[np.newaxis, :] * y[:, np.newaxis]
+
+        self.array = dask.array.ones(self.by.shape, chunks=(350, 350))
+        self.axis = (-2, -1)
+
+
+class ERA5(Cohorts):
+    """ERA5"""
+
+    def setup(self, *args, **kwargs):
+        time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+
+        self.by = time.dt.dayofyear.values
+        self.axis = (-1,)
+
+        array = dask.array.random.random((721, 1440, len(time)), chunks=(-1, -1, 48))
+        self.array = flox.core.rechunk_for_cohorts(
+            array, -1, self.by, force_new_chunk_at=[1], chunksize=48, ignore_old_chunks=True
+        )

From 3491e760f6c627e08f0b1db350bfeddf9a7f8973 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 20:25:58 -0600
Subject: [PATCH 4/8] Add empty cache if cachey is absent

---
 flox/cache.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flox/cache.py b/flox/cache.py
index 4f8de8b59..dd6416aea 100644
--- a/flox/cache.py
+++ b/flox/cache.py
@@ -8,4 +8,5 @@
     cache = cachey.Cache(1e6)
     memoize = partial(cache.memoize, key=dask.base.tokenize)
 except ImportError:
+    cache = {}
     memoize = lambda x: x  # type: ignore

From 820d4776c617532660dc9d4471598ad5696372a5 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 20:28:12 -0600
Subject: [PATCH 5/8] Always initialize cache

---
 flox/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flox/__init__.py b/flox/__init__.py
index 879761aa7..5647a4ee0 100644
--- a/flox/__init__.py
+++ b/flox/__init__.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # flake8: noqa
 """Top-level module for flox ."""
+from . import cache
 from .aggregations import Aggregation  # noqa
 from .core import groupby_reduce, rechunk_for_blockwise, rechunk_for_cohorts  # noqa
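Patches 4 and 5 exist so that flox.cache.cache is always present and importable whether or not cachey is installed; the benchmark's cache.clear() call needs something to clear. The memoize key, dask.base.tokenize, is what makes caching find_group_cohorts possible at all: it hashes arguments by value, including numpy arrays, which are not hashable themselves. A small illustration with toy arrays (not flox code):

    import numpy as np
    import dask

    a = np.arange(10)

    # tokenize is deterministic and value-based: equal array contents give
    # equal cache keys, different contents give different keys.
    assert dask.base.tokenize(a) == dask.base.tokenize(a.copy())
    assert dask.base.tokenize(a) != dask.base.tokenize(a + 1)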
From f49e214b6064cfd75f3c5d5b76c9d1866d4c39d4 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 20:39:53 -0600
Subject: [PATCH 6/8] Guard cache clear

---
 asv_bench/benchmarks/cohorts.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py
index b68d4c360..a63954055 100644
--- a/asv_bench/benchmarks/cohorts.py
+++ b/asv_bench/benchmarks/cohorts.py
@@ -13,7 +13,10 @@ def setup(self, *args, **kwargs):
 
     def time_find_group_cohorts(self):
         flox.core.find_group_cohorts(self.by, self.array.chunks)
-        flox.cache.cache.clear()
+        try:
+            flox.cache.cache.clear()
+        except AttributeError:
+            pass
 
     def time_graph_construct(self):
         flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts")

From a39c00dc9e4e99eacbbae9d39c9e787b5bbf9a68 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Thu, 13 Oct 2022 20:40:40 -0600
Subject: [PATCH 7/8] Add numtasks benchmark

---
 asv_bench/benchmarks/cohorts.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py
index a63954055..cd00ff1e5 100644
--- a/asv_bench/benchmarks/cohorts.py
+++ b/asv_bench/benchmarks/cohorts.py
@@ -21,6 +21,14 @@ def time_find_group_cohorts(self):
     def time_graph_construct(self):
         flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts")
 
+    def track_num_tasks(self):
+        result = flox.groupby_reduce(
+            self.array, self.by, func="sum", axis=self.axis, method="cohorts"
+        )[0]
+        return len(result.dask.to_dict())
+
+    track_num_tasks.unit = "tasks"
+
 
 class NWMMidwest(Cohorts):
     """2D labels, irregular w.r.t. chunk size.

From 9cb65f4337ba58ce89291f07e74e532f2637754b Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Thu, 13 Oct 2022 20:49:50 -0600
Subject: [PATCH 8/8] Update asv_bench/benchmarks/cohorts.py

---
 asv_bench/benchmarks/cohorts.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py
index cd00ff1e5..9062611f1 100644
--- a/asv_bench/benchmarks/cohorts.py
+++ b/asv_bench/benchmarks/cohorts.py
@@ -13,6 +13,8 @@ def setup(self, *args, **kwargs):
 
     def time_find_group_cohorts(self):
         flox.core.find_group_cohorts(self.by, self.array.chunks)
+        # The cache clear fails dependably in CI
+        # Not sure why
         try:
             flox.cache.cache.clear()
         except AttributeError:
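Taken together, the series speeds up cohort detection (patches 1 and 2), adds benchmarks to track it (patches 3 and 7), and hardens the cache machinery those benchmarks poke at (patches 4, 5, 6, and 8). A minimal end-to-end sketch of the code path the benchmarks exercise follows; the shapes, chunking, and labels are invented for illustration and chosen so each label lives in exactly one chunk, the ideal case for method="cohorts".

    import dask.array
    import numpy as np

    import flox

    # Four contiguous labels, one label per 25-element chunk: each cohort
    # then maps onto its own blocks of the dask array.
    by = np.repeat(np.arange(4), 25)
    array = dask.array.ones((100,), chunks=25)

    result, groups = flox.groupby_reduce(array, by, func="sum", axis=-1, method="cohorts")
    print(groups)            # [0 1 2 3]
    print(result.compute())  # [25. 25. 25. 25.]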