From 63abe7f396f3e75d03afa53e4b4820848595b539 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 16 Dec 2017 23:40:05 -0800 Subject: [PATCH 01/27] distributed tests that write dask arrays --- xarray/tests/__init__.py | 2 +- xarray/tests/test_distributed.py | 52 +++++++++++++++++--------------- xarray/util/print_versions.py | 2 ++ 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/xarray/tests/__init__.py b/xarray/tests/__init__.py index 235c6e9e410..baff215cb02 100644 --- a/xarray/tests/__init__.py +++ b/xarray/tests/__init__.py @@ -71,7 +71,7 @@ def _importorskip(modname, minversion=None): has_bottleneck, requires_bottleneck = _importorskip('bottleneck') has_rasterio, requires_rasterio = _importorskip('rasterio') has_pathlib, requires_pathlib = _importorskip('pathlib') -has_zarr, requires_zarr = _importorskip('zarr', minversion='2.2.0') +has_zarr, requires_zarr = _importorskip('zarr', minversion='2.1.5') # some special cases has_scipy_or_netCDF4 = has_scipy or has_netCDF4 diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 1d0c51322a1..35f621eafdd 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -2,13 +2,12 @@ import pytest import xarray as xr -from xarray.core.pycompat import suppress distributed = pytest.importorskip('distributed') da = pytest.importorskip('dask.array') import dask -from distributed.utils_test import cluster, loop, gen_cluster -from distributed.client import futures_of, wait +from distributed.utils_test import gen_cluster +from distributed.client import futures_of from xarray.tests.test_backends import create_tmp_file, ON_WINDOWS from xarray.tests.test_dataset import create_test_data @@ -28,29 +27,34 @@ @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') -@pytest.mark.parametrize('engine', ENGINES) -def test_dask_distributed_netcdf_integration_test(loop, engine): - with cluster() as (s, _): - with 
distributed.Client(s['address'], loop=loop): - original = create_test_data() - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_netcdf(filename, engine=engine) - with xr.open_dataset(filename, chunks=3, engine=engine) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) +# @pytest.mark.parametrize('engine', ENGINES) +@gen_cluster(client=True, timeout=None) +def test_dask_distributed_netcdf_integration_test(c, s, a, b): + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} + for engine in ENGINES: + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + original.to_netcdf(filename, engine=engine) + with xr.open_dataset(filename, chunks=3, + engine=engine) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) + @requires_zarr -def test_dask_distributed_zarr_integration_test(loop): - with cluster() as (s, _): - with distributed.Client(s['address'], loop=loop): - original = create_test_data() - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_zarr(filename) - with xr.open_zarr(filename) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) +@gen_cluster(client=True, timeout=None) +def test_dask_distributed_zarr_integration_test(c, s, a, b): + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5} + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, + suffix='.zarr') as filename: + original.to_zarr(filename) + with xr.open_zarr(filename) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) + @pytest.mark.skipif(distributed.__version__ <= '1.19.3', reason='Need recent distributed version to clean up get') 
diff --git a/xarray/util/print_versions.py b/xarray/util/print_versions.py index 095c7add84d..1b25583197f 100755 --- a/xarray/util/print_versions.py +++ b/xarray/util/print_versions.py @@ -75,9 +75,11 @@ def show_versions(as_json=False): # ("pydap", lambda mod: mod.version.version), ("h5netcdf", lambda mod: mod.__version__), ("Nio", lambda mod: mod.__version__), + ("zarr", lambda mod: mod.__version__), ("bottleneck", lambda mod: mod.__version__), ("cyordereddict", lambda mod: mod.__version__), ("dask", lambda mod: mod.__version__), + ("distributed", lambda mod: mod.__version__), ("matplotlib", lambda mod: mod.__version__), ("cartopy", lambda mod: mod.__version__), ("seaborn", lambda mod: mod.__version__), From 1952173b83a67ac7854a6edec9ade98bbbae143f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 19 Dec 2017 17:38:02 -0500 Subject: [PATCH 02/27] Change zarr test to synchronous API --- xarray/tests/test_distributed.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 35f621eafdd..6fbd2b721d2 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -6,8 +6,8 @@ distributed = pytest.importorskip('distributed') da = pytest.importorskip('dask.array') import dask -from distributed.utils_test import gen_cluster -from distributed.client import futures_of +from distributed.utils_test import gen_cluster, cluster, loop +from distributed.client import futures_of, Client from xarray.tests.test_backends import create_tmp_file, ON_WINDOWS from xarray.tests.test_dataset import create_test_data @@ -43,17 +43,18 @@ def test_dask_distributed_netcdf_integration_test(c, s, a, b): @requires_zarr -@gen_cluster(client=True, timeout=None) -def test_dask_distributed_zarr_integration_test(c, s, a, b): +def test_dask_distributed_zarr_integration_test(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5} - original = 
create_test_data().chunk(chunks) - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, - suffix='.zarr') as filename: - original.to_zarr(filename) - with xr.open_zarr(filename) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, + suffix='.zarr') as filename: + original.to_zarr(filename) + with xr.open_zarr(filename) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @pytest.mark.skipif(distributed.__version__ <= '1.19.3', From 9e70a3a305c6237a0d1c3ab3b2d4312bdabc1935 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Thu, 11 Jan 2018 09:49:32 -0800 Subject: [PATCH 03/27] initial go at __setitem__ on array wrappers --- doc/whats-new.rst | 2 ++ xarray/backends/h5netcdf_.py | 5 ++++- xarray/backends/netCDF4_.py | 9 ++++++++- xarray/backends/scipy_.py | 10 +++++++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index a65721451f0..9571ed5bd36 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -86,6 +86,8 @@ Bug fixes - Compatibility fixes to plotting module for Numpy 1.14 and Pandas 0.22 (:issue:`1813`). By `Joe Hamman `_. +- Fixed to_netcdf when using dask distributed (:issue:`1464`). + By `Joe Hamman `_.. .. 
_whats-new.0.10.0: diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 82abaade06a..78a3409d587 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -164,7 +164,10 @@ def prepare_variable(self, name, variable, check_encoding=False, for k, v in iteritems(attrs): nc4_var.setncattr(k, v) - return nc4_var, variable.data + + target = H5NetCDFArrayWrapper(name, self) + + return target, variable.data def sync(self): with self.ensure_open(autoclose=True): diff --git a/xarray/backends/netCDF4_.py b/xarray/backends/netCDF4_.py index d8aa33f35dc..ac8a27d9287 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -41,6 +41,11 @@ def __init__(self, variable_name, datastore): dtype = np.dtype('O') self.dtype = dtype + def __setitem__(self, key, value): + with self.datastore.ensure_open(autoclose=True): + data = self.get_array() + data[key] = value + def get_array(self): self.datastore.assert_open() return self.datastore.ds.variables[self.variable_name] @@ -376,7 +381,9 @@ def prepare_variable(self, name, variable, check_encoding=False, # OrderedDict as the input to setncatts nc4_var.setncattr(k, v) - return nc4_var, variable.data + target = NetCDF4ArrayWrapper(name, self) + + return target, variable.data def sync(self): with self.ensure_open(autoclose=True): diff --git a/xarray/backends/scipy_.py b/xarray/backends/scipy_.py index 0994d8510b8..83e5dcdf373 100644 --- a/xarray/backends/scipy_.py +++ b/xarray/backends/scipy_.py @@ -55,6 +55,11 @@ def __getitem__(self, key): copy = self.datastore.ds.use_mmap return np.array(data, dtype=self.dtype, copy=copy) + def __setitem__(self, key, value): + with self.datastore.ensure_open(autoclose=True): + data = self.get_array() + data[key] = value + def _open_scipy_netcdf(filename, mode, mmap, version): import scipy.io @@ -202,7 +207,10 @@ def prepare_variable(self, name, variable, check_encoding=False, for k, v in iteritems(variable.attrs): 
self._validate_attr_key(k) setattr(scipy_var, k, v) - return scipy_var, data + + target = ScipyArrayWrapper(name, self) + + return target, data def sync(self): with self.ensure_open(autoclose=True): From ec67a54c49bcbc2a83114ed2479f8a4a900c0872 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 12 Jan 2018 11:52:40 -0800 Subject: [PATCH 04/27] fixes for scipy --- xarray/backends/common.py | 6 +----- xarray/backends/scipy_.py | 9 ++++++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 83753ced8f5..0771cb9a27b 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -175,11 +175,7 @@ def add(self, source, target): self.sources.append(source) self.targets.append(target) else: - try: - target[...] = source - except TypeError: - # workaround for GH: scipy/scipy#6880 - target[:] = source + target[...] = source def sync(self): if self.sources: diff --git a/xarray/backends/scipy_.py b/xarray/backends/scipy_.py index 83e5dcdf373..97f173b6ac8 100644 --- a/xarray/backends/scipy_.py +++ b/xarray/backends/scipy_.py @@ -58,7 +58,14 @@ def __getitem__(self, key): def __setitem__(self, key, value): with self.datastore.ensure_open(autoclose=True): data = self.get_array() - data[key] = value + try: + data[key] = value + except TypeError: + if key is Ellipsis: + # workaround for GH: scipy/scipy#6880 + data[:] = value + else: + raise def _open_scipy_netcdf(filename, mode, mmap, version): From 5497ad18fd004a0fdfacc53ea555b00b0a304a1f Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 24 Jan 2018 16:51:59 -0800 Subject: [PATCH 05/27] cleanup after merging with upstream/master --- xarray/tests/test_distributed.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 04ce164c7c9..bab5aa575f8 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -6,6 +6,7 @@ 
distributed = pytest.importorskip('distributed') da = pytest.importorskip('dask.array') import dask +from dask.distributed import Client from distributed.utils_test import cluster, gen_cluster from distributed.utils_test import loop # flake8: noqa from distributed.client import futures_of @@ -26,9 +27,6 @@ ENGINES.append('h5netcdf') -@pytest.mark.xfail(sys.platform == 'win32', - reason='https://github.com/pydata/xarray/issues/1738') -# @pytest.mark.parametrize('engine', ENGINES) @gen_cluster(client=True, timeout=None) def test_dask_distributed_netcdf_integration_test(c, s, a, b): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} From c2f5bb8124f2dc09a6ee577ba207aefdd5f73d87 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 24 Jan 2018 16:57:32 -0800 Subject: [PATCH 06/27] needless duplication of tests to work around pytest bug --- xarray/tests/test_distributed.py | 49 +++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index bab5aa575f8..b74b1119d67 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -18,27 +18,42 @@ requires_zarr) -ENGINES = [] -if has_scipy: - ENGINES.append('scipy') -if has_netCDF4: - ENGINES.append('netcdf4') -if has_h5netcdf: - ENGINES.append('h5netcdf') +@gen_cluster(client=True, timeout=None) +def test_dask_distributed_netcdf_integration_test_scipy(c, s, a, b): + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + original.to_netcdf(filename, engine='scipy') + with xr.open_dataset(filename, chunks=chunks, + engine='scipy') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) + +@gen_cluster(client=True, timeout=None) +def test_dask_distributed_netcdf_integration_test_netcdf4(c, s, a, b): + chunks = {'dim1': 4, 
'dim2': 3, 'dim3': 6} + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + original.to_netcdf(filename, engine='netcdf4') + with xr.open_dataset(filename, chunks=chunks, + engine='netcdf4') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @gen_cluster(client=True, timeout=None) -def test_dask_distributed_netcdf_integration_test(c, s, a, b): +def test_dask_distributed_netcdf_integration_test_h5netcdf(c, s, a, b): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} - for engine in ENGINES: - original = create_test_data().chunk(chunks) - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_netcdf(filename, engine=engine) - with xr.open_dataset(filename, chunks=3, - engine=engine) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + original.to_netcdf(filename, engine='h5netcdf') + with xr.open_dataset(filename, chunks=chunks, + engine='h5netcdf') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @requires_zarr From 5344fe80e05947a42d21f32377b66cb817336ee6 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 24 Jan 2018 19:29:24 -0800 Subject: [PATCH 07/27] use netcdf_variable instead of get_array() --- xarray/backends/scipy_.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/backends/scipy_.py b/xarray/backends/scipy_.py index 2819e60ae7e..02b87cd4edd 100644 --- a/xarray/backends/scipy_.py +++ b/xarray/backends/scipy_.py @@ -57,7 +57,7 @@ def __getitem__(self, key): def __setitem__(self, key, value): with self.datastore.ensure_open(autoclose=True): - data = self.get_array() + data = 
self.datastore.ds.variables[self.variable_name] try: data[key] = value except TypeError: From 49366bf542d45508e0c5bbbd993e06696e982d3a Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 2 Feb 2018 08:40:03 -0500 Subject: [PATCH 08/27] use synchronous dask.distributed test harness --- xarray/tests/test_distributed.py | 47 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index b74b1119d67..719ea0897fe 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -18,42 +18,45 @@ requires_zarr) -@gen_cluster(client=True, timeout=None) -def test_dask_distributed_netcdf_integration_test_scipy(c, s, a, b): +def test_dask_distributed_netcdf_integration_test_scipy(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} original = create_test_data().chunk(chunks) with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: original.to_netcdf(filename, engine='scipy') - with xr.open_dataset(filename, chunks=chunks, - engine='scipy') as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + with xr.open_dataset(filename, chunks=chunks, + engine='scipy') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) -@gen_cluster(client=True, timeout=None) -def test_dask_distributed_netcdf_integration_test_netcdf4(c, s, a, b): +def test_dask_distributed_netcdf_integration_test_netcdf4(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} original = create_test_data().chunk(chunks) with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: original.to_netcdf(filename, engine='netcdf4') - with xr.open_dataset(filename, chunks=chunks, - engine='netcdf4') as restored: - assert isinstance(restored.var1.data, 
da.Array) - computed = restored.compute() - assert_allclose(original, computed) + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + with xr.open_dataset(filename, chunks=chunks, + engine='netcdf4') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) -@gen_cluster(client=True, timeout=None) -def test_dask_distributed_netcdf_integration_test_h5netcdf(c, s, a, b): +def test_dask_distributed_netcdf_integration_test_h5netcdf(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} original = create_test_data().chunk(chunks) with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: original.to_netcdf(filename, engine='h5netcdf') - with xr.open_dataset(filename, chunks=chunks, - engine='h5netcdf') as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + with xr.open_dataset(filename, chunks=chunks, + engine='h5netcdf') as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @requires_zarr @@ -98,4 +101,4 @@ def test_async(c, s, a, b): assert not dask.is_dask_collection(w) assert_allclose(x + 10, w) - assert s.task_state + assert s.tasks From 199538e0d2f74768e32f222884dccab52a4c6407 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 2 Feb 2018 09:00:32 -0800 Subject: [PATCH 09/27] cleanup tests --- xarray/tests/test_distributed.py | 45 ++++++++++++-------------------- 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 719ea0897fe..88cf2b5e27a 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -17,48 +17,35 @@ from . 
import (assert_allclose, has_scipy, has_netCDF4, has_h5netcdf, requires_zarr) +ENGINES = [] +if has_scipy: + ENGINES.append('scipy') +if has_netCDF4: + ENGINES.append('netcdf4') +if has_h5netcdf: + ENGINES.append('h5netcdf') -def test_dask_distributed_netcdf_integration_test_scipy(loop): - chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} - original = create_test_data().chunk(chunks) - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_netcdf(filename, engine='scipy') - with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as c: - with xr.open_dataset(filename, chunks=chunks, - engine='scipy') as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) -def test_dask_distributed_netcdf_integration_test_netcdf4(loop): +@pytest.mark.xfail(sys.platform == 'win32', + reason='https://github.com/pydata/xarray/issues/1738') +@pytest.mark.parametrize('engine', ENGINES) +def test_dask_distributed_netcdf_integration_test(loop, engine): + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} - original = create_test_data().chunk(chunks) + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_netcdf(filename, engine='netcdf4') with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as c: - with xr.open_dataset(filename, chunks=chunks, - engine='netcdf4') as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) + original = create_test_data().chunk(chunks) + original.to_netcdf(filename, engine=engine) -def test_dask_distributed_netcdf_integration_test_h5netcdf(loop): - chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} - original = create_test_data().chunk(chunks) - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: - original.to_netcdf(filename, engine='h5netcdf') - with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as c: with 
xr.open_dataset(filename, chunks=chunks, - engine='h5netcdf') as restored: + engine=engine) as restored: assert isinstance(restored.var1.data, da.Array) computed = restored.compute() assert_allclose(original, computed) - @requires_zarr def test_dask_distributed_zarr_integration_test(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5} From d2050e7a50ecc0c63409b25a80167eca37573d3a Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 2 Feb 2018 20:48:23 -0800 Subject: [PATCH 10/27] per scheduler locks and autoclose behavior for writes --- xarray/backends/api.py | 9 +++++- xarray/backends/common.py | 47 ++++++++++++++++++++++++++------ xarray/backends/netCDF4_.py | 10 +++---- xarray/tests/test_distributed.py | 1 + 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 4359868feae..865ffd2b888 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -617,8 +617,15 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None, # if a writer is provided, store asynchronously sync = writer is None + # TODO Move this logic outside of this function + from .common import get_scheduler, get_scheduler_lock + scheduler = get_scheduler() + lock = get_scheduler_lock(scheduler)(path_or_file) + autoclose = scheduler == 'distributed' + target = path_or_file if path_or_file is not None else BytesIO() - store = store_open(target, mode, format, group, writer) + store = store_open(target, mode, format, group, writer, + autoclose=autoclose, lock=lock) if unlimited_dims is None: unlimited_dims = dataset.encoding.get('unlimited_dims', None) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 32f02e25815..49a55edf508 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -14,11 +14,6 @@ from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin from ..core.pycompat import iteritems, dask_array_type -try: - from dask.utils import SerializableLock as Lock 
-except ImportError: - from threading import Lock - # Create a logger object, but don't add any handlers. Leave that to user code. logger = logging.getLogger(__name__) @@ -27,8 +22,41 @@ NONE_VAR_NAME = '__values__' -# dask.utils.SerializableLock if available, otherwise just a threading.Lock -GLOBAL_LOCK = Lock() +def get_scheduler(get=None, collection=None): + try: + from dask.utils import effective_get + actual_get = effective_get(get, collection) + try: + from dask.distributed import Client + if isinstance(actual_get.__self__, Client): + return 'distributed' + except (ImportError, AttributeError): + import dask.multiprocessing + if actual_get == dask.multiprocessing.get: + return 'multiprocessing' + else: + return 'threaded' + except ImportError: + return None + + +def get_scheduler_lock(scheduler): + if scheduler == 'distributed': + from dask.distributed import Lock + return Lock + elif scheduler == 'multiprocessing': + import multiprocessing as mp + return mp.Manager().Lock + elif scheduler == 'threaded': + from dask.utils import SerializableLock + return SerializableLock + else: + from threading import Lock + return Lock + + +SCHEDULER = get_scheduler() +GLOBAL_LOCK = get_scheduler_lock(SCHEDULER)() def _encode_variable_name(name): @@ -183,15 +211,16 @@ def add(self, source, target): def sync(self): if self.sources: import dask.array as da + print('self.lock == ', self.lock) da.store(self.sources, self.targets, lock=self.lock) self.sources = [] self.targets = [] class AbstractWritableDataStore(AbstractDataStore): - def __init__(self, writer=None): + def __init__(self, writer=None, lock=GLOBAL_LOCK): if writer is None: - writer = ArrayWriter() + writer = ArrayWriter(lock=lock) self.writer = writer def encode(self, variables, attributes): diff --git a/xarray/backends/netCDF4_.py b/xarray/backends/netCDF4_.py index d57e15acf59..9e3442c5ce6 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -16,7 +16,7 @@ from ..core.pycompat import 
iteritems, basestring, OrderedDict, PY3, suppress from .common import (WritableCFDataStore, robust_getitem, BackendArray, - DataStorePickleMixin, find_root) + DataStorePickleMixin, find_root, GLOBAL_LOCK) from .netcdf3 import (encode_nc3_attr_value, encode_nc3_variable) # This lookup table maps from dtype.byteorder to a readable endian @@ -236,7 +236,7 @@ class NetCDF4DataStore(WritableCFDataStore, DataStorePickleMixin): """ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None, - autoclose=False): + autoclose=False, lock=GLOBAL_LOCK): if autoclose and opener is None: raise ValueError('autoclose requires an opener') @@ -254,12 +254,12 @@ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None, self._opener = functools.partial(opener, mode=self._mode) else: self._opener = opener - super(NetCDF4DataStore, self).__init__(writer) + super(NetCDF4DataStore, self).__init__(writer, lock=lock) @classmethod def open(cls, filename, mode='r', format='NETCDF4', group=None, writer=None, clobber=True, diskless=False, persist=False, - autoclose=False): + autoclose=False, lock=GLOBAL_LOCK): import netCDF4 as nc4 if (len(filename) == 88 and LooseVersion(nc4.__version__) < "1.3.1"): @@ -279,7 +279,7 @@ def open(cls, filename, mode='r', format='NETCDF4', group=None, format=format) ds = opener() return cls(ds, mode=mode, writer=writer, opener=opener, - autoclose=autoclose) + autoclose=autoclose, lock=lock) def open_store_variable(self, name, var): with self.ensure_open(autoclose=False): diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 88cf2b5e27a..8d9c87cfd23 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -13,6 +13,7 @@ from xarray.tests.test_backends import create_tmp_file, ON_WINDOWS from xarray.tests.test_dataset import create_test_data +from xarray.backends.common import GLOBAL_LOCK from . 
import (assert_allclose, has_scipy, has_netCDF4, has_h5netcdf, requires_zarr) From 76675de172094c96c677123bcbefc8c93581e204 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Mon, 5 Feb 2018 16:30:01 -0800 Subject: [PATCH 11/27] HDF5_LOCK and CombinedLock --- xarray/backends/api.py | 11 ++++-- xarray/backends/common.py | 66 ++++++++++++++++++++++++-------- xarray/backends/netCDF4_.py | 6 +-- xarray/tests/test_distributed.py | 6 ++- 4 files changed, 66 insertions(+), 23 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 865ffd2b888..642fe474715 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -10,7 +10,7 @@ import numpy as np from .. import backends, conventions, Dataset -from .common import ArrayWriter, GLOBAL_LOCK +from .common import ArrayWriter, HDF5_LOCK, CombinedLock from ..core import indexing from ..core.combine import auto_combine from ..core.utils import close_on_error, is_remote_uri @@ -66,9 +66,9 @@ def _default_lock(filename, engine): else: # TODO: identify netcdf3 files and don't use the global lock # for them - lock = GLOBAL_LOCK + lock = HDF5_LOCK elif engine in {'h5netcdf', 'pynio'}: - lock = GLOBAL_LOCK + lock = HDF5_LOCK else: lock = False return lock @@ -620,7 +620,10 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None, # TODO Move this logic outside of this function from .common import get_scheduler, get_scheduler_lock scheduler = get_scheduler() - lock = get_scheduler_lock(scheduler)(path_or_file) + # I think we want to include the filename here to support concurrent writes + # using save_mfdataset + file_lock = get_scheduler_lock(scheduler)(path_or_file) + lock = CombinedLock([HDF5_LOCK, file_lock]) autoclose = scheduler == 'distributed' target = path_or_file if path_or_file is not None else BytesIO() diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 49a55edf508..ef8b45b0a8c 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ 
-8,12 +8,20 @@ import contextlib from collections import Mapping, OrderedDict import warnings +import multiprocessing +import threading from ..conventions import cf_encoder from ..core import indexing from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin from ..core.pycompat import iteritems, dask_array_type +# Import default lock +try: + from dask.utils import SerializableLock + HDF5_LOCK = SerializableLock() +except ImportError: + HDF5_LOCK = threading.Lock() # Create a logger object, but don't add any handlers. Leave that to user code. logger = logging.getLogger(__name__) @@ -31,10 +39,13 @@ def get_scheduler(get=None, collection=None): if isinstance(actual_get.__self__, Client): return 'distributed' except (ImportError, AttributeError): - import dask.multiprocessing - if actual_get == dask.multiprocessing.get: - return 'multiprocessing' - else: + try: + import dask.multiprocessing + if actual_get == dask.multiprocessing.get: + return 'multiprocessing' + else: + return 'threaded' + except ImportError: return 'threaded' except ImportError: return None @@ -45,18 +56,12 @@ def get_scheduler_lock(scheduler): from dask.distributed import Lock return Lock elif scheduler == 'multiprocessing': - import multiprocessing as mp - return mp.Manager().Lock + return multiprocessing.Lock elif scheduler == 'threaded': from dask.utils import SerializableLock return SerializableLock else: - from threading import Lock - return Lock - - -SCHEDULER = get_scheduler() -GLOBAL_LOCK = get_scheduler_lock(SCHEDULER)() + return threading.Lock def _encode_variable_name(name): @@ -105,6 +110,38 @@ def robust_getitem(array, key, catch=Exception, max_retries=6, time.sleep(1e-3 * next_delay) +class CombinedLock(object): + """A combination of multiple locks. + + Like a locked door, a CombinedLock is locked if any of its constituent + locks are locked. 
+ """ + def __init__(self, locks): + self.locks = locks + + def acquire(self, *args): + return all(lock.acquire(*args) for lock in self.locks) + + def release(self, *args): + for lock in self.locks: + lock.release(*args) + + def __enter__(self): + for lock in self.locks: + lock.__enter__() + + def __exit__(self, *args): + for lock in self.locks: + lock.__exit__(*args) + + @property + def locked(self): + return any(lock.locked for lock in self.locks) + + def __repr__(self): + return "CombinedLock(%s)" % [repr(lock) for lock in self.locks] + + class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): def __array__(self, dtype=None): @@ -196,7 +233,7 @@ def __exit__(self, exception_type, exception_value, traceback): class ArrayWriter(object): - def __init__(self, lock=GLOBAL_LOCK): + def __init__(self, lock=HDF5_LOCK): self.sources = [] self.targets = [] self.lock = lock @@ -211,14 +248,13 @@ def add(self, source, target): def sync(self): if self.sources: import dask.array as da - print('self.lock == ', self.lock) da.store(self.sources, self.targets, lock=self.lock) self.sources = [] self.targets = [] class AbstractWritableDataStore(AbstractDataStore): - def __init__(self, writer=None, lock=GLOBAL_LOCK): + def __init__(self, writer=None, lock=HDF5_LOCK): if writer is None: writer = ArrayWriter(lock=lock) self.writer = writer diff --git a/xarray/backends/netCDF4_.py b/xarray/backends/netCDF4_.py index 9e3442c5ce6..bebcfeaec20 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -16,7 +16,7 @@ from ..core.pycompat import iteritems, basestring, OrderedDict, PY3, suppress from .common import (WritableCFDataStore, robust_getitem, BackendArray, - DataStorePickleMixin, find_root, GLOBAL_LOCK) + DataStorePickleMixin, find_root, HDF5_LOCK) from .netcdf3 import (encode_nc3_attr_value, encode_nc3_variable) # This lookup table maps from dtype.byteorder to a readable endian @@ -236,7 +236,7 @@ class NetCDF4DataStore(WritableCFDataStore, 
DataStorePickleMixin): """ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None, - autoclose=False, lock=GLOBAL_LOCK): + autoclose=False, lock=HDF5_LOCK): if autoclose and opener is None: raise ValueError('autoclose requires an opener') @@ -259,7 +259,7 @@ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None, @classmethod def open(cls, filename, mode='r', format='NETCDF4', group=None, writer=None, clobber=True, diskless=False, persist=False, - autoclose=False, lock=GLOBAL_LOCK): + autoclose=False, lock=HDF5_LOCK): import netCDF4 as nc4 if (len(filename) == 88 and LooseVersion(nc4.__version__) < "1.3.1"): diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 8d9c87cfd23..dbe21c2b263 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -13,7 +13,7 @@ from xarray.tests.test_backends import create_tmp_file, ON_WINDOWS from xarray.tests.test_dataset import create_test_data -from xarray.backends.common import GLOBAL_LOCK +from xarray.backends.common import HDF5_LOCK from . 
import (assert_allclose, has_scipy, has_netCDF4, has_h5netcdf, requires_zarr) @@ -27,6 +27,10 @@ ENGINES.append('h5netcdf') +def test_hdf5_lock(): + assert isinstance(HDF5_LOCK, dask.utils.SerializableLock) + + @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) From 9ac0327000fa9fa1749db3112d1d193e5c5ac3cd Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Mon, 5 Feb 2018 16:51:15 -0800 Subject: [PATCH 12/27] integration test for distributed locks --- xarray/tests/test_distributed.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index dbe21c2b263..8b8cd84748e 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -1,4 +1,5 @@ import sys +import pickle import pytest import xarray as xr @@ -6,14 +7,14 @@ distributed = pytest.importorskip('distributed') da = pytest.importorskip('dask.array') import dask -from dask.distributed import Client +from dask.distributed import Client, Lock from distributed.utils_test import cluster, gen_cluster from distributed.utils_test import loop # flake8: noqa from distributed.client import futures_of from xarray.tests.test_backends import create_tmp_file, ON_WINDOWS from xarray.tests.test_dataset import create_test_data -from xarray.backends.common import HDF5_LOCK +from xarray.backends.common import HDF5_LOCK, CombinedLock from . 
import (assert_allclose, has_scipy, has_netCDF4, has_h5netcdf, requires_zarr) @@ -27,10 +28,6 @@ ENGINES.append('h5netcdf') -def test_hdf5_lock(): - assert isinstance(HDF5_LOCK, dask.utils.SerializableLock) - - @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) @@ -94,3 +91,25 @@ def test_async(c, s, a, b): assert_allclose(x + 10, w) assert s.tasks + + +def test_hdf5_lock(): + assert isinstance(HDF5_LOCK, dask.utils.SerializableLock) + + +@gen_cluster(client=True) +def test_serializable_locks(c, s, a, b): + def f(x, lock=None): + with lock: + return x + 1 + + # note, the creation of Lock needs to be done inside a cluster + for lock in [HDF5_LOCK, Lock(), Lock('filename.nc'), + CombinedLock([HDF5_LOCK]), + CombinedLock([HDF5_LOCK, Lock('filename.nc')])]: + + futures = c.map(f, range(10), lock=lock) + yield c.gather(futures) + + lock2 = pickle.loads(pickle.dumps(lock)) + assert type(lock) == type(lock2) From 16729687ded440347077b53d25353e9cb4c79c8d Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 18 Feb 2018 15:14:58 -0800 Subject: [PATCH 13/27] more tests and set isopen to false when pickling --- xarray/backends/api.py | 2 +- xarray/backends/common.py | 9 +++++++-- xarray/tests/test_distributed.py | 23 +++++++++++++++++++---- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 7132c90afe1..ac4385a3104 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -443,7 +443,7 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT, lock=None, data_vars='all', coords='different', **kwargs): """Open multiple files as a single dataset. - Requires dask to be installed. See documentation for details on dask [1]. + Requires dask to be installed. See documentation for details on dask [1]. Attributes from the first dataset file are used for the combined dataset. 
Parameters diff --git a/xarray/backends/common.py b/xarray/backends/common.py index ef8b45b0a8c..81cd90e2a4d 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -435,6 +435,7 @@ class DataStorePickleMixin(object): def __getstate__(self): state = self.__dict__.copy() del state['ds'] + state['_isopen'] = False if self._mode == 'w': # file has already been created, don't override when restoring state['_mode'] = 'a' @@ -442,7 +443,11 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__.update(state) - self.ds = self._opener(mode=self._mode) + if self._autoclose: + with self.ensure_open(True): + self.ds = self._opener(mode=self._mode) + else: + self.ds = self._opener(mode=self._mode) @contextlib.contextmanager def ensure_open(self, autoclose): @@ -452,7 +457,7 @@ def ensure_open(self, autoclose): Use requires `autoclose=True` argument to `open_mfdataset`. """ - if self._autoclose and not self._isopen: + if autoclose and not self._isopen: try: self.ds = self._opener() self._isopen = True diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 8b8cd84748e..8edc6ee0892 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -27,11 +27,24 @@ if has_h5netcdf: ENGINES.append('h5netcdf') +NC_FORMATS = {'netcdf4': ['NETCDF3_CLASSIC', 'NETCDF3_64BIT_OFFSET', + 'NETCDF3_64BIT_DATA', 'NETCDF4_CLASSIC', 'NETCDF4'], + 'scipy': ['NETCDF3_CLASSIC', 'NETCDF3_64BIT'], + 'h5netcdf': ['NETCDF4']} +TEST_FORMATS = ['NETCDF3_CLASSIC', 'NETCDF4_CLASSIC', 'NETCDF4'] + + @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) -def test_dask_distributed_netcdf_integration_test(loop, engine): +@pytest.mark.parametrize('autoclose', [True, False]) +@pytest.mark.parametrize('nc_format', TEST_FORMATS) +def test_dask_distributed_netcdf_integration_test(loop, engine, autoclose, + nc_format): + + if nc_format not in 
NC_FORMATS[engine]: + pytest.skip("invalid format for engine") chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} @@ -40,10 +53,12 @@ def test_dask_distributed_netcdf_integration_test(loop, engine): with Client(s['address'], loop=loop) as c: original = create_test_data().chunk(chunks) - original.to_netcdf(filename, engine=engine) + original.to_netcdf(filename, engine=engine, format=nc_format) - with xr.open_dataset(filename, chunks=chunks, - engine=engine) as restored: + with xr.open_dataset(filename, + chunks=chunks, + engine=engine, + autoclose=autoclose) as restored: assert isinstance(restored.var1.data, da.Array) computed = restored.compute() assert_allclose(original, computed) From a667615aeef41cddb182fa92452073467ca7c76e Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Sun, 18 Feb 2018 23:15:20 +0000 Subject: [PATCH 14/27] Fixing style errors. --- xarray/backends/common.py | 1 + xarray/tests/test_distributed.py | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 81cd90e2a4d..9c7ee7107a5 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -116,6 +116,7 @@ class CombinedLock(object): Like a locked door, a CombinedLock is locked if any of its constituent locks are locked. 
""" + def __init__(self, locks): self.locks = locks diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 8edc6ee0892..f38f9f197b4 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -29,12 +29,11 @@ NC_FORMATS = {'netcdf4': ['NETCDF3_CLASSIC', 'NETCDF3_64BIT_OFFSET', 'NETCDF3_64BIT_DATA', 'NETCDF4_CLASSIC', 'NETCDF4'], - 'scipy': ['NETCDF3_CLASSIC', 'NETCDF3_64BIT'], - 'h5netcdf': ['NETCDF4']} + 'scipy': ['NETCDF3_CLASSIC', 'NETCDF3_64BIT'], + 'h5netcdf': ['NETCDF4']} TEST_FORMATS = ['NETCDF3_CLASSIC', 'NETCDF4_CLASSIC', 'NETCDF4'] - @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) From 2c0a7e8dd7bdf0dfd4485fe47841ed9e9f4e0207 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 18 Feb 2018 20:16:53 -0800 Subject: [PATCH 15/27] ds property on DataStorePickleMixin --- xarray/backends/common.py | 34 +++++++++++++++++++++++----------- xarray/backends/netCDF4_.py | 2 +- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 81cd90e2a4d..384eea90543 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -150,7 +150,7 @@ def __array__(self, dtype=None): class AbstractDataStore(Mapping): - _autoclose = False + _autoclose = None def __iter__(self): return iter(self.variables) @@ -300,6 +300,9 @@ def set_variable(self, k, v): # pragma: no cover raise NotImplementedError def sync(self): + if self._isopen and self._autoclose: + # datastore will be reopened during write + self.close() self.writer.sync() def store_dataset(self, dataset): @@ -434,8 +437,8 @@ class DataStorePickleMixin(object): def __getstate__(self): state = self.__dict__.copy() - del state['ds'] - state['_isopen'] = False + del state['_ds'] + del state['_isopen'] if self._mode == 'w': # file has already been created, don't override when restoring state['_mode'] 
= 'a' @@ -443,23 +446,32 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__.update(state) - if self._autoclose: - with self.ensure_open(True): - self.ds = self._opener(mode=self._mode) - else: - self.ds = self._opener(mode=self._mode) + self._ds = None + self._isopen = False + + @property + def ds(self): + if self._ds is not None and self._isopen: + return self._ds + ds = self._opener(mode=self._mode) + self._isopen = True + return ds @contextlib.contextmanager - def ensure_open(self, autoclose): + def ensure_open(self, autoclose=None): """ Helper function to make sure datasets are closed and opened at appropriate times to avoid too many open file errors. Use requires `autoclose=True` argument to `open_mfdataset`. """ - if autoclose and not self._isopen: + + if autoclose is None: + autoclose = self._autoclose + + if not self._isopen: try: - self.ds = self._opener() + self._ds = self._opener() self._isopen = True yield finally: diff --git a/xarray/backends/netCDF4_.py b/xarray/backends/netCDF4_.py index d2ed0075f72..5d97f16aee3 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -243,7 +243,7 @@ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None, _disable_auto_decode_group(netcdf4_dataset) - self.ds = netcdf4_dataset + self._ds = netcdf4_dataset self._autoclose = autoclose self._isopen = True self.format = self.ds.data_model From aba6bdc4c7fa377bd1597ef8aad511c292b99185 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 18 Feb 2018 20:27:12 -0800 Subject: [PATCH 16/27] stickler-ci --- xarray/tests/test_distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index f38f9f197b4..4dddeada4f3 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -122,7 +122,7 @@ def f(x, lock=None): CombinedLock([HDF5_LOCK]), CombinedLock([HDF5_LOCK, Lock('filename.nc')])]: - futures = 
c.map(f, range(10), lock=lock) + futures = c.map(f, list(range(10)), lock=lock) yield c.gather(futures) lock2 = pickle.loads(pickle.dumps(lock)) From 5702c67eabf6791da50c255067ac86161a6434b4 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 18 Feb 2018 23:00:28 -0800 Subject: [PATCH 17/27] compat fixes for other backends --- xarray/backends/common.py | 2 ++ xarray/backends/h5netcdf_.py | 9 +++++---- xarray/backends/scipy_.py | 16 ++++++++++------ xarray/backends/zarr.py | 5 +++++ xarray/tests/test_distributed.py | 3 +++ 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 7724c351e33..58c2c997200 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -152,6 +152,8 @@ def __array__(self, dtype=None): class AbstractDataStore(Mapping): _autoclose = None + _ds = None + _isopen = False def __iter__(self): return iter(self.variables) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index cc451bb62da..a5b9abe9dc1 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -10,7 +10,8 @@ from ..core.utils import FrozenOrderedDict, close_on_error from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict -from .common import WritableCFDataStore, DataStorePickleMixin, find_root +from .common import (WritableCFDataStore, DataStorePickleMixin, find_root, + HDF5_LOCK) from .netCDF4_ import (_nc4_group, _encode_nc4_variable, _get_datatype, _extract_nc4_variable_encoding, BaseNetCDF4Array) @@ -64,12 +65,12 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin): """ def __init__(self, filename, mode='r', format=None, group=None, - writer=None, autoclose=False): + writer=None, autoclose=False, lock=HDF5_LOCK): if format not in [None, 'NETCDF4']: raise ValueError('invalid format for h5netcdf backend') opener = functools.partial(_open_h5netcdf_group, filename, mode=mode, group=group) - self.ds = opener() + self._ds = 
opener() if autoclose: raise NotImplementedError('autoclose=True is not implemented ' 'for the h5netcdf backend pending ' @@ -81,7 +82,7 @@ def __init__(self, filename, mode='r', format=None, group=None, self._opener = opener self._filename = filename self._mode = mode - super(H5NetCDFStore, self).__init__(writer) + super(H5NetCDFStore, self).__init__(writer, lock=lock) def open_store_variable(self, name, var): with self.ensure_open(autoclose=False): diff --git a/xarray/backends/scipy_.py b/xarray/backends/scipy_.py index a63f6bedf8b..167223c2fb7 100644 --- a/xarray/backends/scipy_.py +++ b/xarray/backends/scipy_.py @@ -3,9 +3,11 @@ from __future__ import print_function import functools from io import BytesIO +from distutils.version import LooseVersion +import warnings import numpy as np -import warnings + from .. import Variable from ..core.pycompat import iteritems, OrderedDict, basestring @@ -117,11 +119,12 @@ class ScipyDataStore(WritableCFDataStore, DataStorePickleMixin): """ def __init__(self, filename_or_obj, mode='r', format=None, group=None, - writer=None, mmap=None, autoclose=False): + writer=None, mmap=None, autoclose=False, lock=None): import scipy import scipy.io - if mode != 'r' and scipy.__version__ < '0.13': # pragma: no cover + if (mode != 'r' and + scipy.__version__ < LooseVersion('0.13')): # pragma: no cover warnings.warn('scipy %s detected; ' 'the minimal recommended version is 0.13. 
' 'Older version of this library do not reliably ' @@ -143,13 +146,13 @@ def __init__(self, filename_or_obj, mode='r', format=None, group=None, opener = functools.partial(_open_scipy_netcdf, filename=filename_or_obj, mode=mode, mmap=mmap, version=version) - self.ds = opener() + self._ds = opener() self._autoclose = autoclose self._isopen = True self._opener = opener self._mode = mode - super(ScipyDataStore, self).__init__(writer) + super(ScipyDataStore, self).__init__(writer, lock=lock) def open_store_variable(self, name, var): with self.ensure_open(autoclose=False): @@ -238,4 +241,5 @@ def __setstate__(self, state): # seek to the start of the file so scipy can read it filename.seek(0) super(ScipyDataStore, self).__setstate__(state) - self._isopen = True + self._ds = None + self._isopen = False diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 2737d9fb213..f4be07e007f 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -359,6 +359,8 @@ def prepare_variable(self, name, variable, check_encoding=False, fill_value = _ensure_valid_fill_value(attrs.pop('_FillValue', None), dtype) + if variable.encoding == {'_FillValue': None} and fill_value is None: + variable.encoding = {} encoding = _extract_zarr_variable_encoding( variable, raise_on_invalid=check_encoding) @@ -379,6 +381,9 @@ def store(self, variables, attributes, *args, **kwargs): AbstractWritableDataStore.store(self, variables, attributes, *args, **kwargs) + def sync(self): + self.writer.sync() + def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, decode_cf=True, mask_and_scale=True, decode_times=True, diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 4dddeada4f3..f32be6a16c0 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -45,6 +45,9 @@ def test_dask_distributed_netcdf_integration_test(loop, engine, autoclose, if nc_format not in NC_FORMATS[engine]: pytest.skip("invalid format for 
engine") + if engine == 'h5netcdf': + pytest.xfail("h5netcdf does not support autoclose") + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: From a06b683608ce2c2cd44388b1f8c640156f537382 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 20 Feb 2018 22:54:50 -0800 Subject: [PATCH 18/27] HDF5_USE_FILE_LOCKING = False in test_distributed --- xarray/tests/test_distributed.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index f32be6a16c0..61016031a4c 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -1,3 +1,4 @@ +import os import sys import pickle @@ -34,6 +35,8 @@ TEST_FORMATS = ['NETCDF3_CLASSIC', 'NETCDF4_CLASSIC', 'NETCDF4'] +os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" + @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) From 6ef31aae565675320ab4d23b766575f19fb577e6 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 20 Feb 2018 23:03:22 -0800 Subject: [PATCH 19/27] style fix --- xarray/tests/test_distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 61016031a4c..895064489c4 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import, division, print_function import os import sys import pickle From 00156c3652c6b46a8977f8c6e73ff954d7c0da84 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 21 Feb 2018 20:58:51 -0800 Subject: [PATCH 20/27] update tests to only expect netcdf4 to work, docstrings, and some cleanup in to_netcdf --- xarray/backends/api.py | 45 ++++++++++++++++++++----- xarray/backends/common.py | 16 +++++++++ xarray/tests/test_distributed.py | 58 +++++++++++++++++++++++++++----- 3 files changed, 103 
insertions(+), 16 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index ac4385a3104..6d44e0abe56 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -10,7 +10,8 @@ import numpy as np from .. import backends, conventions, Dataset -from .common import ArrayWriter, HDF5_LOCK, CombinedLock +from .common import (ArrayWriter, get_scheduler, get_scheduler_lock, + HDF5_LOCK, CombinedLock) from ..core import indexing from ..core.combine import auto_combine from ..core.utils import close_on_error, is_remote_uri @@ -131,6 +132,32 @@ def _protect_dataset_variables_inplace(dataset, cache): variable.data = data +def _get_lock(engine, scheduler, format, path_or_file): + """ Get the lock(s) that apply to a particular scheduler/engine/format""" + + locks = [] + SchedulerLock = get_scheduler_lock(scheduler) + if format in ['NETCDF4', None] and engine in ['h5netcdf', 'netcdf4']: + locks.append(HDF5_LOCK) + + try: + # per file lock + # Dask locks take a name argument (e.g. filename) + locks.append(SchedulerLock(path_or_file)) + except TypeError: + # threading/multiprocessing lock + locks.append(SchedulerLock()) + + # When we have more than one lock, use the CombinedLock wrapper class + lock = CombinedLock(locks) if len(locks) > 1 else locks[0] + + # Question: Should we be dropping one of these two locks when they are + basically the same. 
For instance, when using netcdf4 and dask is not + installed, locks will be [threading.Lock(), threading.Lock()] + + return lock + + def open_dataset(filename_or_obj, group=None, decode_cf=True, mask_and_scale=True, decode_times=True, autoclose=False, concat_characters=True, decode_coords=True, engine=None, @@ -622,14 +649,16 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None, # if a writer is provided, store asynchronously sync = writer is None - # TODO Move this logic outside of this function - from .common import get_scheduler, get_scheduler_lock + # handle scheduler specific logic scheduler = get_scheduler() - # I think we want to include the filename here to support concurrent writes - # using save_mfdataset - file_lock = get_scheduler_lock(scheduler)(path_or_file) - lock = CombinedLock([HDF5_LOCK, file_lock]) - autoclose = scheduler == 'distributed' + if (dataset.chunks and scheduler in ['distributed', 'multiprocessing'] and + engine != 'netcdf4'): + raise NotImplementedError("Writing netCDF files with the %s backend " + "is not currently supported with dask's %s " + "scheduler" % (engine, scheduler)) + lock = _get_lock(engine, scheduler, format, path_or_file) + autoclose = (dataset.chunks and + scheduler in ['distributed', 'multiprocessing']) target = path_or_file if path_or_file is not None else BytesIO() store = store_open(target, mode, format, group, writer, diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 58c2c997200..1f21c645fb5 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -31,6 +31,14 @@ def get_scheduler(get=None, collection=None): + """ Determine the dask scheduler that is being used. + + None is returned if no dask scheduler is active. 
+ + See also + -------- + dask.utils.effective_get + """ try: from dask.utils import effective_get actual_get = effective_get(get, collection) @@ -52,6 +60,14 @@ def get_scheduler_lock(scheduler): + """ Get the appropriate lock for a certain situation based on the dask + scheduler used. + + See Also + -------- + dask.utils.get_scheduler_lock + """ + if scheduler == 'distributed': from dask.distributed import Lock return Lock diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 895064489c4..bc043fc18d2 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -2,6 +2,7 @@ import os import sys import pickle +import tempfile import pytest import xarray as xr @@ -19,7 +20,7 @@ from xarray.backends.common import HDF5_LOCK, CombinedLock from . import (assert_allclose, has_scipy, has_netCDF4, has_h5netcdf, - requires_zarr) + requires_zarr, raises_regex) ENGINES = [] if has_scipy: @@ -36,21 +37,47 @@ TEST_FORMATS = ['NETCDF3_CLASSIC', 'NETCDF4_CLASSIC', 'NETCDF4'] +# Does this belong elsewhere? 
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" +@pytest.mark.xfail(sys.platform == 'win32', + reason='https://github.com/pydata/xarray/issues/1738') +@pytest.mark.parametrize('engine', ['netcdf4']) +@pytest.mark.parametrize('autoclose', [True, False]) +@pytest.mark.parametrize('nc_format', TEST_FORMATS) +def test_dask_distributed_netcdf_roundtrip(loop, engine, autoclose, nc_format): + + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} + + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + + original = create_test_data().chunk(chunks) + original.to_netcdf(filename, engine=engine, format=nc_format) + + with xr.open_dataset(filename, + chunks=chunks, + engine=engine, + autoclose=autoclose) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) + + @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ENGINES) @pytest.mark.parametrize('autoclose', [True, False]) @pytest.mark.parametrize('nc_format', TEST_FORMATS) -def test_dask_distributed_netcdf_integration_test(loop, engine, autoclose, - nc_format): +def test_dask_distributed_read_netcdf_integration_test(loop, engine, autoclose, + nc_format): - if nc_format not in NC_FORMATS[engine]: - pytest.skip("invalid format for engine") + if engine == 'h5netcdf' and autoclose: + pytest.skip('h5netcdf does not support autoclose') - if engine == 'h5netcdf': - pytest.xfail("h5netcdf does not support autoclose") + if nc_format not in NC_FORMATS[engine]: + pytest.skip('invalid format for engine') chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} @@ -58,7 +85,7 @@ def test_dask_distributed_netcdf_integration_test(loop, engine, autoclose, with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as c: - original = create_test_data().chunk(chunks) + original = create_test_data() 
original.to_netcdf(filename, engine=engine, format=nc_format) with xr.open_dataset(filename, @@ -69,6 +96,21 @@ def test_dask_distributed_netcdf_integration_test(loop, engine, autoclose, computed = restored.compute() assert_allclose(original, computed) + +@pytest.mark.parametrize('engine', ['h5netcdf', 'scipy']) +def test_dask_distributed_netcdf_integration_test_not_implemented(loop, engine): + chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} + + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename: + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + + original = create_test_data().chunk(chunks) + + with raises_regex(NotImplementedError, 'distributed'): + original.to_netcdf(filename, engine=engine) + + @requires_zarr def test_dask_distributed_zarr_integration_test(loop): chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5} From 3dcfac54632042a5855f2e4ff7d1420cdcc80616 Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Thu, 22 Feb 2018 04:59:13 +0000 Subject: [PATCH 21/27] Fixing style errors. --- xarray/tests/test_distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index bc043fc18d2..3b7ac4c464e 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -40,6 +40,7 @@ # Does this belong elsewhere? 
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" + @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ['netcdf4']) From 91f3c6a1bd88873ecbc1b536c13d0d14cfe1a1fa Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 27 Feb 2018 14:42:00 -0800 Subject: [PATCH 22/27] fix imports after merge --- xarray/backends/api.py | 3 +-- xarray/backends/h5netcdf_.py | 10 +++++----- xarray/backends/netCDF4_.py | 5 +++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 700f8cd9f5d..ed2f81625c8 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -13,8 +13,7 @@ from ..core.pycompat import basestring, path_type from ..core.utils import close_on_error, is_remote_uri from .common import ( - GLOBAL_LOCK, HDF5_LOCK, ArrayWriter, CombinedLock, get_scheduler, - get_scheduler_lock) + HDF5_LOCK, ArrayWriter, CombinedLock, get_scheduler, get_scheduler_lock) DATAARRAY_NAME = '__xarray_dataarray_name__' DATAARRAY_VARIABLE = '__xarray_dataarray_variable__' diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 1410693c084..d14e0ab1a54 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -8,11 +8,11 @@ from ..core import indexing from ..core.pycompat import OrderedDict, bytes_type, iteritems, unicode_type from ..core.utils import FrozenOrderedDict, close_on_error -from .common import (HDF5_LOCK, DataStorePickleMixin, WritableCFDataStore, - find_root) -from .netCDF4_ import (BaseNetCDF4Array, _encode_nc4_variable, - _extract_nc4_variable_encoding, _get_datatype, - _nc4_group) +from .common import ( + HDF5_LOCK, DataStorePickleMixin, WritableCFDataStore, find_root) +from .netCDF4_ import ( + BaseNetCDF4Array, _encode_nc4_variable, _extract_nc4_variable_encoding, + _get_datatype, _nc4_group) class H5NetCDFArrayWrapper(BaseNetCDF4Array): diff --git a/xarray/backends/netCDF4_.py 
b/xarray/backends/netCDF4_.py index 2dcd8518024..4f1bd82fc1e 100644 --- a/xarray/backends/netCDF4_.py +++ b/xarray/backends/netCDF4_.py @@ -12,8 +12,9 @@ from ..core import indexing from ..core.pycompat import PY3, OrderedDict, basestring, iteritems, suppress from ..core.utils import FrozenOrderedDict, close_on_error, is_remote_uri -from .common import (HDF5_LOCK, BackendArray, DataStorePickleMixin, - WritableCFDataStore, find_root, robust_getitem) +from .common import ( + HDF5_LOCK, BackendArray, DataStorePickleMixin, WritableCFDataStore, + find_root, robust_getitem) from .netcdf3 import encode_nc3_attr_value, encode_nc3_variable # This lookup table maps from dtype.byteorder to a readable endian From 5cb91bab6103c149108b2abcfff3b91de94f7787 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 27 Feb 2018 15:06:07 -0800 Subject: [PATCH 23/27] fix more import bugs --- xarray/core/combine.py | 2 +- xarray/tests/test_distributed.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/xarray/core/combine.py b/xarray/core/combine.py index 149009689e9..8c1c58e9a40 100644 --- a/xarray/core/combine.py +++ b/xarray/core/combine.py @@ -8,8 +8,8 @@ from .alignment import align from .merge import merge from .pycompat import OrderedDict, basestring, iteritems -from .variable import IndexVariable, Variable, as_variable from .variable import concat as concat_vars +from .variable import IndexVariable, Variable, as_variable def concat(objs, dim=None, data_vars='all', coords='different', diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index a107962d66b..33c9e8e615d 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -11,6 +11,7 @@ distributed = pytest.importorskip('distributed') # isort:skip from dask import array +from dask.distributed import Client, Lock from distributed.utils_test import cluster, gen_cluster from distributed.utils_test import loop # flake8: noqa from distributed.client 
import futures_of @@ -21,7 +22,8 @@ from xarray.backends.common import HDF5_LOCK, CombinedLock from . import ( - assert_allclose, has_h5netcdf, has_netCDF4, has_scipy, requires_zarr) + assert_allclose, has_h5netcdf, has_netCDF4, has_scipy, requires_zarr, + raises_regex) # this is to stop isort throwing errors. May have been easier to just use # `isort:skip` in retrospect From 2b97d4f6695e62b33ac00a9e7bc2b688cb6a5ad8 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 27 Feb 2018 15:49:39 -0800 Subject: [PATCH 24/27] update docs --- doc/dask.rst | 8 ++++++++ doc/io.rst | 6 +++--- doc/whats-new.rst | 9 +++++++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/doc/dask.rst b/doc/dask.rst index 65ebd643e1e..dd1dd1fafca 100644 --- a/doc/dask.rst +++ b/doc/dask.rst @@ -100,6 +100,14 @@ Once you've manipulated a dask array, you can still write a dataset too big to fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the usual way. +.. note:: + + When using dask's distributed scheduler to write NETCDF4 files, + it may be necessary to set the environment variable `HDF5_USE_FILE_LOCKING=FALSE` + to avoid competing locks within the HDF5 SWMR file locking scheme. Note that + writing netCDF files with dask's distributed scheduler is only supported for + the `netcdf4` backend. + A dataset can also be converted to a dask DataFrame using :py:meth:`~xarray.Dataset.to_dask_dataframe`. .. ipython:: python diff --git a/doc/io.rst b/doc/io.rst index c177496f6f2..c14e1516b38 100644 --- a/doc/io.rst +++ b/doc/io.rst @@ -672,9 +672,9 @@ files into a single Dataset by making use of :py:func:`~xarray.concat`. .. note:: - Version 0.5 includes support for manipulating datasets that - don't fit into memory with dask_. If you have dask installed, you can open - multiple files simultaneously using :py:func:`~xarray.open_mfdataset`:: + Xarray includes support for manipulating datasets that don't fit into memory + with dask_. 
If you have dask installed, you can open multiple files + simultaneously using :py:func:`~xarray.open_mfdataset`:: xr.open_mfdataset('my/files/*.nc') diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 5d83d5ccbbe..52a78f134c1 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -38,6 +38,13 @@ Documentation Enhancements ~~~~~~~~~~~~ +- Support for writing netCDF files from xarray datastores (scipy and netcdf4 only) + when using the `dask.distributed `_ scheduler + (:issue:`1464`). + By `Joe Hamman `_. + + +- Fixed to_netcdf when using dask distributed Bug fixes ~~~~~~~~~ @@ -176,8 +183,6 @@ Bug fixes - Compatibility fixes to plotting module for Numpy 1.14 and Pandas 0.22 (:issue:`1813`). By `Joe Hamman `_. -- Fixed to_netcdf when using dask distributed (:issue:`1464`). - By `Joe Hamman `_.. - Bug fix in encoding coordinates with ``{'_FillValue': None}`` in netCDF metadata (:issue:`1865`). By `Chris Roth `_. From 2dc514f91400148f3fb073e64e1c63cb2004441d Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 27 Feb 2018 16:05:18 -0800 Subject: [PATCH 25/27] fix for pynio --- xarray/backends/pynio_.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xarray/backends/pynio_.py b/xarray/backends/pynio_.py index 30969fcd9a0..8184926c1be 100644 --- a/xarray/backends/pynio_.py +++ b/xarray/backends/pynio_.py @@ -41,14 +41,14 @@ class NioDataStore(AbstractDataStore, DataStorePickleMixin): def __init__(self, filename, mode='r', autoclose=False): import Nio opener = functools.partial(Nio.open_file, filename, mode=mode) - self.ds = opener() - # xarray provides its own support for FillValue, - # so turn off PyNIO's support for the same. - self.ds.set_option('MaskedArrayMode', 'MaskedNever') + self._ds = opener() self._autoclose = autoclose self._isopen = True self._opener = opener self._mode = mode + # xarray provides its own support for FillValue, + # so turn off PyNIO's support for the same. 
+ self.ds.set_option('MaskedArrayMode', 'MaskedNever') def open_store_variable(self, name, var): data = indexing.LazilyIndexedArray(NioArrayWrapper(name, self)) From eff01611feac5fbfe5f5e28e71894b0c5cfc2b19 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 28 Feb 2018 10:19:15 -0800 Subject: [PATCH 26/27] cleanup locks and use pytest monkeypatch for environment variable --- doc/whats-new.rst | 6 +++--- xarray/backends/api.py | 14 +------------- xarray/backends/common.py | 14 +++++++------- xarray/tests/test_distributed.py | 9 ++++----- 4 files changed, 15 insertions(+), 28 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 52a78f134c1..bda6ea08f2b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -38,9 +38,9 @@ Documentation Enhancements ~~~~~~~~~~~~ -- Support for writing netCDF files from xarray datastores (scipy and netcdf4 only) - when using the `dask.distributed `_ scheduler - (:issue:`1464`). +- Support for writing xarray datasets to netCDF files (netcdf4 backend only) + when using the `dask.distributed `_ + scheduler (:issue:`1464`). By `Joe Hamman `_. diff --git a/xarray/backends/api.py b/xarray/backends/api.py index ed2f81625c8..a22356f66b0 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -134,25 +134,13 @@ def _get_lock(engine, scheduler, format, path_or_file): """ Get the lock(s) that apply to a particular scheduler/engine/format""" locks = [] - SchedulerLock = get_scheduler_lock(scheduler) if format in ['NETCDF4', None] and engine in ['h5netcdf', 'netcdf4']: locks.append(HDF5_LOCK) - - try: - # per file lock - # Dask locks take a name argument (e.g. 
filename) - locks.append(SchedulerLock(path_or_file)) - except TypeError: - # threading/multiprocessing lock - locks.append(SchedulerLock()) + locks.append(get_scheduler_lock(scheduler, path_or_file)) # When we have more than one lock, use the CombinedLock wrapper class lock = CombinedLock(locks) if len(locks) > 1 else locks[0] - # Question: Should we be dropping one of these two locks when they are they - # are basically the same. For instance, when using netcdf4 and dask is not - # installed, locks will be [threading.Lock(), threading.Lock()] - return lock diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 1c8161576ea..df125d576c3 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -59,7 +59,7 @@ def get_scheduler(get=None, collection=None): return None -def get_scheduler_lock(scheduler): +def get_scheduler_lock(scheduler, path_or_file=None): """ Get the appropriate lock for a certain situation based onthe dask scheduler used. @@ -70,14 +70,14 @@ def get_scheduler_lock(scheduler): if scheduler == 'distributed': from dask.distributed import Lock - return Lock + return Lock(path_or_file) elif scheduler == 'multiprocessing': - return multiprocessing.Lock + return multiprocessing.Lock() elif scheduler == 'threaded': from dask.utils import SerializableLock - return SerializableLock + return SerializableLock() else: - return threading.Lock + return threading.Lock() def _encode_variable_name(name): @@ -134,7 +134,7 @@ class CombinedLock(object): """ def __init__(self, locks): - self.locks = locks + self.locks = set(locks) def acquire(self, *args): return all(lock.acquire(*args) for lock in self.locks) @@ -156,7 +156,7 @@ def locked(self): return any(lock.locked for lock in self.locks) def __repr__(self): - return "CombinedLock(%s)" % [repr(lock) for lock in self.locks] + return "CombinedLock(%r)" % list(self.locks) class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed): diff --git a/xarray/tests/test_distributed.py 
b/xarray/tests/test_distributed.py index 33c9e8e615d..0ac03327494 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -47,16 +47,15 @@ TEST_FORMATS = ['NETCDF3_CLASSIC', 'NETCDF4_CLASSIC', 'NETCDF4'] -# Does this belong elsewhere? -os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" - - @pytest.mark.xfail(sys.platform == 'win32', reason='https://github.com/pydata/xarray/issues/1738') @pytest.mark.parametrize('engine', ['netcdf4']) @pytest.mark.parametrize('autoclose', [True, False]) @pytest.mark.parametrize('nc_format', TEST_FORMATS) -def test_dask_distributed_netcdf_roundtrip(loop, engine, autoclose, nc_format): +def test_dask_distributed_netcdf_roundtrip(monkeypatch, loop, + engine, autoclose, nc_format): + + monkeypatch.setenv('HDF5_USE_FILE_LOCKING', 'FALSE') chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6} From c8552847608e691cdfea117d65411cba91b8ca1c Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 7 Mar 2018 16:30:46 -0800 Subject: [PATCH 27/27] fix failing test using combined lock --- xarray/backends/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index df125d576c3..c46f9d5b552 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -134,7 +134,7 @@ class CombinedLock(object): """ def __init__(self, locks): - self.locks = set(locks) + self.locks = tuple(set(locks)) # remove duplicates def acquire(self, *args): return all(lock.acquire(*args) for lock in self.locks)