diff --git a/cubed_xarray/__init__.py b/cubed_xarray/__init__.py index 01918bd..58b51f5 100644 --- a/cubed_xarray/__init__.py +++ b/cubed_xarray/__init__.py @@ -1,6 +1,5 @@ from importlib.metadata import version - try: __version__ = version("cubed-xarray") except Exception: diff --git a/cubed_xarray/cubedmanager.py b/cubed_xarray/cubedmanager.py index 099ce89..52ca284 100644 --- a/cubed_xarray/cubedmanager.py +++ b/cubed_xarray/cubedmanager.py @@ -1,18 +1,15 @@ from __future__ import annotations from collections.abc import Sequence -from typing import TYPE_CHECKING, Any, Callable, Union +from typing import TYPE_CHECKING, Any, Callable, Iterable, Union import numpy as np - from tlz import partition - from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint - if TYPE_CHECKING: - from xarray.core.types import T_Chunks, T_NormalizedChunks from cubed import Array as CubedArray + from xarray.core.types import T_Chunks, T_NormalizedChunks class CubedManager(ChunkManagerEntrypoint["CubedArray"]): @@ -204,6 +201,27 @@ def store( """Used when writing to any backend.""" from cubed.core.ops import store + compute = kwargs.pop("compute", True) + if not compute: + raise NotImplementedError("Delayed compute is not supported.") + + lock = kwargs.pop("lock", None) + if lock: + raise NotImplementedError("Locking is not supported.") + + regions = kwargs.pop("regions", None) + if regions: + # regions is either a tuple of slices or a collection of tuples of slices + if isinstance(regions, tuple): + regions = [regions] + for t in regions: + if not all(r == slice(None) for r in t): + raise NotImplementedError( + "Only whole slices are supported for regions." + ) + + kwargs.pop("flush", None) # not used + return store( sources, targets, diff --git a/cubed_xarray/tests/test_wrapping.py b/cubed_xarray/tests/test_wrapping.py index d1b2a0d..3124e18 100644 --- a/cubed_xarray/tests/test_wrapping.py +++ b/cubed_xarray/tests/test_wrapping.py @@ -1,22 +1,63 @@ +import sys + +import cubed +import pytest import xarray as xr +from cubed.runtime.create import create_executor from xarray.namedarray.parallelcompat import list_chunkmanagers -import cubed +from xarray.tests import assert_allclose, create_test_data from cubed_xarray.cubedmanager import CubedManager +EXECUTORS = [create_executor("single-threaded")] + +if sys.version_info >= (3, 11): + EXECUTORS.append(create_executor("processes")) + + +@pytest.fixture( + scope="module", + params=EXECUTORS, + ids=[executor.name for executor in EXECUTORS], +) +def executor(request): + return request.param + class TestDiscoverCubedManager: def test_list_cubedmanager(self): chunkmanagers = list_chunkmanagers() - assert 'cubed' in chunkmanagers - assert isinstance(chunkmanagers['cubed'], CubedManager) + assert "cubed" in chunkmanagers + assert isinstance(chunkmanagers["cubed"], CubedManager) def test_chunk(self): - da = xr.DataArray([1, 2], dims='x') - chunked = da.chunk(x=1, chunked_array_type='cubed') + da = xr.DataArray([1, 2], dims="x") + chunked = da.chunk(x=1, chunked_array_type="cubed") assert isinstance(chunked.data, cubed.Array) - assert chunked.chunksizes == {'x': (1, 1)} + assert chunked.chunksizes == {"x": (1, 1)} # TODO test cubed is default when dask not installed # TODO test dask is default over cubed when both installed + + +def test_to_zarr(tmpdir, executor): + spec = cubed.Spec(allowed_mem="200MB", executor=executor) + + original = create_test_data().chunk( + chunked_array_type="cubed", from_array_kwargs={"spec": spec} + ) + + filename = tmpdir / "out.zarr" + original.to_zarr(filename) + + with xr.open_dataset( + filename, + chunks="auto", + engine="zarr", + chunked_array_type="cubed", + from_array_kwargs={"spec": spec}, + ) as restored: + assert isinstance(restored.var1.data, cubed.Array) + computed = restored.compute() + assert_allclose(original, computed)