From 656a5656866f65d44dcc0be870d59e46957afa8b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 27 Jan 2025 21:29:16 -0500 Subject: [PATCH 01/16] WIP: Support fsspec mutable mapping objects in zarr.open --- src/zarr/storage/_common.py | 23 +++++++++++-- src/zarr/storage/_fsspec.py | 58 +++++++++++++++++++++++++++------ tests/test_api.py | 17 ++++++++++ tests/test_store/test_fsspec.py | 14 ++++++++ 4 files changed, 100 insertions(+), 12 deletions(-) diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 6ab539bb0a..18d76d7924 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -312,8 +312,27 @@ async def make_store_path( # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. store = await MemoryStore.open(store_dict=store_like, read_only=_read_only) else: - msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable] - raise TypeError(msg) + try: # type: ignore[unreachable] + import fsspec + + if isinstance(store_like, fsspec.mapping.FSMap): + if path: + raise TypeError( + "'path' was provided but is not used for FSMap store_like objects" + ) + if storage_options: + raise TypeError( + "'storage_options was provided but is not used for FSMap store_like objects" + ) + store = FsspecStore.from_mapper(store_like, read_only=_read_only) + else: + raise ( + TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'") + ) + except ImportError: + raise ( + TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'") + ) from None result = await StorePath.open(store, path=path_normalized, mode=mode) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index c30c9b601b..b756f7bb02 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -16,7 +16,9 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable + from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem + from fsspec.mapping import FSMap from zarr.core.buffer import BufferPrototype from zarr.core.common import BytesLike @@ -29,6 +31,20 @@ ) +def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: + try: + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + fs = AsyncFileSystemWrapper(fs) + except ImportError as e: + raise ImportError( + f"The filesystem '{fs}' is synchronous, and the required " + "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " + "2024.12.0 or later to enable this functionality." + ) from e + return fs + + class FsspecStore(Store): """ A remote Store based on FSSpec @@ -136,6 +152,37 @@ def from_upath( allowed_exceptions=allowed_exceptions, ) + @classmethod + def from_mapper( + cls, + fs_map: FSMap, + read_only: bool = False, + allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, + ) -> FsspecStore: + """ + Create a FsspecStore from an upath object. + + Parameters + ---------- + read_only : bool + Whether the store is read-only, defaults to False. + allowed_exceptions : tuple, optional + The exceptions that are allowed to be raised when accessing the + store. Defaults to ALLOWED_EXCEPTIONS. + + Returns + ------- + FsspecStore + """ + if not fs_map.fs.async_impl or not fs_map.fs.asynchronous: + raise TypeError("Filesystem needs to support async operations.") + return cls( + fs=fs_map.fs, + path=fs_map.root, + read_only=read_only, + allowed_exceptions=allowed_exceptions, + ) + @classmethod def from_url( cls, @@ -174,16 +221,7 @@ def from_url( fs, path = url_to_fs(url, **opts) if not fs.async_impl: - try: - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - fs = AsyncFileSystemWrapper(fs) - except ImportError as e: - raise ImportError( - f"The filesystem for URL '{url}' is synchronous, and the required " - "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " - "2024.12.0 or later to enable this functionality." - ) from e + fs = _make_async(fs) # fsspec is not consistent about removing the scheme from the path, so check and strip it here # https://github.com/fsspec/filesystem_spec/issues/1722 diff --git a/tests/test_api.py b/tests/test_api.py index aacd558f2a..fc94ab89a2 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -288,6 +288,23 @@ def test_open_with_mode_w_minus(tmp_path: pathlib.Path) -> None: zarr.open(store=tmp_path, mode="w-") +@pytest.mark.xfail( + reason="Automatic sync -> async filesystems not implemented yet for FSMap objects." +) +def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec") + fs = fsspec.filesystem("file") + mapper = fs.get_mapper(tmp_path) + arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) + assert isinstance(arr, Array) + + arr[...] = 3 + z2 = zarr.open(store=mapper, mode="w", shape=(3, 3)) + assert isinstance(z2, Array) + assert not (z2[:] == 3).all() + z2[:] = 3 + + @pytest.mark.parametrize("zarr_format", [2, 3]) def test_array_order(zarr_format: ZarrFormat) -> None: arr = zarr.ones(shape=(2, 2), order=None, zarr_format=zarr_format) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index a560ca02e8..50f52512c4 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -9,6 +9,7 @@ from packaging.version import parse as parse_version import zarr.api.asynchronous +from zarr import Array from zarr.abc.store import OffsetByteRequest from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import _collect_aiterator, sync @@ -104,6 +105,19 @@ async def test_basic() -> None: assert out[0].to_bytes() == data[1:] +def test_open_s3map() -> None: + s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) + assert isinstance(arr, Array) + + arr[...] = 3 + z2 = zarr.open(store=mapper, mode="w", shape=(3, 3)) + assert isinstance(z2, Array) + assert not (z2[:] == 3).all() + z2[:] = 3 + + class TestFsspecStoreS3(StoreTests[FsspecStore, cpu.Buffer]): store_cls = FsspecStore buffer_cls = cpu.Buffer From 877eb80d9b16e7f838861110bcbb542734a9663e Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 6 Feb 2025 16:19:44 -0500 Subject: [PATCH 02/16] Simplify library availability checking --- src/zarr/storage/_common.py | 40 +++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 18d76d7924..845a386ba7 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -1,5 +1,7 @@ from __future__ import annotations +import importlib +import importlib.util import json from pathlib import Path from typing import TYPE_CHECKING, Any, Literal @@ -12,6 +14,10 @@ from zarr.storage._memory import MemoryStore from zarr.storage._utils import normalize_path +_has_fsspec = importlib.util.find_spec("fsspec") +if _has_fsspec: + from fsspec.mapping import FSMap + if TYPE_CHECKING: from zarr.core.buffer import BufferPrototype @@ -228,7 +234,7 @@ def __eq__(self, other: object) -> bool: async def make_store_path( - store_like: StoreLike | None, + store_like: StoreLike | FSMap | None, *, path: str | None = "", mode: AccessModeLiteral | None = None, @@ -311,28 +317,18 @@ async def make_store_path( # We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings. # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. store = await MemoryStore.open(store_dict=store_like, read_only=_read_only) + elif _has_fsspec: + if not isinstance(store_like, FSMap): + raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")) + if path: + raise TypeError("'path' was provided but is not used for FSMap store_like objects") + if storage_options: + raise TypeError( + "'storage_options was provided but is not used for FSMap store_like objects" + ) + store = FsspecStore.from_mapper(store_like, read_only=_read_only) else: - try: # type: ignore[unreachable] - import fsspec - - if isinstance(store_like, fsspec.mapping.FSMap): - if path: - raise TypeError( - "'path' was provided but is not used for FSMap store_like objects" - ) - if storage_options: - raise TypeError( - "'storage_options was provided but is not used for FSMap store_like objects" - ) - store = FsspecStore.from_mapper(store_like, read_only=_read_only) - else: - raise ( - TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'") - ) - except ImportError: - raise ( - TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'") - ) from None + raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")) result = await StorePath.open(store, path=path_normalized, mode=mode) From f04145c2ca6dd5baf511f8aab51a9573b230aea2 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 12 Feb 2025 19:12:54 -0500 Subject: [PATCH 03/16] Improve test coverage --- src/zarr/storage/_common.py | 4 ++-- tests/test_store/test_fsspec.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 845a386ba7..12255c234d 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -321,9 +321,9 @@ async def make_store_path( if not isinstance(store_like, FSMap): raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")) if path: - raise TypeError("'path' was provided but is not used for FSMap store_like objects") + raise ValueError("'path' was provided but is not used for FSMap store_like objects") if storage_options: - raise TypeError( + raise ValueError( "'storage_options was provided but is not used for FSMap store_like objects" ) store = FsspecStore.from_mapper(store_like, read_only=_read_only) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 56da87dbff..0a165e2dab 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -118,6 +118,22 @@ def test_open_s3map() -> None: z2[:] = 3 +def test_open_s3map_raises() -> None: + with pytest.raises(TypeError, match="Unsupported type for store_like:.*"): + zarr.open(store=0, mode="w", shape=(3, 3)) + s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + with pytest.raises( + ValueError, match="'path' was provided but is not used for FSMap store_like objects" + ): + zarr.open(store=mapper, mode="w", shape=(3, 3), path="foo") + with pytest.raises( + ValueError, + match="'storage_options was provided but is not used for FSMap store_like objects", + ): + zarr.open(store=mapper, mode="w", shape=(3, 3), storage_options={"anon": True}) + + class TestFsspecStoreS3(StoreTests[FsspecStore, cpu.Buffer]): store_cls = FsspecStore buffer_cls = cpu.Buffer From c4bfb06c0487da6b2ef0fd14db910098bf36c0c3 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:05:21 -0500 Subject: [PATCH 04/16] Improve error messages --- src/zarr/storage/_fsspec.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 6334090b25..00c604362f 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -174,8 +174,14 @@ def from_mapper( ------- FsspecStore """ - if not fs_map.fs.async_impl or not fs_map.fs.asynchronous: - raise TypeError("Filesystem needs to support async operations.") + if not fs_map.fs.async_impl: + raise NotImplementedError( + f"The filesystem '{fs_map.fs}' is synchronous and wrapping synchronous filesystems using from_mapper has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." + ) + if not fs_map.fs.asynchronous: + raise NotImplementedError( + f"The filesystem '{fs_map.fs}' is synchronous and conversion to an async instance has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." + ) return cls( fs=fs_map.fs, path=fs_map.root, From 06f35f20d94622aba22db09f8e22988077f40164 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:14:48 -0500 Subject: [PATCH 05/16] Consolidate code --- src/zarr/storage/_fsspec.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 00c604362f..9bb62d7907 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -32,17 +32,30 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: + """Convert a sync FSSpec filesystem to an async FFSpec filesystem + + If the filesystem class supports async operations, a new async instance is created + from the existing instance. + + If the filesystem class does not support async operations, the existing instance + is wrapped with AsyncFileSystemWrapper. + """ + if fs.async_impl and fs.asynchronous: + return fs + if fs.async_impl: + raise NotImplementedError( + f"The filesystem '{fs}' is synchronous and wrapping synchronous filesystems using from_mapper has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." + ) try: from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - fs = AsyncFileSystemWrapper(fs) + return AsyncFileSystemWrapper(fs) except ImportError as e: raise ImportError( f"The filesystem '{fs}' is synchronous, and the required " "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " "2024.12.0 or later to enable this functionality." ) from e - return fs class FsspecStore(Store): @@ -174,14 +187,8 @@ def from_mapper( ------- FsspecStore """ - if not fs_map.fs.async_impl: - raise NotImplementedError( - f"The filesystem '{fs_map.fs}' is synchronous and wrapping synchronous filesystems using from_mapper has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." - ) if not fs_map.fs.asynchronous: - raise NotImplementedError( - f"The filesystem '{fs_map.fs}' is synchronous and conversion to an async instance has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." - ) + fs_map.fs = _make_async(fs_map.fs) return cls( fs=fs_map.fs, path=fs_map.root, From e792e014229e9b592ccbce8251a2385aec86f3d9 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:37:02 -0500 Subject: [PATCH 06/16] Make test more readable --- tests/test_store/test_fsspec.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 0a165e2dab..f324f8a0fe 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -4,6 +4,7 @@ import os from typing import TYPE_CHECKING +import numpy as np import pytest from packaging.version import parse as parse_version @@ -110,12 +111,12 @@ def test_open_s3map() -> None: mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) assert isinstance(arr, Array) - - arr[...] = 3 - z2 = zarr.open(store=mapper, mode="w", shape=(3, 3)) - assert isinstance(z2, Array) - assert not (z2[:] == 3).all() - z2[:] = 3 + # Set values + arr[:] = 1 + # Read set values + arr = zarr.open(store=mapper, mode="r", shape=(3, 3)) + assert isinstance(arr, Array) + np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) def test_open_s3map_raises() -> None: From ed11018cc185b6877e9014cdcfa451eedca19e5f Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:50:58 -0500 Subject: [PATCH 07/16] Make async instances from sync fsmap objects --- src/zarr/storage/_fsspec.py | 11 ++++++----- tests/test_store/test_fsspec.py | 7 +++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 9bb62d7907..deca7a7313 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -3,6 +3,8 @@ import warnings from typing import TYPE_CHECKING, Any +from fsspec import AbstractFileSystem + from zarr.abc.store import ( ByteRequest, OffsetByteRequest, @@ -16,7 +18,6 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable - from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem from fsspec.mapping import FSMap @@ -43,9 +44,9 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: if fs.async_impl and fs.asynchronous: return fs if fs.async_impl: - raise NotImplementedError( - f"The filesystem '{fs}' is synchronous and wrapping synchronous filesystems using from_mapper has not been implemented. See https://github.com/zarr-developers/zarr-python/issues/2706 for more details." - ) + fs_dict = fs.to_dict() + fs_dict["asynchronous"] = True + return AbstractFileSystem.from_dict(fs_dict) try: from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper @@ -187,7 +188,7 @@ def from_mapper( ------- FsspecStore """ - if not fs_map.fs.asynchronous: + if not fs_map.fs.async_impl or not fs_map.fs.asynchronous: fs_map.fs = _make_async(fs_map.fs) return cls( fs=fs_map.fs, diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index f324f8a0fe..ad847d20b1 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -106,8 +106,11 @@ async def test_basic() -> None: assert out[0].to_bytes() == data[1:] -def test_open_s3map() -> None: - s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_open_s3map(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) assert isinstance(arr, Array) From e5860014692df12e1044c4550f1b636208becbac Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 11:10:57 -0500 Subject: [PATCH 08/16] Move test to fsspec store --- tests/test_api.py | 27 +++++---------------------- tests/test_store/test_fsspec.py | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index fc94ab89a2..1678347993 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -51,11 +51,11 @@ def test_create(memory_store: Store) -> None: # create array with float shape with pytest.raises(TypeError): - z = create(shape=(400.5, 100), store=store, overwrite=True) # type: ignore [arg-type] + z = create(shape=(400.5, 100), store=store, overwrite=True) # create array with float chunk shape with pytest.raises(TypeError): - z = create(shape=(400, 100), chunks=(16, 16.5), store=store, overwrite=True) # type: ignore [arg-type] + z = create(shape=(400, 100), chunks=(16, 16.5), store=store, overwrite=True) # TODO: parametrize over everything this function takes @@ -200,7 +200,7 @@ def test_save(store: Store, n_args: int, n_kwargs: int) -> None: assert isinstance(array, Array) assert_array_equal(array[:], data) else: - save(store, *args, **kwargs) # type: ignore[arg-type] + save(store, *args, **kwargs) group = open(store) assert isinstance(group, Group) for array in group.array_values(): @@ -288,23 +288,6 @@ def test_open_with_mode_w_minus(tmp_path: pathlib.Path) -> None: zarr.open(store=tmp_path, mode="w-") -@pytest.mark.xfail( - reason="Automatic sync -> async filesystems not implemented yet for FSMap objects." -) -def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: - fsspec = pytest.importorskip("fsspec") - fs = fsspec.filesystem("file") - mapper = fs.get_mapper(tmp_path) - arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) - assert isinstance(arr, Array) - - arr[...] = 3 - z2 = zarr.open(store=mapper, mode="w", shape=(3, 3)) - assert isinstance(z2, Array) - assert not (z2[:] == 3).all() - z2[:] = 3 - - @pytest.mark.parametrize("zarr_format", [2, 3]) def test_array_order(zarr_format: ZarrFormat) -> None: arr = zarr.ones(shape=(2, 2), order=None, zarr_format=zarr_format) @@ -1115,13 +1098,13 @@ async def test_metadata_validation_error() -> None: MetadataValidationError, match="Invalid value for 'zarr_format'. Expected '2, 3, or None'. Got '3.0'.", ): - await zarr.api.asynchronous.open_group(zarr_format="3.0") # type: ignore[arg-type] + await zarr.api.asynchronous.open_group(zarr_format="3.0") with pytest.raises( MetadataValidationError, match="Invalid value for 'zarr_format'. Expected '2, 3, or None'. Got '3.0'.", ): - await zarr.api.asynchronous.open_array(shape=(1,), zarr_format="3.0") # type: ignore[arg-type] + await zarr.api.asynchronous.open_array(shape=(1,), zarr_format="3.0") @pytest.mark.parametrize( diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index ad847d20b1..6c8cb74483 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -17,6 +17,7 @@ from zarr.testing.store import StoreTests if TYPE_CHECKING: + import pathlib from collections.abc import Generator import botocore.client @@ -106,8 +107,23 @@ async def test_basic() -> None: assert out[0].to_bytes() == data[1:] +@pytest.mark.xfail(reason="See https://github.com/zarr-developers/zarr-python/issues/2808") +def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec") + fs = fsspec.filesystem("file") + mapper = fs.get_mapper(tmp_path) + arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) + assert isinstance(arr, Array) + # Set values + arr[:] = 1 + # Read set values + arr = zarr.open(store=mapper, mode="r", shape=(3, 3)) + assert isinstance(arr, Array) + np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) + + @pytest.mark.parametrize("asynchronous", [True, False]) -def test_open_s3map(asynchronous: bool) -> None: +def test_open_fsmap_s3(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem( asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False ) From 3f9a34c824e4ff6108226f8024aa31776e3b3d34 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 11:24:15 -0500 Subject: [PATCH 09/16] Re-add type ignore --- tests/test_api.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 1678347993..aacd558f2a 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -51,11 +51,11 @@ def test_create(memory_store: Store) -> None: # create array with float shape with pytest.raises(TypeError): - z = create(shape=(400.5, 100), store=store, overwrite=True) + z = create(shape=(400.5, 100), store=store, overwrite=True) # type: ignore [arg-type] # create array with float chunk shape with pytest.raises(TypeError): - z = create(shape=(400, 100), chunks=(16, 16.5), store=store, overwrite=True) + z = create(shape=(400, 100), chunks=(16, 16.5), store=store, overwrite=True) # type: ignore [arg-type] # TODO: parametrize over everything this function takes @@ -200,7 +200,7 @@ def test_save(store: Store, n_args: int, n_kwargs: int) -> None: assert isinstance(array, Array) assert_array_equal(array[:], data) else: - save(store, *args, **kwargs) + save(store, *args, **kwargs) # type: ignore[arg-type] group = open(store) assert isinstance(group, Group) for array in group.array_values(): @@ -1098,13 +1098,13 @@ async def test_metadata_validation_error() -> None: MetadataValidationError, match="Invalid value for 'zarr_format'. Expected '2, 3, or None'. Got '3.0'.", ): - await zarr.api.asynchronous.open_group(zarr_format="3.0") + await zarr.api.asynchronous.open_group(zarr_format="3.0") # type: ignore[arg-type] with pytest.raises( MetadataValidationError, match="Invalid value for 'zarr_format'. Expected '2, 3, or None'. Got '3.0'.", ): - await zarr.api.asynchronous.open_array(shape=(1,), zarr_format="3.0") + await zarr.api.asynchronous.open_array(shape=(1,), zarr_format="3.0") # type: ignore[arg-type] @pytest.mark.parametrize( From 4d1bd266b4be76ca30a40242a0b7acd39337beda Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 11:26:39 -0500 Subject: [PATCH 10/16] "Update docstring" --- src/zarr/storage/_fsspec.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index deca7a7313..ac0a0a1c23 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -174,10 +174,12 @@ def from_mapper( allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, ) -> FsspecStore: """ - Create a FsspecStore from an upath object. + Create a FsspecStore from a FSMap object. Parameters ---------- + fs_map : FSMap + Fsspec mutable mapping object. read_only : bool Whether the store is read-only, defaults to False. allowed_exceptions : tuple, optional From abc5fdf7a0482a16392a98fc1468eb162cede292 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 13 Feb 2025 12:06:03 -0500 Subject: [PATCH 11/16] Add another test --- tests/test_store/test_fsspec.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 6c8cb74483..5e8db7450d 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -14,6 +14,7 @@ from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import _collect_aiterator, sync from zarr.storage import FsspecStore +from zarr.storage._fsspec import _make_async from zarr.testing.store import StoreTests if TYPE_CHECKING: @@ -154,6 +155,15 @@ def test_open_s3map_raises() -> None: zarr.open(store=mapper, mode="w", shape=(3, 3), storage_options={"anon": True}) +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_make_async(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) + fs = _make_async(s3_filesystem) + assert fs.asynchronous + + class TestFsspecStoreS3(StoreTests[FsspecStore, cpu.Buffer]): store_cls = FsspecStore buffer_cls = cpu.Buffer From cb2db7d2645249bdf90c5fcc112f300621efa006 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:24:35 -0500 Subject: [PATCH 12/16] Require auto_mkdir for LocalFileSystem --- src/zarr/storage/_fsspec.py | 6 ++ tests/test_store/test_fsspec.py | 121 ++++++++++++++++++++------------ 2 files changed, 82 insertions(+), 45 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index ac0a0a1c23..5f1f2e410d 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -47,6 +47,12 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: fs_dict = fs.to_dict() fs_dict["asynchronous"] = True return AbstractFileSystem.from_dict(fs_dict) + from fsspec.implementations.local import LocalFileSystem + + if type(fs) is LocalFileSystem and not fs.auto_mkdir: + raise ValueError( + f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories" + ) try: from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 5e8db7450d..d59523ace7 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -108,35 +108,94 @@ async def test_basic() -> None: assert out[0].to_bytes() == data[1:] -@pytest.mark.xfail(reason="See https://github.com/zarr-developers/zarr-python/issues/2808") -def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: - fsspec = pytest.importorskip("fsspec") - fs = fsspec.filesystem("file") - mapper = fs.get_mapper(tmp_path) - arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) +def array_roundtrip(store): + """ + Round trip an array using a Zarr store + + Args: + store: Store-Like object (e.g., FSMap) + """ + arr = zarr.open(store=store, mode="w", shape=(3, 3)) assert isinstance(arr, Array) # Set values arr[:] = 1 # Read set values - arr = zarr.open(store=mapper, mode="r", shape=(3, 3)) + arr = zarr.open(store=store, mode="r", shape=(3, 3)) assert isinstance(arr, Array) np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_wrap_sync_filesystem(tmp_path): + """The local fs is not async so we should expect it to be wrapped automatically""" + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + store = FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) >= parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_wrap_sync_filesystem_raises(tmp_path): + """The local fs is not async so we should expect it to be wrapped automatically""" + with pytest.raises(ImportError, match="The filesystem .*"): + FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_no_wrap_async_filesystem(): + """An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs""" + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/foo/spam/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + assert not isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec") + fs = fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(tmp_path) + array_roundtrip(mapper) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec.implementations.local") + fs = fsspec.LocalFileSystem(auto_mkdir=False) + mapper = fs.get_mapper(tmp_path) + with pytest.raises(ValueError, match="LocalFilesystem .*"): + array_roundtrip(mapper) + + @pytest.mark.parametrize("asynchronous", [True, False]) def test_open_fsmap_s3(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem( asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False ) mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") - arr = zarr.open(store=mapper, mode="w", shape=(3, 3)) - assert isinstance(arr, Array) - # Set values - arr[:] = 1 - # Read set values - arr = zarr.open(store=mapper, mode="r", shape=(3, 3)) - assert isinstance(arr, Array) - np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) + array_roundtrip(mapper) def test_open_s3map_raises() -> None: @@ -147,12 +206,12 @@ def test_open_s3map_raises() -> None: with pytest.raises( ValueError, match="'path' was provided but is not used for FSMap store_like objects" ): - zarr.open(store=mapper, mode="w", shape=(3, 3), path="foo") + zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3)) with pytest.raises( ValueError, match="'storage_options was provided but is not used for FSMap store_like objects", ): - zarr.open(store=mapper, mode="w", shape=(3, 3), storage_options={"anon": True}) + zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) @pytest.mark.parametrize("asynchronous", [True, False]) @@ -276,31 +335,3 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None: store_kwargs["path"] += "/abc" store = await self.store_cls.open(**store_kwargs) assert await store.is_empty("") - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_wrap_sync_filesystem(): - """The local fs is not async so we should expect it to be wrapped automatically""" - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - store = FsspecStore.from_url("local://test/path") - - assert isinstance(store.fs, AsyncFileSystemWrapper) - assert store.fs.async_impl - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_no_wrap_async_filesystem(): - """An async fs should not be wrapped automatically; fsspec's https filesystem is such an fs""" - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - store = FsspecStore.from_url("https://test/path") - - assert not isinstance(store.fs, AsyncFileSystemWrapper) - assert store.fs.async_impl From 46e8bff231bc6b681250b452c70230b499fec057 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:00:49 -0500 Subject: [PATCH 13/16] Update test location --- tests/test_store/test_fsspec.py | 230 ++++++++++++++++---------------- 1 file changed, 115 insertions(+), 115 deletions(-) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 477bc3cd2c..5f967a0811 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -108,121 +108,6 @@ async def test_basic() -> None: assert out[0].to_bytes() == data[1:] -def array_roundtrip(store): - """ - Round trip an array using a Zarr store - - Args: - store: Store-Like object (e.g., FSMap) - """ - arr = zarr.open(store=store, mode="w", shape=(3, 3)) - assert isinstance(arr, Array) - # Set values - arr[:] = 1 - # Read set values - arr = zarr.open(store=store, mode="r", shape=(3, 3)) - assert isinstance(arr, Array) - np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_wrap_sync_filesystem(tmp_path): - """The local fs is not async so we should expect it to be wrapped automatically""" - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - store = FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) - assert isinstance(store.fs, AsyncFileSystemWrapper) - assert store.fs.async_impl - array_roundtrip(store) - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) >= parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_wrap_sync_filesystem_raises(tmp_path): - """The local fs is not async so we should expect it to be wrapped automatically""" - with pytest.raises(ImportError, match="The filesystem .*"): - FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_no_wrap_async_filesystem(): - """An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs""" - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - store = FsspecStore.from_url( - f"s3://{test_bucket_name}/foo/spam/", - storage_options={"endpoint_url": endpoint_url, "anon": False}, - ) - assert not isinstance(store.fs, AsyncFileSystemWrapper) - assert store.fs.async_impl - array_roundtrip(store) - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: - fsspec = pytest.importorskip("fsspec") - fs = fsspec.filesystem("file", auto_mkdir=True) - mapper = fs.get_mapper(tmp_path) - array_roundtrip(mapper) - - -@pytest.mark.skipif( - parse_version(fsspec.__version__) < parse_version("2024.12.0"), - reason="No AsyncFileSystemWrapper", -) -def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None: - fsspec = pytest.importorskip("fsspec.implementations.local") - fs = fsspec.LocalFileSystem(auto_mkdir=False) - mapper = fs.get_mapper(tmp_path) - with pytest.raises(ValueError, match="LocalFilesystem .*"): - array_roundtrip(mapper) - - -@pytest.mark.parametrize("asynchronous", [True, False]) -def test_open_fsmap_s3(asynchronous: bool) -> None: - s3_filesystem = s3fs.S3FileSystem( - asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False - ) - mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") - array_roundtrip(mapper) - - -def test_open_s3map_raises() -> None: - with pytest.raises(TypeError, match="Unsupported type for store_like:.*"): - zarr.open(store=0, mode="w", shape=(3, 3)) - s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) - mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") - with pytest.raises( - ValueError, match="'path' was provided but is not used for FSMap store_like objects" - ): - zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3)) - with pytest.raises( - ValueError, - match="'storage_options was provided but is not used for FSMap store_like objects", - ): - zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) - - -@pytest.mark.parametrize("asynchronous", [True, False]) -def test_make_async(asynchronous: bool) -> None: - s3_filesystem = s3fs.S3FileSystem( - asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False - ) - fs = _make_async(s3_filesystem) - assert fs.asynchronous - - class TestFsspecStoreS3(StoreTests[FsspecStore, cpu.Buffer]): store_cls = FsspecStore buffer_cls = cpu.Buffer @@ -345,6 +230,121 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: await store.delete_dir("test_prefix") +def array_roundtrip(store): + """ + Round trip an array using a Zarr store + + Args: + store: Store-Like object (e.g., FSMap) + """ + arr = zarr.open(store=store, mode="w", shape=(3, 3)) + assert isinstance(arr, Array) + # Set values + arr[:] = 1 + # Read set values + arr = zarr.open(store=store, mode="r", shape=(3, 3)) + assert isinstance(arr, Array) + np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_wrap_sync_filesystem(tmp_path): + """The local fs is not async so we should expect it to be wrapped automatically""" + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + store = FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) >= parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_wrap_sync_filesystem_raises(tmp_path): + """The local fs is not async so we should expect it to be wrapped automatically""" + with pytest.raises(ImportError, match="The filesystem .*"): + FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_no_wrap_async_filesystem(): + """An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs""" + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/foo/spam/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + assert not isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec") + fs = fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(tmp_path) + array_roundtrip(mapper) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec.implementations.local") + fs = fsspec.LocalFileSystem(auto_mkdir=False) + mapper = fs.get_mapper(tmp_path) + with pytest.raises(ValueError, match="LocalFilesystem .*"): + array_roundtrip(mapper) + + +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_open_fsmap_s3(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + array_roundtrip(mapper) + + +def test_open_s3map_raises() -> None: + with pytest.raises(TypeError, match="Unsupported type for store_like:.*"): + zarr.open(store=0, mode="w", shape=(3, 3)) + s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + with pytest.raises( + ValueError, match="'path' was provided but is not used for FSMap store_like objects" + ): + zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3)) + with pytest.raises( + ValueError, + match="'storage_options was provided but is not used for FSMap store_like objects", + ): + zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) + + +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_make_async(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) + fs = _make_async(s3_filesystem) + assert fs.asynchronous + + @pytest.mark.skipif( parse_version(fsspec.__version__) < parse_version("2024.12.0"), reason="No AsyncFileSystemWrapper", From 7517f72bf87c2ba00400e8a7c74e3f7eec9217fa Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 26 Feb 2025 20:01:07 -0500 Subject: [PATCH 14/16] Convert older filesystems to async --- src/zarr/storage/_fsspec.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 668fe33730..a85ee4dfe1 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -45,9 +45,18 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: if fs.async_impl and fs.asynchronous: return fs if fs.async_impl: - fs_dict = fs.to_dict() - fs_dict["asynchronous"] = True - return AbstractFileSystem.from_dict(fs_dict) + try: + fs_dict = fs.to_dict() + fs_dict["asynchronous"] = True + return AbstractFileSystem.from_dict(fs_dict) + except AttributeError: + # Older fsspec specification used to_json rather than to_dict + import json + + fs_dict = json.loads(fs.to_json()) + fs_dict["asynchronous"] = True + return AbstractFileSystem.from_json(json.dumps(fs_dict)) + from fsspec.implementations.local import LocalFileSystem if type(fs) is LocalFileSystem and not fs.auto_mkdir: From 3ae719bbc8c89158766b03fb13d81f609f91592b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:15:18 -0500 Subject: [PATCH 15/16] Use if on fsspec versions rather than try; else --- src/zarr/storage/_fsspec.py | 43 ++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index a85ee4dfe1..8aece4f377 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -4,8 +4,6 @@ from contextlib import suppress from typing import TYPE_CHECKING, Any -from fsspec import AbstractFileSystem - from zarr.abc.store import ( ByteRequest, OffsetByteRequest, @@ -19,6 +17,7 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable + from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem from fsspec.mapping import FSMap @@ -42,37 +41,37 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: If the filesystem class does not support async operations, the existing instance is wrapped with AsyncFileSystemWrapper. """ + import fsspec + from packaging.version import parse as parse_version + + fsspec_version = parse_version(fsspec.__version__) if fs.async_impl and fs.asynchronous: + # Already an async instance of an async filesystem, nothing to do return fs if fs.async_impl: - try: - fs_dict = fs.to_dict() - fs_dict["asynchronous"] = True - return AbstractFileSystem.from_dict(fs_dict) - except AttributeError: - # Older fsspec specification used to_json rather than to_dict - import json - - fs_dict = json.loads(fs.to_json()) - fs_dict["asynchronous"] = True - return AbstractFileSystem.from_json(json.dumps(fs_dict)) + # Convert sync instance of an async fs to an async instance + import json - from fsspec.implementations.local import LocalFileSystem + fs_dict = json.loads(fs.to_json()) + fs_dict["asynchronous"] = True + return fsspec.AbstractFileSystem.from_json(json.dumps(fs_dict)) - if type(fs) is LocalFileSystem and not fs.auto_mkdir: + # Wrap sync filesystems with the async wrapper + if type(fs) is fsspec.implementations.local.LocalFileSystem and not fs.auto_mkdir: raise ValueError( f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories" ) - try: - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - return AsyncFileSystemWrapper(fs) - except ImportError as e: + if fsspec_version < parse_version("2024.12.0"): raise ImportError( - f"The filesystem '{fs}' is synchronous, and the required " + "The filesystem '{fs}' is synchronous, and the required " "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " "2024.12.0 or later to enable this functionality." - ) from e + ) + + if fsspec_version > parse_version("2025.2.0"): + return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True) + else: + return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs) class FsspecStore(Store): From d4d225600026c22ffb5f49c942427397e49cbb08 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:22:14 -0500 Subject: [PATCH 16/16] Always use asynchronous=True in _make_async --- src/zarr/storage/_fsspec.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 8aece4f377..ddb8e1c9ed 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -68,10 +68,7 @@ def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: "2024.12.0 or later to enable this functionality." ) - if fsspec_version > parse_version("2025.2.0"): - return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True) - else: - return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs) + return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True) class FsspecStore(Store):