From 41927f23a8295982ebe04cd552b1e3ae302a7212 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 18 Apr 2021 10:55:55 -0600 Subject: [PATCH 1/4] Support ffill and bfill along chunked dimensions Does not support limit yet. --- doc/whats-new.rst | 2 ++ xarray/core/dataarray.py | 6 ++-- xarray/core/dataset.py | 6 ++-- xarray/core/duck_array_ops.py | 24 ++++++++++++++ xarray/core/missing.py | 14 +++------ xarray/tests/test_duck_array_ops.py | 25 +++++++++++++++ xarray/tests/test_missing.py | 49 +++++++++++++++-------------- 7 files changed, 90 insertions(+), 36 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 47c5d226d67..8226756301a 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -234,6 +234,8 @@ New Features Bug fixes ~~~~~~~~~ +- Properly support :py:meth:`DataArray.ffill`, :py:meth:`DataArray.bfill`, :py:meth:`Dataset.ffill`, :py:meth:`Dataset.bfill` along chunked dimensions. + (:issue:`2699`). By `Deepak Cherian <https://github.com/dcherian>`_. - Use specific type checks in :py:func:`~xarray.core.variable.as_compatible_data` instead of blanket access to ``values`` attribute (:issue:`2097`) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index ba0ad135453..951d81e7730 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -2515,7 +2515,8 @@ def ffill(self, dim: Hashable, limit: int = None) -> "DataArray": The maximum number of consecutive NaN values to forward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. Must be greater - than 0 or None for no limit. + than 0 or None for no limit. Must be None or greater than or equal + to axis length if filling along chunked axes (dimensions). Returns ------- @@ -2539,7 +2540,8 @@ def bfill(self, dim: Hashable, limit: int = None) -> "DataArray": The maximum number of consecutive NaN values to backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. 
Must be greater - than 0 or None for no limit. + than 0 or None for no limit. Must be None or greater than or equal + to axis length if filling along chunked axes (dimensions). Returns ------- diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index b7bafc92963..78b01de7521 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -4653,7 +4653,8 @@ def ffill(self, dim: Hashable, limit: int = None) -> "Dataset": The maximum number of consecutive NaN values to forward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. Must be greater - than 0 or None for no limit. + than 0 or None for no limit. Must be None or greater than or equal + to axis length if filling along chunked axes (dimensions). Returns ------- @@ -4678,7 +4679,8 @@ def bfill(self, dim: Hashable, limit: int = None) -> "Dataset": The maximum number of consecutive NaN values to backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. Must be greater - than 0 or None for no limit. + than 0 or None for no limit. Must be None or greater than or equal + to axis length if filling along chunked axes (dimensions). Returns ------- diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index 9dcd7906ef7..6a5c9626c5b 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -631,3 +631,27 @@ def least_squares(lhs, rhs, rcond=None, skipna=False): return dask_array_ops.least_squares(lhs, rhs, rcond=rcond, skipna=skipna) else: return nputils.least_squares(lhs, rhs, rcond=rcond, skipna=skipna) + + +def push(array, n, axis): + """ + Dask-aware bottleneck.push + """ + from bottleneck import push + + if is_duck_dask_array(array): + if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]: + raise NotImplementedError( + "Cannot fill along a chunked axis when limit is not None." 
+ "Either rechunk to a single chunk along this axis or call .compute() or .load() first." + ) + if all(c == 1 for c in array.chunks[axis]): + array = array.rechunk({axis: 2}) + pushed = array.map_blocks(push, axis=axis, n=n) + if len(array.chunks[axis]) > 1: + pushed = pushed.map_overlap( + push, axis=axis, n=n, depth={axis: (1, 0)}, boundary="none" + ) + return pushed + else: + return push(array, n, axis) diff --git a/xarray/core/missing.py b/xarray/core/missing.py index e6dd8b537a0..1407107a7be 100644 --- a/xarray/core/missing.py +++ b/xarray/core/missing.py @@ -11,7 +11,7 @@ from . import utils from .common import _contains_datetime_like_objects, ones_like from .computation import apply_ufunc -from .duck_array_ops import datetime_to_numeric, timedelta_to_numeric +from .duck_array_ops import datetime_to_numeric, push, timedelta_to_numeric from .options import _get_keep_attrs from .pycompat import is_duck_dask_array from .utils import OrderedSet, is_scalar @@ -390,12 +390,10 @@ def func_interpolate_na(interpolator, y, x, **kwargs): def _bfill(arr, n=None, axis=-1): """inverse of ffill""" - import bottleneck as bn - arr = np.flip(arr, axis=axis) # fill - arr = bn.push(arr, axis=axis, n=n) + arr = push(arr, axis=axis, n=n) # reverse back to original return np.flip(arr, axis=axis) @@ -403,17 +401,15 @@ def _bfill(arr, n=None, axis=-1): def ffill(arr, dim=None, limit=None): """forward fill missing values""" - import bottleneck as bn - axis = arr.get_axis_num(dim) # work around for bottleneck 178 _limit = limit if limit is not None else arr.shape[axis] return apply_ufunc( - bn.push, + push, arr, - dask="parallelized", + dask="allowed", keep_attrs=True, output_dtypes=[arr.dtype], kwargs=dict(n=_limit, axis=axis), @@ -430,7 +426,7 @@ def bfill(arr, dim=None, limit=None): return apply_ufunc( _bfill, arr, - dask="parallelized", + dask="allowed", keep_attrs=True, output_dtypes=[arr.dtype], kwargs=dict(n=_limit, axis=axis), diff --git a/xarray/tests/test_duck_array_ops.py 
b/xarray/tests/test_duck_array_ops.py index e030b9d2e42..373c5cb9387 100644 --- a/xarray/tests/test_duck_array_ops.py +++ b/xarray/tests/test_duck_array_ops.py @@ -20,6 +20,7 @@ mean, np_timedelta64_to_float, pd_timedelta_to_float, + push, py_timedelta_to_float, stack, timedelta_to_numeric, @@ -35,6 +36,7 @@ has_scipy, raise_if_dask_computes, raises_regex, + requires_bottleneck, requires_cftime, requires_dask, ) @@ -859,3 +861,26 @@ def test_least_squares(use_dask, skipna): np.testing.assert_allclose(coeffs, [1.5, 1.25]) np.testing.assert_allclose(residuals, [2.0]) + + +@requires_dask +@requires_bottleneck +def test_push_dask(): + import bottleneck + import dask.array + + array = np.array([np.nan, np.nan, np.nan, 1, 2, 3, np.nan, np.nan, 4, 5, np.nan, 6]) + expected = bottleneck.push(array, axis=0) + for c in range(1, 11): + with raise_if_dask_computes(): + actual = push(dask.array.from_array(array, chunks=c), axis=0, n=None) + np.testing.assert_equal(actual, expected) + + # some chunks of size-1 with NaN + with raise_if_dask_computes(): + actual = push( + dask.array.from_array(array, chunks=(1, 2, 3, 2, 2, 1, 1)), + axis=0, + n=None, + ) + np.testing.assert_equal(actual, expected) diff --git a/xarray/tests/test_missing.py b/xarray/tests/test_missing.py index 2ab3508b667..555a1ebee95 100644 --- a/xarray/tests/test_missing.py +++ b/xarray/tests/test_missing.py @@ -17,6 +17,7 @@ assert_allclose, assert_array_equal, assert_equal, + raise_if_dask_computes, raises_regex, requires_bottleneck, requires_cftime, @@ -394,37 +395,39 @@ def test_ffill(): @requires_bottleneck @requires_dask -def test_ffill_dask(): +@pytest.mark.parametrize("method", ["ffill", "bfill"]) +def test_ffill_bfill_dask(method): da, _ = make_interpolate_example_data((40, 40), 0.5) da = da.chunk({"x": 5}) - actual = da.ffill("time") - expected = da.load().ffill("time") - assert isinstance(actual.data, dask_array_type) - assert_equal(actual, expected) - # with limit - da = da.chunk({"x": 5}) - actual = 
da.ffill("time", limit=3) - expected = da.load().ffill("time", limit=3) - assert isinstance(actual.data, dask_array_type) + dask_method = getattr(da, method) + numpy_method = getattr(da.compute(), method) + # unchunked axis + with raise_if_dask_computes(): + actual = dask_method("time") + expected = numpy_method("time") assert_equal(actual, expected) - -@requires_bottleneck -@requires_dask -def test_bfill_dask(): - da, _ = make_interpolate_example_data((40, 40), 0.5) - da = da.chunk({"x": 5}) - actual = da.bfill("time") - expected = da.load().bfill("time") - assert isinstance(actual.data, dask_array_type) + # chunked axis + with raise_if_dask_computes(): + actual = dask_method("x") + expected = numpy_method("x") assert_equal(actual, expected) # with limit - da = da.chunk({"x": 5}) - actual = da.bfill("time", limit=3) - expected = da.load().bfill("time", limit=3) - assert isinstance(actual.data, dask_array_type) + with raise_if_dask_computes(): + actual = dask_method("time", limit=3) + expected = numpy_method("time", limit=3) + assert_equal(actual, expected) + + # limit < axis size + with pytest.raises(NotImplementedError): + actual = dask_method("x", limit=2) + + # limit > axis size + with raise_if_dask_computes(): + actual = dask_method("x", limit=41) + expected = numpy_method("x", limit=41) assert_equal(actual, expected) From 08fe6fdf7a302eb95d2194fdc12b43d08cc63696 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 20 Apr 2021 16:26:49 -0600 Subject: [PATCH 2/4] Apply suggestions from code review --- doc/whats-new.rst | 1 - xarray/tests/test_duck_array_ops.py | 1 - 2 files changed, 2 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 512abd687df..1f25c5dfa6a 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -240,7 +240,6 @@ New Features Bug fixes ~~~~~~~~~ - - Use specific type checks in ``xarray.core.variable.as_compatible_data`` instead of blanket access to ``values`` attribute (:issue:`2097`) By `Yunus Sevinchan `_. 
diff --git a/xarray/tests/test_duck_array_ops.py b/xarray/tests/test_duck_array_ops.py index 4869fda5786..d1ee1c14052 100644 --- a/xarray/tests/test_duck_array_ops.py +++ b/xarray/tests/test_duck_array_ops.py @@ -35,7 +35,6 @@ has_dask, has_scipy, raise_if_dask_computes, - raises_regex, requires_bottleneck, requires_cftime, requires_dask, From 95963026fcdfc7a67a18f308c0ba1c181c1b64c1 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 21 Apr 2021 09:20:33 -0600 Subject: [PATCH 3/4] Update xarray/core/duck_array_ops.py Co-authored-by: Mathias Hauser --- xarray/core/duck_array_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index 6a5c9626c5b..eaa5956db07 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -642,7 +642,7 @@ def push(array, n, axis): if is_duck_dask_array(array): if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]: raise NotImplementedError( - "Cannot fill along a chunked axis when limit is not None." + "Can only fill along a chunked axis when `limit` is None or at least axis length." "Either rechunk to a single chunk along this axis or call .compute() or .load() first." 
) if all(c == 1 for c in array.chunks[axis]): From 00069d32fba6a8c981d4966c783d78be73129a5f Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 23 Apr 2021 20:31:40 -0600 Subject: [PATCH 4/4] move to dask_array_ops --- xarray/core/dask_array_ops.py | 21 +++++++++++++++++++++ xarray/core/duck_array_ops.py | 17 +---------------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 25f082ec3c5..87f67028862 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -51,3 +51,24 @@ def least_squares(lhs, rhs, rcond=None, skipna=False): # See issue dask/dask#6516 coeffs, residuals, _, _ = da.linalg.lstsq(lhs_da, rhs) return coeffs, residuals + + +def push(array, n, axis): + """ + Dask-aware bottleneck.push + """ + from bottleneck import push + + if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]: + raise NotImplementedError( + "Can only fill along a chunked axis when `limit` is None or at least axis length. " + "Either rechunk to a single chunk along this axis or call .compute() or .load() first." + ) + if all(c == 1 for c in array.chunks[axis]): + array = array.rechunk({axis: 2}) + pushed = array.map_blocks(push, axis=axis, n=n) + if len(array.chunks[axis]) > 1: + pushed = pushed.map_overlap( + push, axis=axis, n=n, depth={axis: (1, 0)}, boundary="none" + ) + return pushed diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index eaa5956db07..491f0925d73 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -634,24 +634,9 @@ def least_squares(lhs, rhs, rcond=None, skipna=False): def push(array, n, axis): - """ - Dask-aware bottleneck.push - """ from bottleneck import push if is_duck_dask_array(array): - if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]: - raise NotImplementedError( - "Can only fill along a chunked axis when `limit` is None or at least axis length." 
- "Either rechunk to a single chunk along this axis or call .compute() or .load() first." - ) - if all(c == 1 for c in array.chunks[axis]): - array = array.rechunk({axis: 2}) - pushed = array.map_blocks(push, axis=axis, n=n) - if len(array.chunks[axis]) > 1: - pushed = pushed.map_overlap( - push, axis=axis, n=n, depth={axis: (1, 0)}, boundary="none" - ) - return pushed + return dask_array_ops.push(array, n, axis) else: return push(array, n, axis)