Skip to content

zarr and xarray chunking compatibility and to_zarr performance #2300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
chrisbarber opened this issue Jul 18, 2018 · 15 comments · Fixed by #5065
Closed

zarr and xarray chunking compatibility and to_zarr performance #2300

chrisbarber opened this issue Jul 18, 2018 · 15 comments · Fixed by #5065
Labels
topic-zarr Related to zarr storage library

Comments

@chrisbarber
Copy link

I have a situation where I build large zarr arrays based on chunks which correspond to how I am reading data off a filesystem, for best I/O performance. Then I set these as variables on an xarray dataset which I want to persist to zarr, but with different chunks more optimal for querying.

One problem I ran into is that manually selecting chunks of a dataset prior to to_zarr results in

"Zarr requires uniform chunk sizes excpet for final chunk."

It's difficult for me to understand exactly how to select chunks manually at the dataset level which would also make this zarr "final chunk" constraint happy. I would have been satisfied however with letting zarr choose chunks for me, but could not find a way to trigger this through the xarray API short of "unchunking" it first, which would lead to loading entire variables into memory. I came up with the following hack to trigger zarr's automatic chunking despite having differently defined chunks on my xarray dataset:

    # monkey patch to get zarr to ignore dask chunks and use its own heuristics
    def copy_func(f):
        g = types.FunctionType(f.__code__, f.__globals__, name=f.__name__,
                               argdefs=f.__defaults__,
                               closure=f.__closure__)
        g = functools.update_wrapper(g, f)
        g.__kwdefaults__ = f.__kwdefaults__
        return g
    orig_determine_zarr_chunks = copy_func(xr.backends.zarr._determine_zarr_chunks)
    xr.backends.zarr._determine_zarr_chunks = lambda enc_chunks, var_chunks, ndim: orig_determine_zarr_chunks(enc_chunks, None, ndim)

The next problem to contend with is that da.store between zarr stores with differing chunks between source and destination is astronomically slow. The first thing to attempt would be to rechunk the dask arrays according to the destination zarr chunks, but xarray's consistent chunks constraint blocks this strategy as far as I can tell. Once again I took the dirty hack approach and inject a rechunking on a per-variable basis during the to_zarr operation, as follows:

    # monkey patch to make dask arrays writable with different chunks than zarr dest
    # could do without this but would have to contend with 'inconsistent chunks' on dataset
    def sync_using_zarr_copy(self, compute=True):
        if self.sources:
            import dask.array as da
            rechunked_sources = [source.rechunk(target.chunks)
                for source, target in zip(self.sources, self.targets)]
            delayed_store = da.store(rechunked_sources, self.targets,
                                     lock=self.lock, compute=compute,
                                     flush=True)
            self.sources = []
            self.targets = []
            return delayed_store
    xr.backends.common.ArrayWriter.sync = sync_using_zarr_copy

I may have missed something in the API that would have made this easier, or another workaround which would be less hacky, but in any case I'm wondering if this scenario could be handled elegantly in xarray.

I'm not sure if there is a plan going forward to make legal xarray chunks 100% compatible with zarr; if so that would go a fair ways in alleviating the first problem. Alternatively, perhaps the xarray API could expose some ability to adjust chunks according to zarr's liking, as well as the option of defaulting entirely to zarr's heuristics for chunking.

As for the performance issue with differing chunks, I'm not sure whether my rechunking patch could be applied without causing side-effects. Or where the right place to solve this would be-- perhaps it could be more naturally addressed within da.store.

@shoyer
Copy link
Member

shoyer commented Jul 19, 2018

I just pushed a new xarray release (0.10.8) earlier today. We had a fix for zarr chunking in there (#2228) -- does that solve your issue?

@chrisbarber
Copy link
Author

chrisbarber commented Jul 20, 2018

Ah, that's great. I do see some improvement. Specifically, I can now set chunks using xarray, and successfully write to zarr, and reopen it. However, when reopening it I do find that the chunks have been inconsistently applied (some fields have the expected chunksize whereas some small fields have the entire variable in one chunk). Furthermore, trying to write a second time with to_zarr leads to:
*** NotImplementedError: Specified zarr chunks (100,) would overlap multiple dask chunks ((100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 4),). This is not implemented in xarray yet. Consider rechunking the data usingchunk()or specifying different chunks in encoding.
Trying to reapply the original chunks with xr.Dataset.chunk succeeds, and ds.chunks no longer reports "inconsistent chunks", but trying to write still produces the same error.

I also tried loading my entire dataset into memory, allowing the initial to_zarr to default to zarr's chunking heuristics. Trying to read and write a second time again results in the same error:
NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((63170, 63170, 63170, 63170, 63170, 63170, 63170, 63169),). This is not implemented in xarray yet. Consider rechunking the data usingchunk()or specifying different chunks in encoding.
I tried this round-tripping experiment with my monkey patches, and it works for a sequence of read/write/read/write... without any intervention in between. This only works for default zarr-chunking, however, since the patch to xr.backends.zarr._determine_zarr_chunks overrides whatever chunks are on the originating dataset.

Curious: Is there any downside in xarray to using datasets with inconsistent chunks? I take it that it is a supported configuration because xarray allows it to happen, but just outputs that error when calling ds.chunks, which is just a sort of convenience method for looking at chunks across a whole dataset which happens to have consistent chunks...?

One other thing to add: it might be nice to have an option to allow zarr auto-chunking even when chunks!={}. I don't know how sensitive zarr performance is to chunksizes, but it'd be nice to have some form of sane auto-chunking available when you don't want to bother with manually choosing.

@shoyer
Copy link
Member

shoyer commented Jul 20, 2018

Curious: Is there any downside in xarray to using datasets with inconsistent chunks?

No, there's no downside here. It's just not possible to define a single dict of chunks in this case.

Can you look into the encoding attributes of any variables you load from disk?

It would also help to come up with a self-contained example that reproduces this using dummy data.

@chrisbarber
Copy link
Author

I took a closer look and noticed my one-dimensional fields of size 505359 were reporting a chunksize or 63170. Turns out that's enough to come up with a minimal repro:

>>> xr.__version__
'0.10.8'
>>> ds=xr.Dataset({'foo': (['bar'], np.zeros((505359,)))})
>>> ds.to_zarr('test.zarr')
<xarray.backends.zarr.ZarrStore object at 0x7fd9680f7fd0>
>>> ds2=xr.open_zarr('test.zarr')
>>> ds2
<xarray.Dataset>
Dimensions:  (bar: 505359)
Dimensions without coordinates: bar
Data variables:
    foo      (bar) float64 dask.array<shape=(505359,), chunksize=(63170,)>
>>> ds2.foo.encoding
{'chunks': (63170,), 'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0), 'filters':
 None, '_FillValue': nan, 'dtype': dtype('float64')}
>>> ds2.to_zarr('test2.zarr')

raises

NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((63170, 63170, 63
170, 63170, 63170, 63170, 63170, 63169),). This is not implemented in xarray yet.  Consider rechunking th
e data using `chunk()` or specifying different chunks in encoding.

@tinaok
Copy link

tinaok commented May 17, 2019

Hi, I'm new to xarray & zarr ,
After reading a zarr file, I re-chunk the data using xarray.Dataset.chunk. Then create a newly chunked data stored as zarr file with xarray.Dataset.to_zarr But I get error message:
'NotImplementedError: Specified zarr chunks (200, 100, 1) would overlap multiple dask chunks ((50, 50, 50, 50), (25, 25, 25, 25), (10000,)). This is not implemented in xarray yet. Consider rechunking the data using chunk() or specifying different chunks in encoding.'
My xarray version is12.1, & and my understanding is that according to this post #2300 .it is fixed, thus so it is implemented to 12.1??

Then why do I get 'notimplemented error ?

Do I have to use 'del dsread.data.encoding['chunks']. each time before using 'Dataset.to_zarr' as a workaround? but probably I am missing somthing. I hope someone can point me out...

I made a notebook here for reproducing the pb.
https://github.com/tinaok/Pangeo-for-beginners/blob/master/3-1%20zarr%20and%20re-chunking%20bug%20report.ipynb

thanks for your help, regards Tina

@nbren12
Copy link
Contributor

nbren12 commented Aug 8, 2019

I am getting the same error too.

@rabernat
Copy link
Contributor

Hi all. I am looking into this issue, trying to figure out if it is still a thing. I just tried @chrisbarber's MRE above using xarray v 0.15.

import xarray as xr
ds=xr.Dataset({'foo': (['bar'], np.zeros((505359,)))})
ds.to_zarr('test.zarr', mode='w')
ds2=xr.open_zarr('test.zarr')
ds2.to_zarr('test2.zarr', mode='w')

This now works without error, thanks to #2487.

I can trigger the error in a third step:

ds3 = ds2.chunk({'bar': 10000})
ds3.to_zarr('test3.zarr', mode='w')

raises

NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 5359),). This is not implemented in xarray yet.  Consider rechunking the data using `chunk()` or specifying different chunks in encoding.

The problem is that, even though we rechunked the data, chunks key is still present in encoding.

>>> print(ds3.foo.encoding)
{'chunks': (63170,), 'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0), 'filters': None, '_FillValue': nan, 'dtype': dtype('float64')}

This was populated with the variable was read from test.zarr.

As a workaround, you can delete the encoding (either just the chunk attribute or all of it):

ds3.foo.encoding = {}
ds3.to_zarr('test3.zarr', mode='w')

This allows the operation to complete successfully.

For all the users stuck on this problem (e.g. @abarciauskas-bgse):

  • update to the latest version of xarray and then
  • delete the encoding on your variables, or overwrite it with the encoding keyword in to_zarr.

For xarray developers, the question is whether the chunk() method should delete existing chunks attributes from encoding.

@dcherian
Copy link
Contributor

the question is whether the chunk() method should delete existing chunks attributes from encoding.

IMO this is the user-friendly thing to do.

@chrisroat
Copy link
Contributor

If there is a non-dimension coordinate, the error is also tickled.

import xarray as xr
import numpy as np
ds=xr.Dataset({'foo': (['bar'], np.zeros((100,)))})

# Problem also affects non-dimension coords
ds.coords['baz'] = ('bar', ['mah'] * 100)

ds.to_zarr('test.zarr', mode='w')
ds2 = xr.open_zarr('test.zarr')

ds3 = ds2.chunk({'bar': 2})

ds3.foo.encoding = {}
ds3.coords['baz'].encoding = {}  # Need this, too.

ds3.to_zarr('test3.zarr', mode='w')

@LunarLanding
Copy link

I arrived here due to a different use case / problem, which ultimately I solved, but I think there's value in documenting it here.
My use case is the following workflow:
1 . take raw data, build a dataset, append it to a zarr store Z
2 . analyze the data on Z, then maybe goto 1.
Step 2's performance is much better when data on Z is chunked properly along the appending dimension 'frame' (chunks of size 50), however step 1 only adds 1 element along it. I end up with Z having chunks (1,1,1,1,1...) on 'frame'.
On xarray 0.16.0, this seems solvable via the encoding parameter, if we take care to only use it on the store creation.
Before that version, I was using something like the monkey patch posted by @chrisbarber .
Code:

import shutil
import xarray as xr
import numpy as np
import tempfile
zarr_path = tempfile.mkdtemp()

def append_test(ds,chunks):
    shutil.rmtree(zarr_path)
    
    for i in range(21):
        d = ds.isel(frame=slice(i,i+1))
        d = d.chunk(chunks)
        d.to_zarr(zarr_path,consolidated=True,**(dict(mode='a',append_dim='frame') if i>0 else {}))
    dsa = xr.open_zarr(str(zarr_path),consolidated=True)
    print(dsa.chunks,dsa.dims)
    
#sometime before 0.16.0
import contextlib
@contextlib.contextmanager
def change_determine_zarr_chunks(chunks):
    orig_determine_zarr_chunks = xr.backends.zarr._determine_zarr_chunks
    try:
        def new_determine_zarr_chunks( enc_chunks, var_chunks, ndim, name):
            da = ds[name]
            zchunks = tuple(chunks[dim] if (dim in chunks and chunks[dim] is not None) else da.shape[i] for i,dim in enumerate(da.dims))
            return zchunks
        xr.backends.zarr._determine_zarr_chunks = new_determine_zarr_chunks
        yield
    finally:
        xr.backends.zarr._determine_zarr_chunks = orig_determine_zarr_chunks
chunks = {'frame':10,'other':50}
ds = xr.Dataset({'data':xr.DataArray(data=np.random.rand(100,100),dims=('frame','other'))})

append_test(ds,chunks)
with change_determine_zarr_chunks(chunks):
    append_test(ds,chunks)

#with 0.16.0
def append_test_encoding(ds,chunks):
    shutil.rmtree(zarr_path)
    
    encoding = {}
    for k,v in ds.variables.items():
        encoding[k]={'chunks':tuple(chunks[dk] if dk in chunks else v.shape[i] for i,dk in enumerate(v.dims))}
    
    for i in range(21):
        d = ds.isel(frame=slice(i,i+1))
        d = d.chunk(chunks)
        d.to_zarr(zarr_path,consolidated=True,**(dict(mode='a',append_dim='frame') if i>0 else dict(encoding = encoding)))
    dsa = xr.open_zarr(str(zarr_path),consolidated=True)
    print(dsa.chunks,dsa.dims)

append_test_encoding(ds,chunks)
Frozen(SortedKeysDict({'frame': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))
Frozen(SortedKeysDict({'frame': (10, 10, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))
Frozen(SortedKeysDict({'frame': (10, 10, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))

@jbusecke
Copy link
Contributor

jbusecke commented Mar 3, 2021

the question is whether the chunk() method should delete existing chunks attributes from encoding.

IMO this is the user-friendly thing to do.

Just ran into this issue myself and just wanted to add a +1 to stripping the encoding when .chunk() is used.

@rabernat
Copy link
Contributor

rabernat commented Mar 3, 2021

I think we are all in agreement. Just waiting for someone to make a PR. It's probably just a few lines of code changes.

@dcherian
Copy link
Contributor

dcherian commented Mar 3, 2021

alternatively to_zarr could ignore encoding["chunks"] when the data is already chunked?

@rabernat
Copy link
Contributor

rabernat commented Mar 3, 2021

alternatively to_zarr could ignore encoding["chunks"] when the data is already chunked?

I would not favor that. A user may choose to define their desired zarr chunks by putting this information in encoding. In this case, it's good to raise the error. (This is the case I had in mind when I wrote this code.)

The problem here is that encoding is often being carried over from the original dataset and persisted across operations that change chunk size.

@rabernat
Copy link
Contributor

In #5056, I have implemented the solution of deleting chunks from encoding when chunk() is called on a variable. A review of that PR would be welcome.

@dcherian dcherian added the topic-zarr Related to zarr storage library label Apr 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
topic-zarr Related to zarr storage library
Projects
None yet
Development

Successfully merging a pull request may close this issue.

9 participants