
Commit 3ae93ac

rabernat authored and shoyer committed
Zarr consolidated (#2559)
* wip: getting started
* preliminary support for zarr consolidated metadata
* update zarr dev repo
* add consolidate to close
* doc updates
* skip tests based on zarr version
* fix doc typos
* fix PEP8 issues
* fix test skipping
* fixed integration test
* update version check
* rename keyword arg
* Update whats-new.rst
* instructions for consolidating existing stores
1 parent 0d6056e commit 3ae93ac
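
As a quick orientation, here is a minimal sketch of the round trip this commit enables (a hypothetical example, not taken from the commit; it assumes zarr newer than 2.2.1.dev2 and uses an illustrative path):

    # Sketch: write and read a zarr store with consolidated metadata.
    import xarray as xr

    ds = xr.Dataset({'foo': ('x', [1, 2, 3])})

    # Write the store, then consolidate all metadata into one '.zmetadata' key.
    ds.to_zarr('example.zarr', consolidated=True)

    # Reading with consolidated=True fetches all metadata in a single read
    # instead of two calls per variable.
    reopened = xr.open_zarr('example.zarr', consolidated=True)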

File tree

8 files changed: +119 -34 lines changed

ci/requirements-py36-zarr-dev.yml (+1 -1)

@@ -18,4 +18,4 @@ dependencies:
   - pip:
     - coveralls
     - pytest-cov
-    - git+https://github.com/alimanfoo/zarr.git
+    - git+https://github.com/zarr-developers/zarr.git
doc/io.rst (+30 -1)

@@ -635,6 +635,35 @@ For example:
 Not all native zarr compression and filtering options have been tested with
 xarray.
 
+Consolidated Metadata
+~~~~~~~~~~~~~~~~~~~~~
+
+Xarray needs to read all of the zarr metadata when it opens a dataset.
+In some storage mediums, such as with cloud object storage (e.g. amazon S3),
+this can introduce significant overhead, because two separate HTTP calls to the
+object store must be made for each variable in the dataset.
+With version 2.3, zarr will support a feature called *consolidated metadata*,
+which allows all metadata for the entire dataset to be stored with a single
+key (by default called ``.zmetadata``). This can drastically speed up
+opening the store. (For more information on this feature, consult the
+`zarr docs <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.)
+
+If you have zarr version 2.3 or greater, xarray can write and read stores
+with consolidated metadata. To write consolidated metadata, pass the
+``consolidated=True`` option to the
+:py:attr:`Dataset.to_zarr <xarray.Dataset.to_zarr>` method::
+
+    ds.to_zarr('foo.zarr', consolidated=True)
+
+To read a consolidated store, pass the ``consolidated=True`` option to
+:py:func:`~xarray.open_zarr`::
+
+    ds = xr.open_zarr('foo.zarr', consolidated=True)
+
+Xarray can't perform consolidation on pre-existing zarr datasets. This should
+be done directly from zarr, as described in the
+`zarr docs <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.
+
 .. _io.cfgrib:
 
 GRIB format via cfgrib
@@ -678,7 +707,7 @@ Formats supported by PseudoNetCDF
 ---------------------------------
 
 xarray can also read CAMx, BPCH, ARL PACKED BIT, and many other file
-formats supported by PseudoNetCDF_, if PseudoNetCDF is installed. 
+formats supported by PseudoNetCDF_, if PseudoNetCDF is installed.
 PseudoNetCDF can also provide Climate Forecasting Conventions to
 CMAQ files. In addition, PseudoNetCDF can automatically register custom
 readers that subclass PseudoNetCDF.PseudoNetCDFFile. PseudoNetCDF can
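
The new documentation defers consolidation of pre-existing stores to zarr itself; a minimal sketch of that step (assuming zarr >= 2.3 and an illustrative 'foo.zarr' path):

    import zarr

    # Collect the metadata of every group and array in an existing store
    # into a single '.zmetadata' key.
    zarr.consolidate_metadata('foo.zarr')

    # Afterwards the store can be opened from its consolidated metadata.
    group = zarr.open_consolidated('foo.zarr', mode='r')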

doc/whats-new.rst (+8 -6)

@@ -36,6 +36,8 @@ Breaking changes
 Enhancements
 ~~~~~~~~~~~~
 
+- Ability to read and write consolidated metadata in zarr stores (:issue:`2558`).
+  By `Ryan Abernathey <https://github.com/rabernat>`_.
 - :py:class:`CFTimeIndex` uses slicing for string indexing when possible (like
   :py:class:`pandas.DatetimeIndex`), which avoids unnecessary copies.
   By `Stephan Hoyer <https://github.com/shoyer>`_
@@ -56,15 +58,15 @@ Breaking changes
 - ``Dataset.T`` has been removed as a shortcut for :py:meth:`Dataset.transpose`.
   Call :py:meth:`Dataset.transpose` directly instead.
 - Iterating over a ``Dataset`` now includes only data variables, not coordinates.
-  Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now 
+  Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now
   includes only data variables.
 - ``DataArray.__contains__`` (used by Python's ``in`` operator) now checks
-  array data, not coordinates. 
+  array data, not coordinates.
 - The old resample syntax from before xarray 0.10, e.g.,
   ``data.resample('1D', dim='time', how='mean')``, is no longer supported will
   raise an error in most cases. You need to use the new resample syntax
   instead, e.g., ``data.resample(time='1D').mean()`` or
-  ``data.resample({'time': '1D'}).mean()``. 
+  ``data.resample({'time': '1D'}).mean()``.
 
 
 - New deprecations (behavior will be changed in xarray 0.12):
@@ -101,13 +103,13 @@ Breaking changes
   than by default trying to coerce them into ``np.datetime64[ns]`` objects.
   A :py:class:`~xarray.CFTimeIndex` will be used for indexing along time
   coordinates in these cases.
-- A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added 
+- A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added
   to aid in converting from a :py:class:`~xarray.CFTimeIndex` to a
   :py:class:`pandas.DatetimeIndex` for the remaining use-cases where
   using a :py:class:`~xarray.CFTimeIndex` is still a limitation (e.g. for
   resample or plotting).
 - Setting the ``enable_cftimeindex`` option is now a no-op and emits a
-  ``FutureWarning``. 
+  ``FutureWarning``.
 
 Enhancements
 ~~~~~~~~~~~~
@@ -194,7 +196,7 @@ Bug fixes
   the dates must be encoded using cftime rather than NumPy (:issue:`2272`).
   By `Spencer Clark <https://github.com/spencerkclark>`_.
 
-- Chunked datasets can now roundtrip to Zarr storage continually 
+- Chunked datasets can now roundtrip to Zarr storage continually
   with `to_zarr` and ``open_zarr`` (:issue:`2300`).
   By `Lily Wang <https://github.com/lilyminium>`_.
 

xarray/backends/api.py (+12 -8)

@@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
 
 
 def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
-            encoding=None, compute=True):
+            encoding=None, compute=True, consolidated=False):
     """This function creates an appropriate datastore for writing a dataset to
     a zarr ztore
 
@@ -876,16 +876,20 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
     _validate_dataset_names(dataset)
     _validate_attrs(dataset)
 
-    store = backends.ZarrStore.open_group(store=store, mode=mode,
-                                          synchronizer=synchronizer,
-                                          group=group)
+    zstore = backends.ZarrStore.open_group(store=store, mode=mode,
+                                           synchronizer=synchronizer,
+                                           group=group,
+                                           consolidate_on_close=consolidated)
 
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
-    dump_to_store(dataset, store, writer, encoding=encoding)
+    dump_to_store(dataset, zstore, writer, encoding=encoding)
     writes = writer.sync(compute=compute)
 
-    if not compute:
+    if compute:
+        _finalize_store(writes, zstore)
+    else:
         import dask
-        return dask.delayed(_finalize_store)(writes, store)
-    return store
+        return dask.delayed(_finalize_store)(writes, zstore)
+
+    return zstore
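
The reworked branch above finalizes the store eagerly when compute=True, and otherwise wraps _finalize_store in a dask.delayed object, so consolidation only happens once the write is actually computed. A hedged usage sketch (dataset and paths are illustrative, not from the commit):

    import xarray as xr

    ds = xr.Dataset({'foo': ('x', [1.0, 2.0, 3.0])}).chunk({'x': 2})

    # compute=True (the default): data is written and the store is
    # finalized (including consolidation) before to_zarr returns.
    ds.to_zarr('eager.zarr', consolidated=True)

    # compute=False: returns a dask.delayed.Delayed; nothing is finalized
    # until .compute() runs the deferred _finalize_store.
    delayed = ds.to_zarr('lazy.zarr', compute=False, consolidated=True)
    delayed.compute()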

xarray/backends/zarr.py (+29 -8)

@@ -224,7 +224,8 @@ class ZarrStore(AbstractWritableDataStore):
     """
 
     @classmethod
-    def open_group(cls, store, mode='r', synchronizer=None, group=None):
+    def open_group(cls, store, mode='r', synchronizer=None, group=None,
+                   consolidated=False, consolidate_on_close=False):
         import zarr
         min_zarr = '2.2'
 
@@ -234,15 +235,27 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None):
                                       "installation "
                                       "http://zarr.readthedocs.io/en/stable/"
                                       "#installation" % min_zarr)
-        zarr_group = zarr.open_group(store=store, mode=mode,
-                                     synchronizer=synchronizer, path=group)
-        return cls(zarr_group)
 
-    def __init__(self, zarr_group):
+        if consolidated or consolidate_on_close:
+            if LooseVersion(zarr.__version__) <= '2.2.1.dev2':  # pragma: no cover
+                raise NotImplementedError("Zarr version 2.2.1.dev2 or greater "
+                                          "is required by for consolidated "
+                                          "metadata.")
+
+        open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
+        if consolidated:
+            # TODO: an option to pass the metadata_key keyword
+            zarr_group = zarr.open_consolidated(store, **open_kwargs)
+        else:
+            zarr_group = zarr.open_group(store, **open_kwargs)
+        return cls(zarr_group, consolidate_on_close)
+
+    def __init__(self, zarr_group, consolidate_on_close=False):
         self.ds = zarr_group
         self._read_only = self.ds.read_only
         self._synchronizer = self.ds.synchronizer
         self._group = self.ds.path
+        self._consolidate_on_close = consolidate_on_close
 
     def open_store_variable(self, name, zarr_array):
         data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
@@ -333,11 +346,16 @@ def store(self, variables, attributes, *args, **kwargs):
     def sync(self):
         pass
 
+    def close(self):
+        if self._consolidate_on_close:
+            import zarr
+            zarr.consolidate_metadata(self.ds.store)
+
 
 def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
               decode_cf=True, mask_and_scale=True, decode_times=True,
               concat_characters=True, decode_coords=True,
-              drop_variables=None):
+              drop_variables=None, consolidated=False):
     """Load and decode a dataset from a Zarr store.
 
     .. note:: Experimental
@@ -383,10 +401,13 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
     decode_coords : bool, optional
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
-    drop_variables: string or iterable, optional
+    drop_variables : string or iterable, optional
        A variable or list of variables to exclude from being parsed from the
        dataset. This may be useful to drop variables with problems or
        inconsistent values.
+    consolidated : bool, optional
+        Whether to open the store using zarr's consolidated metadata
+        capability. Only works for stores that have already been consolidated.
 
     Returns
     -------
@@ -423,7 +444,7 @@ def maybe_decode_store(store, lock=False):
     mode = 'r'
     zarr_store = ZarrStore.open_group(store, mode=mode,
                                       synchronizer=synchronizer,
-                                      group=group)
+                                      group=group, consolidated=consolidated)
     ds = maybe_decode_store(zarr_store)
 
     # auto chunking needs to be here and not in ZarrStore because variable
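
The open_group classmethod now dispatches between zarr.open_consolidated and zarr.open_group behind a version gate. A standalone sketch of the same dispatch, mirroring the diff rather than reproducing xarray's actual API (function name and defaults are illustrative):

    from distutils.version import LooseVersion
    import zarr

    def open_zarr_group(store, mode='r', group=None, consolidated=False):
        # Consolidated metadata only exists in zarr builds newer than
        # 2.2.1.dev2 at the time of this commit.
        if consolidated and LooseVersion(zarr.__version__) <= '2.2.1.dev2':
            raise NotImplementedError(
                "a newer zarr is required for consolidated metadata")
        open_kwargs = dict(mode=mode, path=group)
        if consolidated:
            # Read all metadata from the single '.zmetadata' key.
            return zarr.open_consolidated(store, **open_kwargs)
        return zarr.open_group(store, **open_kwargs)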

xarray/core/dataset.py (+14 -6)

@@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
                          compute=compute)
 
     def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
-                encoding=None, compute=True):
+                encoding=None, compute=True, consolidated=False):
         """Write dataset contents to a zarr group.
 
         .. note:: Experimental
@@ -1244,9 +1244,16 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
             Nested dictionary with variable names as keys and dictionaries of
             variable specific encodings as values, e.g.,
             ``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
-        compute: boolean
-            If true compute immediately, otherwise return a
+        compute: bool, optional
+            If True compute immediately, otherwise return a
             ``dask.delayed.Delayed`` object that can be computed later.
+        consolidated: bool, optional
+            If True, apply zarr's `consolidate_metadata` function to the store
+            after writing.
+
+        References
+        ----------
+        https://zarr.readthedocs.io/
         """
         if encoding is None:
             encoding = {}
@@ -1256,7 +1263,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
                              "and 'w-'.")
         from ..backends.api import to_zarr
         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-                       group=group, encoding=encoding, compute=compute)
+                       group=group, encoding=encoding, compute=compute,
+                       consolidated=consolidated)
 
     def __unicode__(self):
         return formatting.dataset_repr(self)
@@ -1380,7 +1388,7 @@ def _validate_indexers(self, indexers):
         """ Here we make sure
         + indexer has a valid keys
         + indexer is in a valid data type
-        + string indexers are cast to the appropriate date type if the 
+        + string indexers are cast to the appropriate date type if the
           associated index is a DatetimeIndex or CFTimeIndex
         """
         from .dataarray import DataArray
@@ -1963,7 +1971,7 @@ def _validate_interp_indexer(x, new_x):
                     'Instead got\n{}'.format(new_x))
             else:
                 return (x, new_x)
-        
+
         variables = OrderedDict()
         for name, var in iteritems(obj._variables):
             if name not in indexers:

xarray/tests/test_backends.py (+9 -0)

@@ -1320,6 +1320,15 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={},
                          allow_cleanup_failure=False):
         pytest.skip("zarr backend does not support appending")
 
+    def test_roundtrip_consolidated(self):
+        zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
+        expected = create_test_data()
+        with self.roundtrip(expected,
+                            save_kwargs={'consolidated': True},
+                            open_kwargs={'consolidated': True}) as actual:
+            self.check_dtypes_roundtripped(expected, actual)
+            assert_identical(expected, actual)
+
     def test_auto_chunk(self):
         original = create_test_data().chunk()
 
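The new test guards itself with pytest.importorskip so installations without a recent zarr skip rather than fail. A minimal sketch of that pattern (the module name and minimum version come from the diff; the test body is illustrative):

    import pytest

    def test_requires_recent_zarr():
        # Skips the test unless zarr is importable at the given version.
        zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
        assert hasattr(zarr, 'consolidate_metadata')
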
xarray/tests/test_distributed.py (+16 -4)

@@ -1,5 +1,6 @@
 """ isort:skip_file """
 from __future__ import absolute_import, division, print_function
+from distutils.version import LooseVersion
 import os
 import sys
 import pickle
@@ -118,15 +119,26 @@ def test_dask_distributed_read_netcdf_integration_test(
 
 
 @requires_zarr
-def test_dask_distributed_zarr_integration_test(loop):
+@pytest.mark.parametrize('consolidated', [True, False])
+@pytest.mark.parametrize('compute', [True, False])
+def test_dask_distributed_zarr_integration_test(loop, consolidated, compute):
+    if consolidated:
+        zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
+        write_kwargs = dict(consolidated=True)
+        read_kwargs = dict(consolidated=True)
+    else:
+        write_kwargs = read_kwargs = {}
     chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5}
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as c:
             original = create_test_data().chunk(chunks)
             with create_tmp_file(allow_cleanup_failure=ON_WINDOWS,
-                                 suffix='.zarr') as filename:
-                original.to_zarr(filename)
-                with xr.open_zarr(filename) as restored:
+                                 suffix='.zarrc') as filename:
+                maybe_futures = original.to_zarr(filename, compute=compute,
+                                                 **write_kwargs)
+                if not compute:
+                    maybe_futures.compute()
+                with xr.open_zarr(filename, **read_kwargs) as restored:
                     assert isinstance(restored.var1.data, da.Array)
                     computed = restored.compute()
                     assert_allclose(original, computed)
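
Stacking two pytest.mark.parametrize decorators, as the integration test now does, runs the test once per element of the cartesian product of the parameter lists. A small sketch of the pattern (the test body is illustrative):

    import pytest

    @pytest.mark.parametrize('consolidated', [True, False])
    @pytest.mark.parametrize('compute', [True, False])
    def test_option_matrix(consolidated, compute):
        # Four invocations in total: (True, True), (True, False),
        # (False, True), (False, False).
        assert isinstance(consolidated, bool) and isinstance(compute, bool)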
