diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 901085acbdd..17af655c02e 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -69,6 +69,8 @@ Bug fixes
   By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
 - Remove dask-expr from CI runs, add "pyarrow" dask dependency to windows CI runs, fix related tests (:issue:`9962`, :pull:`9971`).
   By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
+- Use zarr-fixture to prevent thread leakage errors (:pull:`9967`).
+  By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
 
 Documentation
 ~~~~~~~~~~~~~
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py
index 9d68d1899d8..e4fdf08d0b4 100644
--- a/xarray/tests/test_distributed.py
+++ b/xarray/tests/test_distributed.py
@@ -21,7 +21,9 @@
 from distributed.client import futures_of
 from distributed.utils_test import (  # noqa: F401
     cleanup,
+    client,
     cluster,
+    cluster_fixture,
     gen_cluster,
     loop,
     loop_in_thread,
@@ -46,6 +48,7 @@ from xarray.tests.test_dataset import create_test_data
 
 loop = loop  # loop is an imported fixture, which flake8 has issues ack-ing
+client = client  # client is an imported fixture, which flake8 has issues ack-ing
 
 
 @pytest.fixture
@@ -214,11 +217,41 @@ def test_dask_distributed_read_netcdf_integration_test(
         assert_allclose(original, computed)
 
 
+# fixture vendored from dask
+# heads-up, this is using quite private zarr API
+# https://github.com/dask/dask/blob/e04734b4d8959ba259801f2e2a490cb4ee8d891f/dask/tests/test_distributed.py#L338-L358
+@pytest.fixture(scope="function")
+def zarr(client):
+    zarr_lib = pytest.importorskip("zarr")
+    # Zarr-Python 3 lazily allocates a dedicated thread/IO loop
+    # to execute async tasks. To avoid having this thread
+    # picked up as a "leaked thread", we manually trigger its
+    # creation before using zarr.
+    try:
+        _ = zarr_lib.core.sync._get_loop()
+        _ = zarr_lib.core.sync._get_executor()
+        yield zarr_lib
+    except AttributeError:
+        yield zarr_lib
+    finally:
+        # Zarr-Python 3 lazily allocates an IO thread, a thread pool executor,
+        # and an IO loop. Here we clean up these resources to avoid leaking
+        # threads. In normal operations, this is done by an atexit handler
+        # when Zarr is shutting down.
+        try:
+            zarr_lib.core.sync.cleanup_resources()
+        except AttributeError:
+            pass
+
+
 @requires_zarr
 @pytest.mark.parametrize("consolidated", [True, False])
 @pytest.mark.parametrize("compute", [True, False])
 def test_dask_distributed_zarr_integration_test(
-    loop, consolidated: bool, compute: bool
+    client,
+    zarr,
+    consolidated: bool,
+    compute: bool,
 ) -> None:
     if consolidated:
         write_kwargs: dict[str, Any] = {"consolidated": True}
@@ -226,23 +259,19 @@ def test_dask_distributed_zarr_integration_test(
     else:
         write_kwargs = read_kwargs = {}
     chunks = {"dim1": 4, "dim2": 3, "dim3": 5}
-    with cluster() as (s, [a, b]):
-        with Client(s["address"], loop=loop):
-            original = create_test_data().chunk(chunks)
-            with create_tmp_file(
-                allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc"
-            ) as filename:
-                maybe_futures = original.to_zarr(  # type: ignore[call-overload] #mypy bug?
-                    filename, compute=compute, **write_kwargs
-                )
-                if not compute:
-                    maybe_futures.compute()
-                with xr.open_dataset(
-                    filename, chunks="auto", engine="zarr", **read_kwargs
-                ) as restored:
-                    assert isinstance(restored.var1.data, da.Array)
-                    computed = restored.compute()
-                    assert_allclose(original, computed)
+    original = create_test_data().chunk(chunks)
+    with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, suffix=".zarrc") as filename:
+        maybe_futures = original.to_zarr(  # type: ignore[call-overload] #mypy bug?
+            filename, compute=compute, **write_kwargs
+        )
+        if not compute:
+            maybe_futures.compute()
+        with xr.open_dataset(
+            filename, chunks="auto", engine="zarr", **read_kwargs
+        ) as restored:
+            assert isinstance(restored.var1.data, da.Array)
+            computed = restored.compute()
+            assert_allclose(original, computed)
 
 
 @gen_cluster(client=True)
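
For context, the thread-leak workaround that the vendored fixture implements reduces to the standalone sketch below. This is illustrative, not part of the patch: it assumes Zarr-Python 3, uses only the same private zarr.core.sync helpers the fixture itself calls, and the two function names are invented for the example.

    import zarr

    def warm_up_zarr_io() -> None:
        # Eagerly create Zarr-Python 3's lazily allocated IO event loop and
        # thread pool executor, so a leak check that snapshots the set of
        # threads before the test body does not later flag them as leaked.
        try:
            zarr.core.sync._get_loop()
            zarr.core.sync._get_executor()
        except AttributeError:
            pass  # Zarr-Python 2 has no async IO machinery to warm up

    def release_zarr_io() -> None:
        # Explicitly tear down the IO loop and executor; outside of tests,
        # an atexit handler does this when Zarr shuts down.
        try:
            zarr.core.sync.cleanup_resources()
        except AttributeError:
            pass

Warming up before the test body and releasing after it brackets the lifetime of these threads inside the test, which is exactly what the fixture's yield/finally structure achieves.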