Fsspec writing #630

Merged: 15 commits, Nov 3, 2020
35 changes: 33 additions & 2 deletions docs/tutorial.rst
@@ -854,13 +854,44 @@ please raise an issue on the GitHub issue tracker with any profiling data you
can provide, as there may be opportunities to optimise further either within
Zarr or within the mapping interface to the storage.

IO with ``fsspec``
~~~~~~~~~~~~~~~~~~

As of version 2.5, zarr supports passing URLs directly to `fsspec`_,
and having it create the "mapping" instance automatically. This means that
for all of the backend storage implementations `supported by fsspec`_,
you can skip importing and configuring the storage explicitly.
For example::

>>> g = zarr.open_group("s3://zarr-demo/store", storage_options={'anon': True}) # doctest: +SKIP
>>> g['foo/bar/baz'][:].tobytes() # doctest: +SKIP
b'Hello from the cloud!'

The protocol specifier "s3://" selects the correct backend. Note the keyword
argument ``storage_options``, which is used to pass parameters to that backend.

As of version 2.6, write mode and complex URLs are also supported, such as::

>>> g = zarr.open_group("simplecache::s3://zarr-demo/store",
... storage_options={"s3": {'anon': True}}) # doctest: +SKIP
>>> g['foo/bar/baz'][:].tobytes() # downloads target file # doctest: +SKIP
b'Hello from the cloud!'
>>> g['foo/bar/baz'][:].tobytes() # uses cached file # doctest: +SKIP
b'Hello from the cloud!'

The second invocation here will be much faster. Note that ``storage_options``
is now nested, with options keyed by protocol to account for the two parts of
the supplied URL.
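
Write mode works the same way: open the group writable and assign to arrays as
usual. A minimal sketch, assuming you have credentials for a hypothetical
writable bucket ``my-bucket``::

    >>> g = zarr.open_group("s3://my-bucket/example.zarr", mode='w',
    ...                     storage_options={'anon': False})  # doctest: +SKIP
    >>> z = g.zeros('data', shape=(100, 100), chunks=(10, 10))  # doctest: +SKIP
    >>> z[:] = 42  # doctest: +SKIP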

.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/

.. _supported by fsspec: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations

.. _tutorial_copy:

Consolidating metadata
~~~~~~~~~~~~~~~~~~~~~~

(This is an experimental feature.)

Since there is a significant overhead for every connection to a cloud object
store such as S3, the pattern described in the previous section may incur
significant latency while scanning the metadata of the array hierarchy, even
4 changes: 2 additions & 2 deletions zarr/convenience.py
@@ -1165,12 +1165,12 @@ def open_consolidated(store, metadata_key='.zmetadata', mode='r+', **kwargs):
from .storage import ConsolidatedMetadataStore

# normalize parameters
store = normalize_store_arg(store)
store = normalize_store_arg(store, storage_options=kwargs.get("storage_options", None))
if mode not in {'r', 'r+'}:
raise ValueError("invalid mode, expected either 'r' or 'r+'; found {!r}"
.format(mode))

# setup metadata sotre
# setup metadata store
meta_store = ConsolidatedMetadataStore(store, metadata_key=metadata_key)

# pass through
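
With ``normalize_store_arg`` now receiving ``storage_options``,
``open_consolidated`` can be pointed straight at an fsspec URL. A sketch of
how a call might look, assuming a hypothetical bucket that already contains
consolidated metadata:

    import zarr

    # Hypothetical public bucket; ``storage_options`` is picked out of
    # **kwargs and forwarded to the fsspec-backed store.
    g = zarr.open_consolidated(
        "s3://my-bucket/example.zarr",
        mode="r",
        storage_options={"anon": True},
    )
    print(g.tree())
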
76 changes: 52 additions & 24 deletions zarr/core.py
Expand Up @@ -1535,25 +1535,48 @@ def _set_selection(self, indexer, value, fields=None):
check_array_shape('value', value, sel_shape)

# iterate over chunks in range
for chunk_coords, chunk_selection, out_selection in indexer:
if not hasattr(self.store, "setitems") or self._synchronizer is not None:
# iterative approach
for chunk_coords, chunk_selection, out_selection in indexer:

# extract data to store
if sel_shape == ():
chunk_value = value
elif is_scalar(value, self._dtype):
chunk_value = value
else:
chunk_value = value[out_selection]
# handle missing singleton dimensions
if indexer.drop_axes:
item = [slice(None)] * self.ndim
for a in indexer.drop_axes:
item[a] = np.newaxis
item = tuple(item)
chunk_value = chunk_value[item]

# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
# extract data to store
if sel_shape == ():
chunk_value = value
elif is_scalar(value, self._dtype):
chunk_value = value
else:
chunk_value = value[out_selection]
# handle missing singleton dimensions
if indexer.drop_axes:
item = [slice(None)] * self.ndim
for a in indexer.drop_axes:
item[a] = np.newaxis
item = tuple(item)
chunk_value = chunk_value[item]

# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
else:
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
chunk_values = []
for out_selection in lout_selection:
if sel_shape == ():
chunk_values.append(value)
elif is_scalar(value, self._dtype):
chunk_values.append(value)
else:
cv = value[out_selection]
# handle missing singleton dimensions
if indexer.drop_axes: # pragma: no cover
item = [slice(None)] * self.ndim
for a in indexer.drop_axes:
item[a] = np.newaxis
item = tuple(item)
cv = cv[item]
chunk_values.append(cv)

self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)

def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection):
@@ -1677,6 +1700,12 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
fill_value = self._fill_value
out[out_select] = fill_value

def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
ckeys = [self._chunk_key(co) for co in lchunk_coords]
cdatas = [self._process_for_setitem(key, sel, val, fields=fields)
for key, sel, val in zip(ckeys, lchunk_selection, values)]
self.chunk_store.setitems({k: v for k, v in zip(ckeys, cdatas)})

def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
"""Replace part or whole of a chunk.

@@ -1704,10 +1733,12 @@ def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
fields=fields)

def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):

# obtain key for chunk storage
ckey = self._chunk_key(chunk_coords)
cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
# store
self.chunk_store[ckey] = cdata

def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
if is_total_slice(chunk_selection, self._chunks) and not fields:
# totally replace chunk

@@ -1762,10 +1793,7 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
chunk[chunk_selection] = value

# encode chunk
cdata = self._encode_chunk(chunk)

# store
self.chunk_store[ckey] = cdata
return self._encode_chunk(chunk)

def _chunk_key(self, chunk_coords):
return self._key_prefix + '.'.join(map(str, chunk_coords))
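
The refactor above separates encoding from storage: ``_process_for_setitem``
returns the encoded chunk, and stores that expose a ``setitems`` method
receive all encoded chunks in a single call rather than one ``__setitem__``
per chunk. A simplified sketch of that dispatch, using a hypothetical helper
rather than the actual zarr internals:

    from typing import Any, Dict


    def write_encoded_chunks(store: Any, encoded: Dict[str, bytes]) -> None:
        """Write pre-encoded chunks, batching when the store supports it.

        ``store`` is any mapping-like object; ``encoded`` maps chunk keys to
        encoded bytes. This mirrors the dispatch added to ``_set_selection``.
        """
        if hasattr(store, "setitems"):
            # Batched path: one call covering every chunk, allowing an
            # fsspec-backed store to issue the uploads concurrently.
            store.setitems(encoded)
        else:
            # Fallback: one store round-trip per chunk.
            for key, data in encoded.items():
                store[key] = data
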
12 changes: 9 additions & 3 deletions zarr/storage.py
@@ -1021,14 +1021,14 @@ def __init__(self, url, normalize_keys=True, key_separator='.',
exceptions=(KeyError, PermissionError, IOError),
**storage_options):
import fsspec
self.path = url
self.normalize_keys = normalize_keys
self.key_separator = key_separator
self.map = fsspec.get_mapper(url, **storage_options)
self.fs = self.map.fs # for direct operations
self.path = self.fs._strip_protocol(url)
self.mode = mode
self.exceptions = exceptions
if self.fs.exists(url) and not self.fs.isdir(url):
if self.fs.exists(self.path) and not self.fs.isdir(self.path):
raise FSPathExistNotDir(url)

def _normalize_key(self, key):
@@ -1049,16 +1049,22 @@ def __getitem__(self, key):
except self.exceptions as e:
raise KeyError(key) from e

def setitems(self, values):
if self.mode == 'r':
raise ReadOnlyError()
values = {self._normalize_key(key): val for key, val in values.items()}
self.map.setitems(values)

def __setitem__(self, key, value):
if self.mode == 'r':
raise ReadOnlyError()
key = self._normalize_key(key)
path = self.dir_path(key)
value = ensure_contiguous_ndarray(value)
try:
if self.fs.isdir(path):
self.fs.rm(path, recursive=True)
self.map[key] = value
self.fs.invalidate_cache(self.fs._parent(path))
except self.exceptions as e:
raise KeyError(key) from e

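
With ``setitems`` available on ``FSStore``, writes flow through the same
fsspec mapper as reads. A sketch of driving the store directly, assuming a
hypothetical writable bucket (extra keyword arguments become fsspec
``storage_options``):

    import zarr
    from zarr.storage import FSStore

    # Hypothetical bucket/key; ``anon=False`` is passed through to s3fs.
    store = FSStore("s3://my-bucket/example.zarr", mode="w", anon=False)
    root = zarr.group(store=store, overwrite=True)
    z = root.zeros("data", shape=(100, 100), chunks=(10, 10))
    z[:] = 42  # whole-array assignment can be batched via FSStore.setitems
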
35 changes: 28 additions & 7 deletions zarr/tests/test_storage.py
Expand Up @@ -966,17 +966,37 @@ def test_s3_complex(self):
fill_value=-1, chunks=(1, 1, 1))
expected[0] = 0
expected[3] = 3
expected[6, 6, 6] = 6
a[6, 6, 6] = 6
a[:4] = expected[:4]

a = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
b = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
dtype=[('foo', 'S3'), ('bar', 'i4')],
fill_value=(b"b", 1))
a[:4] = (b"aaa", 2)
g = zarr.open_group("s3://test/out.zarr", mode='r',
storage_options=self.s3so)
b[:4] = (b"aaa", 2)
g2 = zarr.open_group("s3://test/out.zarr", mode='r',
storage_options=self.s3so)

assert (g2.data[:] == expected).all()

a[:] = 5 # write with scalar
assert (a[:] == 5).all()

assert g2.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
with pytest.raises(PermissionError):
g2.data[:] = 5

with pytest.raises(PermissionError):
g2.store.setitems({})

with pytest.raises(PermissionError):
# even though overwrite=True, store is read-only, so fails
g2.create_dataset("data", shape=(8, 8, 8), mode='w',
fill_value=-1, chunks=(1, 1, 1), overwrite=True)

assert (g.data[:] == expected).all()
assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
a = g.create_dataset("data", shape=(8, 8, 8), mode='w',
fill_value=-1, chunks=(1, 1, 1), overwrite=True)
assert (a[:] == -np.ones((8, 8, 8))).all()


@pytest.fixture()
@@ -997,7 +1017,8 @@ def s3(request):

port = 5555
endpoint_uri = 'http://127.0.0.1:%s/' % port
proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port))
proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port),
stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)

timeout = 5
while timeout > 0: