From 01e75181ee904282e656ee01180c2d1d3e679239 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Thu, 24 Oct 2024 17:48:00 -0400
Subject: [PATCH 1/7] new blank whatsnew

---
 doc/whats-new.rst | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 9a451a836ad..18fae4e0151 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -14,6 +14,34 @@ What's New

     np.random.seed(123456)

+.. _whats-new.2024.10.1:
+
+v2024.10.1 (unreleased)
+-----------------------
+
+New Features
+~~~~~~~~~~~~
+
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+
+Deprecations
+~~~~~~~~~~~~
+
+
+Bug fixes
+~~~~~~~~~
+
+
+Documentation
+~~~~~~~~~~~~~
+
+
+Internal Changes
+~~~~~~~~~~~~~~~~
+
 .. _whats-new.2024.10.0:

 v2024.10.0 (Oct 24th, 2024)

From 4c412f50de17239d3d5250cdce21d9b91d23b042 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 13:42:19 -0500
Subject: [PATCH 2/7] add deprecation warning for parallel=True

---
 xarray/backends/api.py        | 19 +++++++++++++++++--
 xarray/tests/test_backends.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index 2adcc57c6b9..b608861d786 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -1365,6 +1365,9 @@ def open_groups(
     return groups


+import warnings
+
+
 def open_mfdataset(
     paths: str
     | os.PathLike
@@ -1596,7 +1599,15 @@ def open_mfdataset(

     open_kwargs = dict(engine=engine, chunks=chunks or {}, **kwargs)

-    if parallel:
+    if parallel is True:
+        warnings.warn(
+            "Passing ``parallel=True`` is deprecated; instead please pass ``parallel='dask'`` explicitly",
+            PendingDeprecationWarning,
+            stacklevel=2,
+        )
+        parallel = "dask"
+
+    if parallel == "dask":
         import dask

         # wrap the open_dataset, getattr, and preprocess with delayed
@@ -1604,9 +1615,13 @@ def open_mfdataset(
         getattr_ = dask.delayed(getattr)
         if preprocess is not None:
             preprocess = dask.delayed(preprocess)
-    else:
+    elif parallel is False:
         open_ = open_dataset
         getattr_ = getattr
+    else:
+        raise ValueError(
+            f"{parallel} is an invalid option for the keyword argument ``parallel``"
+        )

     datasets = [open_(p, **open_kwargs) for p in paths1d]
     closers = [getattr_(ds, "_close") for ds in datasets]
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index cfca5e69048..d405332ef16 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -4410,6 +4410,41 @@ def test_open_mfdataset_manyfiles(
     assert_identical(original, actual)


+class TestParallel:
+    def test_validate_parallel_kwarg(self) -> None:
+        original = Dataset({"foo": ("x", np.random.randn(10))})
+        datasets = [original.isel(x=slice(5)), original.isel(x=slice(5, 10))]
+        with create_tmp_file() as tmp1:
+            with create_tmp_file() as tmp2:
+                save_mfdataset(datasets, [tmp1, tmp2])
+
+                with pytest.raises(ValueError, match="garbage is an invalid option"):
+                    open_mfdataset(
+                        [tmp1, tmp2],
+                        concat_dim="x",
+                        combine="nested",
+                        parallel="garbage",
+                    )
+
+    def test_deprecation_warning(self) -> None:
+        original = Dataset({"foo": ("x", np.random.randn(10))})
+        datasets = [original.isel(x=slice(5)), original.isel(x=slice(5, 10))]
+        with create_tmp_file() as tmp1:
+            with create_tmp_file() as tmp2:
+                save_mfdataset(datasets, [tmp1, tmp2])
+
+                with pytest.warns(
+                    PendingDeprecationWarning,
+                    match="please pass ``parallel='dask'`` explicitly",
+                ):
+                    open_mfdataset(
+                        [tmp1, tmp2],
+                        concat_dim="x",
+                        combine="nested",
+                        parallel=True,
+                    )
+
+
 @requires_netCDF4
 @requires_dask
 def test_open_mfdataset_can_open_path_objects() -> None:
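A quick illustration of what patch 2 means for callers (a sketch, not part of the series; the glob path is hypothetical, and the assertion mirrors test_deprecation_warning):

    import warnings

    import xarray as xr

    # old spelling: still accepted for now, but emits a PendingDeprecationWarning
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        ds = xr.open_mfdataset(
            "data/file*.nc", combine="nested", concat_dim="x", parallel=True
        )
    assert any(issubclass(w.category, PendingDeprecationWarning) for w in caught)

    # new spelling: name the parallelization backend explicitly
    ds = xr.open_mfdataset(
        "data/file*.nc", combine="nested", concat_dim="x", parallel="dask"
    )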
From ab49351d6607002d918b00543206e43bd302deaa Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 13:43:08 -0500
Subject: [PATCH 3/7] add test for lithops

---
 xarray/tests/test_backends.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index d405332ef16..78b36c44f9d 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -4444,6 +4444,21 @@ def test_deprecation_warning(self) -> None:
                         parallel=True,
                     )

+    #@requires_lithops
+    def test_lithops_parallel(self) -> None:
+        original = Dataset({"foo": ("x", np.random.randn(10))})
+        datasets = [original.isel(x=slice(5)), original.isel(x=slice(5, 10))]
+        with create_tmp_file() as tmp1:
+            with create_tmp_file() as tmp2:
+                save_mfdataset(datasets, [tmp1, tmp2])
+                with open_mfdataset(
+                    [tmp1, tmp2],
+                    concat_dim="x",
+                    combine="nested",
+                    parallel="lithops",
+                ) as actual:
+                    assert_identical(actual, original)
+

 @requires_netCDF4
 @requires_dask
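For reviewers unfamiliar with lithops: the test above relies only on lithops' executor API, nothing xarray-specific. A minimal standalone sketch of those primitives (assuming an unconfigured lithops install, which falls back to the localhost executor, the same default the CI setup in patch 5 leans on):

    import lithops

    def double(x: int) -> int:
        return 2 * x

    # with no configuration supplied, lithops runs functions via its
    # localhost executor, so no cloud credentials are needed
    fn_exec = lithops.FunctionExecutor()
    futures = fn_exec.map(double, [1, 2, 3])
    print(fn_exec.get_result(futures))  # [2, 4, 6]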
From 82d4127df800c0a8a4d9afeee90f20c64365c522 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 13:49:42 -0500
Subject: [PATCH 4/7] implementation for lithops parallelization

---
 xarray/backends/api.py | 42 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index b608861d786..7acad6fb0cd 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -1615,6 +1615,16 @@ def open_mfdataset(
         getattr_ = dask.delayed(getattr)
         if preprocess is not None:
             preprocess = dask.delayed(preprocess)
+    elif parallel == "lithops":
+        import lithops
+
+        # TODO use RetryingFunctionExecutor instead?
+        fn_exec = lithops.FunctionExecutor()
+
+        # lithops doesn't have a delayed primitive
+        open_ = open_dataset
+        # TODO I don't know how best to chain this with the getattr
+        # getattr_ = getattr
     elif parallel is False:
         open_ = open_dataset
         getattr_ = getattr
@@ -1623,15 +1633,33 @@ def open_mfdataset(
             f"{parallel} is an invalid option for the keyword argument ``parallel``"
         )

-    datasets = [open_(p, **open_kwargs) for p in paths1d]
-    closers = [getattr_(ds, "_close") for ds in datasets]
-    if preprocess is not None:
-        datasets = [preprocess(ds) for ds in datasets]
+    if parallel == "dask":
+        datasets = [open_(p, **open_kwargs) for p in paths1d]
+        closers = [getattr_(ds, "_close") for ds in datasets]
+        if preprocess is not None:
+            datasets = [preprocess(ds) for ds in datasets]

-    if parallel:
         # calling compute here will return the datasets/file_objs lists,
         # the underlying datasets will still be stored as dask arrays
         datasets, closers = dask.compute(datasets, closers)
+    elif parallel == "lithops":
+
+        def generate_lazy_ds(path):
+            # allows passing the open_dataset function to lithops without evaluating it
+            ds = open_(path, **open_kwargs)
+            return ds
+
+        futures = fn_exec.map(generate_lazy_ds, paths1d)
+
+        # wait for all the serverless workers to finish, and send their resulting lazy datasets back to the client
+        # TODO do we need download_results?
+        completed_futures, _ = fn_exec.wait(futures, download_results=True)
+        datasets = completed_futures.get_result()
+    elif parallel is False:
+        datasets = [open_(p, **open_kwargs) for p in paths1d]
+        closers = [getattr_(ds, "_close") for ds in datasets]
+        if preprocess is not None:
+            datasets = [preprocess(ds) for ds in datasets]

     # Combine all datasets, closing them in case of a ValueError
     try:
@@ -1669,7 +1697,9 @@ def open_mfdataset(
             ds.close()
         raise

-    combined.set_close(partial(_multi_file_closer, closers))
+    # TODO remove this guard once closers are also collected for the lithops path
+    if parallel != "lithops":
+        combined.set_close(partial(_multi_file_closer, closers))

     # read global attributes from the attrs_file or from the first dataset
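One possible direction for the TODOs above, sketched under the assumption that ``preprocess`` and ``open_kwargs`` pickle cleanly (it reuses exactly the executor calls the patch already makes): fold the preprocess step into the mapped function so it also runs on the workers. The ``_close`` handles belong to files opened on the worker, so they presumably cannot be shipped back usefully, which is why the lithops path skips ``set_close`` for now.

    def open_and_preprocess(path):
        # runs on a lithops worker; everything it closes over must pickle
        ds = open_dataset(path, **open_kwargs)
        if preprocess is not None:
            ds = preprocess(ds)
        return ds

    futures = fn_exec.map(open_and_preprocess, paths1d)
    completed_futures, _ = fn_exec.wait(futures, download_results=True)
    datasets = completed_futures.get_result()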
From 47bc1f7dc17b2dfffcb2d7c51f7c53c34bfb6e0b Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 13:55:03 -0500
Subject: [PATCH 5/7] add lithops as requirement and to CI

---
 ci/requirements/all-but-numba.yml            | 1 +
 ci/requirements/environment-3.13.yml         | 1 +
 ci/requirements/environment-windows-3.13.yml | 1 +
 ci/requirements/environment-windows.yml      | 1 +
 ci/requirements/environment.yml              | 1 +
 ci/requirements/min-all-deps.yml             | 1 +
 xarray/tests/__init__.py                     | 1 +
 xarray/tests/test_backends.py                | 5 ++++-
 8 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/ci/requirements/all-but-numba.yml b/ci/requirements/all-but-numba.yml
index 61f64a176af..9bea3a17a85 100644
--- a/ci/requirements/all-but-numba.yml
+++ b/ci/requirements/all-but-numba.yml
@@ -22,6 +22,7 @@ dependencies:
   - hypothesis
   - iris
   - lxml # Optional dep of pydap
+  - lithops
   - matplotlib-base
   - nc-time-axis
   - netcdf4
diff --git a/ci/requirements/environment-3.13.yml b/ci/requirements/environment-3.13.yml
index 937cb013711..16f3cbf950e 100644
--- a/ci/requirements/environment-3.13.yml
+++ b/ci/requirements/environment-3.13.yml
@@ -19,6 +19,7 @@ dependencies:
   - hdf5
   - hypothesis
   - iris
+  - lithops
   - lxml # Optional dep of pydap
   - matplotlib-base
   - nc-time-axis
diff --git a/ci/requirements/environment-windows-3.13.yml b/ci/requirements/environment-windows-3.13.yml
index 448e3f70c0c..341ac182e43 100644
--- a/ci/requirements/environment-windows-3.13.yml
+++ b/ci/requirements/environment-windows-3.13.yml
@@ -18,6 +18,7 @@ dependencies:
   - hypothesis
   - iris
   - lxml # Optional dep of pydap
+  - lithops
   - matplotlib-base
   - nc-time-axis
   - netcdf4
diff --git a/ci/requirements/environment-windows.yml b/ci/requirements/environment-windows.yml
index 3b2e6dc62e6..61e84debfa4 100644
--- a/ci/requirements/environment-windows.yml
+++ b/ci/requirements/environment-windows.yml
@@ -18,6 +18,7 @@ dependencies:
   - hypothesis
   - iris
   - lxml # Optional dep of pydap
+  - lithops
   - matplotlib-base
   - nc-time-axis
   - netcdf4
diff --git a/ci/requirements/environment.yml b/ci/requirements/environment.yml
index 364ae03666f..156307dbbd6 100644
--- a/ci/requirements/environment.yml
+++ b/ci/requirements/environment.yml
@@ -19,6 +19,7 @@ dependencies:
   - hdf5
   - hypothesis
   - iris
+  - lithops
   - lxml # Optional dep of pydap
   - matplotlib-base
   - nc-time-axis
diff --git a/ci/requirements/min-all-deps.yml b/ci/requirements/min-all-deps.yml
index f3dab2e5bbf..dd9b040d713 100644
--- a/ci/requirements/min-all-deps.yml
+++ b/ci/requirements/min-all-deps.yml
@@ -30,6 +30,7 @@ dependencies:
   - hypothesis
   - iris=3.7
   - lxml=4.9 # Optional dep of pydap
+  - lithops=3.5.1
   - matplotlib-base=3.8
   - nc-time-axis=1.4
   # netcdf follows a 1.major.minor[.patch] convention
diff --git a/xarray/tests/__init__.py b/xarray/tests/__init__.py
index 1f2eedcd8f0..5c9187341d5 100644
--- a/xarray/tests/__init__.py
+++ b/xarray/tests/__init__.py
@@ -118,6 +118,7 @@ def _importorskip(
     category=DeprecationWarning,
 )
 has_dask_expr, requires_dask_expr = _importorskip("dask_expr")
+has_lithops, requires_lithops = _importorskip("lithops")
 has_bottleneck, requires_bottleneck = _importorskip("bottleneck")
 has_rasterio, requires_rasterio = _importorskip("rasterio")
 has_zarr, requires_zarr = _importorskip("zarr")
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index 78b36c44f9d..b966b9f55e1 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -80,6 +80,7 @@
     requires_h5netcdf_1_4_0_or_above,
     requires_h5netcdf_ros3,
     requires_iris,
+    requires_lithops,
     requires_netcdf,
     requires_netCDF4,
     requires_netCDF4_1_6_2_or_above,
@@ -4445,8 +4445,10 @@ def test_deprecation_warning(self) -> None:
                         parallel=True,
                     )

-    #@requires_lithops
+    @requires_lithops
     def test_lithops_parallel(self) -> None:
+        # default configuration of lithops will use local executor
+
         original = Dataset({"foo": ("x", np.random.randn(10))})
         datasets = [original.isel(x=slice(5)), original.isel(x=slice(5, 10))]
         with create_tmp_file() as tmp1:

From cf1960c4306c3ba7dbce35ef161f4dd45cfcae2c Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 13:56:43 -0500
Subject: [PATCH 6/7] docstring

---
 xarray/backends/api.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index 7acad6fb0cd..d45d4391dc5 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -1483,9 +1483,10 @@ def open_mfdataset(
           those corresponding to other dimensions.
         * list of str: The listed coordinate variables will be concatenated, in
           addition the "minimal" coordinates.
-    parallel : bool, default: False
-        If True, the open and preprocess steps of this function will be
-        performed in parallel using ``dask.delayed``. Default is False.
+    parallel : 'dask', 'lithops', or False
+        Specify whether the open and preprocess steps of this function will be
+        performed in parallel using ``dask.delayed``, using ``lithops.FunctionExecutor.map``, or in serial.
+        Default is False. Passing True is now a deprecated alias for passing 'dask'.
     join : {"outer", "inner", "left", "right", "exact", "override"}, default: "outer"
         String indicating how to combine differing indexes
         (excluding concat_dim) in objects

From 758293d38c0970c932d0915d4ebb96814802915b Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Wed, 8 Jan 2025 14:11:10 -0500
Subject: [PATCH 7/7] add netcdf4 requirement

---
 xarray/tests/test_backends.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index b966b9f55e1..11d5eacf344 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -4411,6 +4411,7 @@ def test_open_mfdataset_manyfiles(
     assert_identical(original, actual)


+@requires_netCDF4
 class TestParallel:
     def test_validate_parallel_kwarg(self) -> None:
         original = Dataset({"foo": ("x", np.random.randn(10))})
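For context on the test markers the last three patches touch: ``requires_lithops`` and ``requires_netCDF4`` come from xarray's ``_importorskip`` helper in ``xarray/tests/__init__.py``. Simplified (the real helper also handles minimum versions and warning filtering), it behaves roughly like this sketch:

    import importlib

    import pytest

    def _importorskip(modname: str):
        try:
            importlib.import_module(modname)
            has = True
        except ImportError:
            has = False
        # tests decorated with the returned marker are skipped when the
        # dependency is missing, e.g. @requires_lithops
        marker = pytest.mark.skipif(not has, reason=f"requires {modname}")
        return has, marker

    has_lithops, requires_lithops = _importorskip("lithops")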