Skip to content

Default to RemoteStore for fsspec URIs #2198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 19, 2024
61 changes: 49 additions & 12 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def open(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to open_array
) -> AsyncArray | AsyncGroup:
"""Convenience function to open a group or array using file-mode-like semantics.
Expand All @@ -211,6 +212,9 @@ async def open(
The zarr format to use when saving.
path : str or None, optional
The path within the store to open.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Additional parameters are passed through to :func:`zarr.creation.open_array` or
:func:`zarr.hierarchy.open_group`.
Expand All @@ -221,7 +225,7 @@ async def open(
Return type depends on what exists in the given store.
"""
zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)

if path is not None:
store_path = store_path / path
Expand Down Expand Up @@ -276,6 +280,7 @@ async def save_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to create
) -> None:
"""Convenience function to save a NumPy array to the local file system, following a
Expand All @@ -291,6 +296,9 @@ async def save_array(
The zarr format to use when saving.
path : str or None, optional
The path within the store where the array will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
Passed through to :func:`create`, e.g., compressor.
"""
Expand All @@ -299,7 +307,7 @@ async def save_array(
or _default_zarr_version()
)

store_path = await make_store_path(store, mode="w")
store_path = await make_store_path(store, mode="w", storage_options=storage_options)
if path is not None:
store_path = store_path / path
new = await AsyncArray.create(
Expand All @@ -319,6 +327,7 @@ async def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
"""Convenience function to save several NumPy arrays to the local file system, following a
Expand All @@ -334,22 +343,40 @@ async def save_group(
The zarr format to use when saving.
path : str or None, optional
Path within the store where the group will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
NumPy arrays with data to save.
"""
zarr_format = (
_handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
_handle_zarr_version_or_format(
zarr_version=zarr_version,
zarr_format=zarr_format,
)
or _default_zarr_version()
)

if len(args) == 0 and len(kwargs) == 0:
raise ValueError("at least one array must be provided")
aws = []
for i, arr in enumerate(args):
aws.append(save_array(store, arr, zarr_format=zarr_format, path=f"{path}/arr_{i}"))
aws.append(
save_array(
store,
arr,
zarr_format=zarr_format,
path=f"{path}/arr_{i}",
storage_options=storage_options,
)
)
for k, arr in kwargs.items():
_path = f"{path}/{k}" if path is not None else k
aws.append(save_array(store, arr, zarr_format=zarr_format, path=_path))
aws.append(
save_array(
store, arr, zarr_format=zarr_format, path=_path, storage_options=storage_options
)
)
await asyncio.gather(*aws)


Expand Down Expand Up @@ -418,6 +445,7 @@ async def group(
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
attributes: dict[str, JSON] | None = None,
storage_options: dict[str, Any] | None = None,
) -> AsyncGroup:
"""Create a group.

Expand All @@ -444,6 +472,9 @@ async def group(
to users. Use `numpy.empty(())` by default.
zarr_format : {2, 3, None}, optional
The zarr format to use when saving.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand All @@ -453,7 +484,7 @@ async def group(

zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -488,7 +519,7 @@ async def open_group(
synchronizer: Any = None, # not used
path: str | None = None,
chunk_store: StoreLike | None = None, # not used
storage_options: dict[str, Any] | None = None, # not used
storage_options: dict[str, Any] | None = None,
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
Expand Down Expand Up @@ -548,10 +579,8 @@ async def open_group(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)
if chunk_store is not None:
warnings.warn("chunk_store is not yet implemented", RuntimeWarning, stacklevel=2)
if storage_options is not None:
warnings.warn("storage_options is not yet implemented", RuntimeWarning, stacklevel=2)

store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -603,6 +632,7 @@ async def create(
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any,
) -> AsyncArray:
"""Create an array.
Expand Down Expand Up @@ -674,6 +704,9 @@ async def create(
to users. Use `numpy.empty(())` by default.

.. versionadded:: 2.13
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand Down Expand Up @@ -725,7 +758,7 @@ async def create(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)

mode = kwargs.pop("mode", cast(AccessModeLiteral, "r" if read_only else "w"))
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -875,6 +908,7 @@ async def open_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: PathLike | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to save
) -> AsyncArray:
"""Open an array using file-mode-like semantics.
Expand All @@ -887,6 +921,9 @@ async def open_array(
The zarr format to use when saving.
path : string, optional
Path in store to array.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Any keyword arguments to pass to the array constructor.

Expand All @@ -896,7 +933,7 @@ async def open_array(
The opened array.
"""

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down
2 changes: 2 additions & 0 deletions src/zarr/api/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
return sync(
Expand All @@ -143,6 +144,7 @@ def save_group(
zarr_version=zarr_version,
zarr_format=zarr_format,
path=path,
storage_options=storage_options,
**kwargs,
)
)
Expand Down
57 changes: 49 additions & 8 deletions src/zarr/store/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore

# from zarr.store.remote import RemoteStore

if TYPE_CHECKING:
from zarr.core.buffer import BufferPrototype
from zarr.core.common import AccessModeLiteral
Expand Down Expand Up @@ -75,30 +77,69 @@ def __eq__(self, other: Any) -> bool:


async def make_store_path(
store_like: StoreLike | None, *, mode: AccessModeLiteral | None = None
store_like: StoreLike | None,
*,
mode: AccessModeLiteral | None = None,
storage_options: dict[str, Any] | None = None,
) -> StorePath:
from zarr.store.remote import RemoteStore # circular import

used_storage_options = False

if isinstance(store_like, StorePath):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.store.mode
return store_like
result = store_like
elif isinstance(store_like, Store):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.mode
await store_like._ensure_open()
return StorePath(store_like)
result = StorePath(store_like)
elif store_like is None:
if mode is None:
mode = "w" # exception to the default mode = 'r'
return StorePath(await MemoryStore.open(mode=mode))
result = StorePath(await MemoryStore.open(mode=mode))
elif isinstance(store_like, Path):
return StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can pass **storage_options to LocalStore as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be missing something, but I don't think that'll work. LocalStore.open will call LocalStore.__init__, which just takes root and mode, which are passed as regular args here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I was thinking auto_mkdir would be passed through but if that's not the case, let's not get distracted here.

elif isinstance(store_like, str):
return StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
storage_options = storage_options or {}

if _is_fsspec_uri(store_like):
used_storage_options = True
result = StorePath(
RemoteStore.from_url(store_like, storage_options=storage_options, mode=mode or "r")
)
else:
result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
elif isinstance(store_like, dict):
# We deliberately only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
# By only allowing dictionaries, which are in-memory, we know that MemoryStore is appropriate.
return StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
raise TypeError
result = StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
else:
msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable]
raise TypeError(msg)

if storage_options and not used_storage_options:
msg = "'storage_options' was provided but unused. 'storage_options' is only used for fsspec filesystem stores."
raise TypeError(msg)

return result


def _is_fsspec_uri(uri: str) -> bool:
    """
    Check if a URI looks like a non-local fsspec URI.

    A string is treated as an fsspec URI when it contains a protocol
    separator (``://``) or a chaining separator (``::``), unless it refers
    to the ``local://`` protocol.

    Parameters
    ----------
    uri : str
        The string to inspect.

    Returns
    -------
    bool
        True if ``uri`` contains ``://`` or ``::`` and does not contain
        ``local://``; False otherwise.

    Examples
    --------
    >>> _is_fsspec_uri("s3://bucket")
    True
    >>> _is_fsspec_uri("my-directory")
    False
    >>> _is_fsspec_uri("local://my-directory")
    False
    """
    # `and` binds tighter than `or`, so the separator checks must be grouped
    # explicitly; otherwise the "local://" exclusion would only apply to the
    # "::" case and "local://..." would be reported as an fsspec URI.
    has_fsspec_separator = "://" in uri or "::" in uri
    return has_fsspec_separator and "local://" not in uri


async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None:
Expand Down
Loading