diff --git a/docs/api/storage.rst b/docs/api/storage.rst index a2aff3b6ef..4321837449 100644 --- a/docs/api/storage.rst +++ b/docs/api/storage.rst @@ -35,6 +35,8 @@ Storage (``zarr.storage``) .. autoclass:: ABSStore +.. autoclass:: FSStore + .. autoclass:: ConsolidatedMetadataStore .. autofunction:: init_array diff --git a/requirements_dev_optional.txt b/requirements_dev_optional.txt index 5777ad56a0..4fb3c21b5b 100644 --- a/requirements_dev_optional.txt +++ b/requirements_dev_optional.txt @@ -18,4 +18,6 @@ pytest-cov==2.7.1 pytest-doctestplus==0.4.0 pytest-remotedata==0.3.2 h5py==2.10.0 -s3fs==0.3.4; python_version > '3.0' +s3fs==0.5.0; python_version > '3.6' +moto>=1.3.14; python_version > '3.6' +flask diff --git a/zarr/creation.py b/zarr/creation.py index 3958e2eb78..591feeed29 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -10,7 +10,7 @@ from zarr.n5 import N5Store from zarr.storage import (DirectoryStore, ZipStore, contains_array, contains_group, default_compressor, init_array, - normalize_storage_path) + normalize_storage_path, FSStore) def create(shape, chunks=True, dtype=None, compressor='default', @@ -127,12 +127,16 @@ def create(shape, chunks=True, dtype=None, compressor='default', return z -def normalize_store_arg(store, clobber=False, default=dict): +def normalize_store_arg(store, clobber=False, default=dict, storage_options=None): if store is None: return default() elif isinstance(store, str): + mode = 'w' if clobber else 'r' + if "://" in store or "::" in store: + return FSStore(store, mode=mode, **(storage_options or {})) + elif storage_options: + raise ValueError("storage_options passed with non-fsspec path") if store.endswith('.zip'): - mode = 'w' if clobber else 'r' return ZipStore(store, mode=mode) elif store.endswith('.n5'): return N5Store(store) @@ -353,7 +357,8 @@ def array(data, **kwargs): def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, compressor='default', fill_value=0, order='C', synchronizer=None, filters=None, cache_metadata=True, cache_attrs=True, path=None, - object_codec=None, chunk_store=None, **kwargs): + object_codec=None, chunk_store=None, storage_options=None, + **kwargs): """Open an array using file-mode-like semantics. Parameters @@ -399,6 +404,9 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, A codec to encode object arrays, only needed if dtype=object. chunk_store : MutableMapping or string, optional Store or path to directory in file system or name of zip file. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. Returns ------- @@ -435,9 +443,10 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, # handle polymorphic store arg clobber = mode == 'w' - store = normalize_store_arg(store, clobber=clobber) + store = normalize_store_arg(store, clobber=clobber, storage_options=storage_options) if chunk_store is not None: - chunk_store = normalize_store_arg(chunk_store, clobber=clobber) + chunk_store = normalize_store_arg(chunk_store, clobber=clobber, + storage_options=storage_options) path = normalize_storage_path(path) # API compatibility with h5py diff --git a/zarr/hierarchy.py b/zarr/hierarchy.py index 4fab679318..2c5d25be82 100644 --- a/zarr/hierarchy.py +++ b/zarr/hierarchy.py @@ -1032,8 +1032,9 @@ def move(self, source, dest): self._write_op(self._move_nosync, source, dest) -def _normalize_store_arg(store, clobber=False): - return normalize_store_arg(store, clobber=clobber, default=MemoryStore) +def _normalize_store_arg(store, clobber=False, storage_options=None): + return normalize_store_arg(store, clobber=clobber, default=MemoryStore, + storage_options=storage_options) def group(store=None, overwrite=False, chunk_store=None, @@ -1095,7 +1096,7 @@ def group(store=None, overwrite=False, chunk_store=None, def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None, - chunk_store=None): + chunk_store=None, storage_options=None): """Open a group using file-mode-like semantics. Parameters @@ -1117,6 +1118,9 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N Group path within store. chunk_store : MutableMapping or string, optional Store or path to directory in file system or name of zip file. + storage_options : dict + If using an fsspec URL to create the store, these will be passed to + the backend implementation. Ignored otherwise. Returns ------- @@ -1139,9 +1143,11 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N """ # handle polymorphic store arg - store = _normalize_store_arg(store) + clobber = mode != 'r' + store = _normalize_store_arg(store, clobber=clobber, storage_options=storage_options) if chunk_store is not None: - chunk_store = _normalize_store_arg(chunk_store) + chunk_store = _normalize_store_arg(chunk_store, clobber=clobber, + storage_options=storage_options) path = normalize_storage_path(path) # ensure store is initialized diff --git a/zarr/storage.py b/zarr/storage.py index 738bf548ee..cd4b37eee2 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -946,6 +946,128 @@ def atexit_rmglob(path, rmtree(p) +class FSStore(MutableMapping): + """Wraps an fsspec.FSMap to give access to arbitrary filesystems + + Requires that ``fsspec`` is installed, as well as any additional + requirements for the protocol chosen. + + Parameters + ---------- + url : str + The destination to map. Should include protocol and path, + like "s3://bucket/root" + normalize_keys : bool + key_separator : str + Character to use when constructing the target path strings + for data keys + mode : str + "w" for writable, "r" for read-only + exceptions : list of Exception subclasses + When accessing data, any of these exceptions will be treated + as a missing key + storage_options : passed to the fsspec implementation + """ + + def __init__(self, url, normalize_keys=True, key_separator='.', + mode='w', + exceptions=(KeyError, PermissionError, IOError), + **storage_options): + import fsspec + self.path = url + self.normalize_keys = normalize_keys + self.key_separator = key_separator + self.map = fsspec.get_mapper(url, **storage_options) + self.fs = self.map.fs # for direct operations + self.mode = mode + self.exceptions = exceptions + if self.fs.exists(url) and not self.fs.isdir(url): + err_fspath_exists_notdir(url) + + def _normalize_key(self, key): + key = normalize_storage_path(key).lstrip('/') + if key: + *bits, end = key.split('/') + key = '/'.join(bits + [end.replace('.', self.key_separator)]) + return key.lower() if self.normalize_keys else key + + def __getitem__(self, key): + key = self._normalize_key(key) + try: + return self.map[key] + except self.exceptions as e: + raise KeyError(key) from e + + def __setitem__(self, key, value): + if self.mode == 'r': + err_read_only() + key = self._normalize_key(key) + path = self.dir_path(key) + value = ensure_contiguous_ndarray(value) + try: + if self.fs.isdir(path): + self.fs.rm(path, recursive=True) + self.map[key] = value + except self.exceptions as e: + raise KeyError(key) from e + + def __delitem__(self, key): + if self.mode == 'r': + err_read_only() + key = self._normalize_key(key) + path = self.dir_path(key) + if self.fs.isdir(path): + self.fs.rm(path, recursive=True) + else: + del self.map[key] + + def __contains__(self, key): + key = self._normalize_key(key) + return key in self.map + + def __eq__(self, other): + return (type(self) == type(other) and self.map == other.map + and self.mode == other.mode) + + def keys(self): + return iter(self.map) + + def __iter__(self): + return self.keys() + + def __len__(self): + return len(list(self.keys())) + + def dir_path(self, path=None): + store_path = normalize_storage_path(path) + return self.map._key_to_str(store_path) + + def listdir(self, path=None): + dir_path = self.dir_path(path) + try: + out = sorted(p.rstrip('/').rsplit('/', 1)[-1] + for p in self.fs.ls(dir_path, detail=False)) + return out + except IOError: + return [] + + def rmdir(self, path=None): + if self.mode == 'r': + err_read_only() + store_path = self.dir_path(path) + if self.fs.isdir(store_path): + self.fs.rm(store_path, recursive=True) + + def getsize(self, path=None): + store_path = self.dir_path(path) + return self.fs.du(store_path, True, True) + + def clear(self): + if self.mode == 'r': + err_read_only() + self.map.clear() + + class TempStore(DirectoryStore): """Directory store using a temporary directory for storage. diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index a0c8412e23..d8730b2e7d 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -29,7 +29,8 @@ array_meta_key, atexit_rmglob, atexit_rmtree, attrs_key, default_compressor, getsize, group_meta_key, init_array, init_group, migrate_1to2) -from zarr.tests.util import CountingDict, skip_test_env_var +from zarr.storage import FSStore +from zarr.tests.util import CountingDict, have_fsspec, skip_test_env_var @contextmanager @@ -827,6 +828,141 @@ def test_normalize_keys(self): assert 'foo' in store +@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +class TestFSStore(StoreTests, unittest.TestCase): + + def create_store(self, normalize_keys=False): + path = tempfile.mkdtemp() + atexit.register(atexit_rmtree, path) + store = FSStore(path, normalize_keys=normalize_keys) + return store + + def test_complex(self): + path1 = tempfile.mkdtemp() + path2 = tempfile.mkdtemp() + store = FSStore("simplecache::file://" + path1, + simplecache={"same_names": True, "cache_storage": path2}) + assert not store + assert not os.listdir(path1) + assert not os.listdir(path2) + store['foo'] = b"hello" + assert 'foo' in os.listdir(path1) + assert 'foo' in store + assert not os.listdir(path2) + assert store["foo"] == b"hello" + assert 'foo' in os.listdir(path2) + + def test_not_fsspec(self): + import zarr + path = tempfile.mkdtemp() + with pytest.raises(ValueError, match="storage_options"): + zarr.open_array(path, mode='w', storage_options={"some": "kwargs"}) + with pytest.raises(ValueError, match="storage_options"): + zarr.open_group(path, mode='w', storage_options={"some": "kwargs"}) + zarr.open_array("file://" + path, mode='w', shape=(1,), dtype="f8") + + def test_create(self): + import zarr + path1 = tempfile.mkdtemp() + path2 = tempfile.mkdtemp() + g = zarr.open_group("file://" + path1, mode='w', + storage_options={"auto_mkdir": True}) + a = g.create_dataset("data", shape=(8,)) + a[:4] = [0, 1, 2, 3] + assert "data" in os.listdir(path1) + assert ".zgroup" in os.listdir(path1) + + g = zarr.open_group("simplecache::file://" + path1, mode='r', + storage_options={"cache_storage": path2, + "same_names": True}) + assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0] + with pytest.raises(PermissionError): + g.data[:] = 1 + + def test_read_only(self): + path = tempfile.mkdtemp() + atexit.register(atexit_rmtree, path) + store = FSStore(path) + store['foo'] = b"bar" + + store = FSStore(path, mode='r') + + with pytest.raises(PermissionError): + store['foo'] = b"hex" + + with pytest.raises(PermissionError): + del store['foo'] + + with pytest.raises(PermissionError): + store.clear() + + with pytest.raises(PermissionError): + store.rmdir("anydir") + + assert store['foo'] == b"bar" + + filepath = os.path.join(path, "foo") + with pytest.raises(ValueError): + FSStore(filepath, mode='r') + + def test_eq(self): + store1 = FSStore("anypath") + store2 = FSStore("anypath") + assert store1 == store2 + + @pytest.mark.usefixtures("s3") + def test_s3(self): + import zarr + g = zarr.open_group("s3://test/out.zarr", mode='w', + storage_options=self.s3so) + a = g.create_dataset("data", shape=(8,)) + a[:4] = [0, 1, 2, 3] + + g = zarr.open_group("s3://test/out.zarr", mode='r', + storage_options=self.s3so) + + assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0] + + +@pytest.fixture() +def s3(request): + # writable local S3 system + import shlex + import subprocess + import time + if "BOTO_CONFIG" not in os.environ: # pragma: no cover + os.environ["BOTO_CONFIG"] = "/dev/null" + if "AWS_ACCESS_KEY_ID" not in os.environ: # pragma: no cover + os.environ["AWS_ACCESS_KEY_ID"] = "foo" + if "AWS_SECRET_ACCESS_KEY" not in os.environ: # pragma: no cover + os.environ["AWS_SECRET_ACCESS_KEY"] = "bar" + requests = pytest.importorskip("requests") + s3fs = pytest.importorskip("s3fs") + pytest.importorskip("moto") + + port = 5555 + endpoint_uri = 'http://127.0.0.1:%s/' % port + proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port)) + + timeout = 5 + while timeout > 0: + try: + r = requests.get(endpoint_uri) + if r.ok: + break + except Exception: # pragma: no cover + pass + timeout -= 0.1 # pragma: no cover + time.sleep(0.1) # pragma: no cover + s3so = dict(client_kwargs={'endpoint_url': endpoint_uri}) + s3 = s3fs.S3FileSystem(anon=False, **s3so) + s3.mkdir("test") + request.cls.s3so = s3so + yield + proc.terminate() + proc.wait() + + class TestNestedDirectoryStore(TestDirectoryStore, unittest.TestCase): def create_store(self, normalize_keys=False): @@ -961,6 +1097,17 @@ def test_filters(self): init_array(store, shape=1000, chunks=100, filters=filters) +@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +class TestNestedFSStore(TestNestedDirectoryStore): + + def create_store(self, normalize_keys=False): + path = tempfile.mkdtemp() + atexit.register(atexit_rmtree, path) + store = FSStore(path, normalize_keys=normalize_keys, + key_separator='/', auto_mkdir=True) + return store + + class TestTempStore(StoreTests, unittest.TestCase): def create_store(self): diff --git a/zarr/tests/util.py b/zarr/tests/util.py index 7e8e719157..a61461d4a9 100644 --- a/zarr/tests/util.py +++ b/zarr/tests/util.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- import collections +from distutils.version import LooseVersion from collections.abc import MutableMapping import os +import sys import pytest @@ -46,3 +48,13 @@ def skip_test_env_var(name): """ value = os.environ.get(name, '0') return pytest.mark.skipif(value == '0', reason='Tests not enabled via environment variable') + + +try: + if LooseVersion(sys.version) >= LooseVersion("3.6"): + import fsspec # noqa: F401 + have_fsspec = True + else: # pragma: no cover + have_fsspec = False +except ImportError: # pragma: no cover + have_fsspec = False