from_iris converts dask array into numpy array #2046

Closed
AlexHilson opened this issue Apr 10, 2018 · 15 comments · Fixed by #2111

@AlexHilson
Contributor

Code Sample, a copy-pastable example if possible

import xarray as xr
import numpy as np
import dask.array as da
import iris

# xarray (dask) -> iris (dask) -> xarray (numpy)
darr = xr.DataArray(
    da.from_array(np.random.rand(4, 5), chunks=4), dims=['x', 'y'],
    coords=dict(x=[10, 20, 30, 40], y=range(5)))

print(type(darr.data)) # <class 'dask.array.core.Array'>

cube = darr.to_iris()
print(type(cube.core_data())) # <class 'dask.array.core.Array'>

darr2 = xr.DataArray.from_iris(cube)
print(type(darr2.data)) # <class 'numpy.ndarray'>

Problem description

After converting an iris Cube into an xarray DataArray, the core array object is no longer lazy.

The dask array is not immediately realised - the .from_iris() call completes instantly and plotting slices works as expected. I only noticed the issue when I tried to convert a large dataset back into a Cube, at which point everything ground to a halt.

(The ability to convert between libraries is very cool - thanks for developing this)

Expected Output

Array type to stay the same, data not to be computed if not needed.

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.6.3.final.0
python-bits: 64
OS: Darwin
OS-release: 16.7.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: en_GB.UTF-8
LOCALE: en_GB.UTF-8

xarray: 0.10.2
pandas: 0.21.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
h5py: 2.7.1
Nio: None
zarr: None
bottleneck: 1.2.1
cyordereddict: None
dask: 0.17.1
distributed: 1.21.3
matplotlib: 2.1.0
cartopy: 0.15.1
seaborn: 0.8.1
setuptools: 36.5.0.post20170921
pip: 9.0.1
conda: 4.4.6
pytest: None
IPython: 6.2.1
sphinx: None

@shoyer shoyer added the bug label Apr 10, 2018
@shoyer
Member

shoyer commented Apr 10, 2018

Thanks for the report and the clear example!

I'm marking this as a bug for now as this is definitely unwanted behavior. I'll attempt to reproduce it later when I have time...

@shoyer
Member

shoyer commented Apr 12, 2018

I think this may be fixed by the recently merged #2047. Can you try out the development version of xarray and see if that fixes things?

@shoyer
Member

shoyer commented Apr 13, 2018

I can confirm that this seems to be fixed by #2047. When I run without that commit, I see:

<class 'dask.array.core.Array'>
<class 'dask.array.core.Array'>
<class 'numpy.ndarray'>

But now I see:

<class 'dask.array.core.Array'>
<class 'dask.array.core.Array'>
<class 'dask.array.core.Array'>

However, we should add a regression test to make sure this doesn't break again. Any interest in putting that together? The existing Iris/xarray tests can be found here:

def test_to_and_from_iris(self):
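For reference, a minimal sketch of what such a regression test might look like, reusing the example from this issue (the test name and exact placement are hypothetical; it would live alongside the existing Iris tests rather than at module level):

import dask.array as da
import numpy as np
import xarray as xr


def test_to_and_from_iris_preserves_dask():
    # Hypothetical regression test: round-trip a dask-backed DataArray
    # through iris and check that the data stays lazy in both directions.
    darr = xr.DataArray(
        da.from_array(np.random.rand(4, 5), chunks=4), dims=['x', 'y'],
        coords=dict(x=[10, 20, 30, 40], y=range(5)))
    cube = darr.to_iris()
    assert isinstance(cube.core_data(), da.Array)
    roundtripped = xr.DataArray.from_iris(cube)
    assert isinstance(roundtripped.data, da.Array)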

@AlexHilson
Contributor Author

Thanks, I'll endeavour to add a regression test.

The conversion works fine now with the threaded scheduler or with a local distributed scheduler, but something fails when using a remote distributed scheduler. Computing the data after converting (I think in either direction) causes all of the distributed workers to fall over.

I'd guess that this is an issue with the distributed scheduler (I'm using dask-kubernetes on pangeo), I'll try to dig deeper.

   
import xarray as xr
import numpy as np
import dask.array as da
import dask
from dask.distributed import Client
import iris

def test():
    # xarray (dask) -> iris (dask) -> xarray (numpy)
    darr = xr.DataArray(
        da.from_array(np.random.rand(4, 5), chunks=4), dims=['x', 'y'],
        coords=dict(x=[10, 20, 30, 40], y=range(5)))

    print(type(darr.data)) # <class 'dask.array.core.Array'>
    darr.data.compute() # works
    
    cube = darr.to_iris()
    print(type(cube.core_data())) # <class 'dask.array.core.Array'>
    cube.core_data().compute() # this fails (hangs) with distributed scheduler
     
    darr2 = xr.DataArray.from_iris(cube)
    print(type(darr2.data)) 
    darr2.data.compute() # this fails (hangs) with distributed scheduler (if you comment out the failing line above)


print('threaded')
with dask.set_options(get=dask.threaded.get):
    test()
print('success\n')

print('local distributed')
client = Client(processes=True)
test()
print('success\n')

print('remote distributed')
client = Client(cluster.scheduler_address) # remote cluster (dask-kubernetes)
test() # this will cause all workers to fail and then hang forever
print('success')

@jhamman
Member

jhamman commented Apr 17, 2018

@AlexHilson - iris is not part of the standard worker image on pangeo. My first guess is that you either have a version mismatch or a missing library in the workers. If things are working on the threaded scheduler / local cluster, I would think the issue w/ iris and dask has been resolved.

@AlexHilson
Contributor Author

Perfect, thanks, missing library 😄 I'm on the Met Office Pangeo so it was xarray missing from the workers.

It's a side effect I hadn't considered: the converted object still requires the original library. In this example the dask array is just wrapping a numpy function, but I guess in real life the dask graph would contain a bunch of library-specific functions.

@jhamman
Member

jhamman commented Apr 17, 2018

Great, glad that sorted things out. Pester @jacobtomlinson to get xarray in the met office pangeo environment :)

Are we good to close this now then?

@AlexHilson
Contributor Author

AlexHilson commented Apr 17, 2018

Re-running my original task (converting a large cube into xarray and back) throws up a related issue. I haven't had a chance to investigate this properly, but it looks like the masking operation applied by xarray breaks the ability to selectively compute indexes of the converted cube. If you try to compute a slice, the entire array will be realised.

This is different from the original issue, but does have a similar effect of making large objects not realistically convertible. I'm not going to be able to properly investigate this for a while, so if you'd like me to re-raise later as a new issue that's fine with me.

Not sure how to replicate this with dummy data; here are the steps I'm taking.

Given an input cube of shape (31, 56, 12, 3, 600, 800)

Try three ways of passing data through xarray and creating a (600, 800) slice.

slice1 = xr.convert.from_iris(cube).to_iris()[0,0,0,0]
slice2 = xr.convert.from_iris(cube)[0,0,0,0].to_iris()
slice3 = xr.convert.from_iris(cube[0,0,0,0]).to_iris()

Slices 2 and 3 will compute properly, but if you try to compute the core data of slice 1, dask will attempt to realise the entire original array. So indexing before calling to_iris() works, which leads me to suspect the masking step that to_iris() seems to be doing (edit: or perhaps it's an iris problem? but I can't think of anything that would cause a simple index operation to fail like this). As an aside, I was a little surprised to pass a regular array in and get a masked array back, but I don't know anything else about this topic so have no objection or opinion on that behaviour.

The task graphs are:

[task graph images attached for slice 1, slice 2, and slice 3]
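For anyone wanting to reproduce the comparison, graphs like these can be rendered without computing anything via dask's visualize method. A rough sketch, assuming graphviz is installed and that cube is the (31, 56, 12, 3, 600, 800) cube from the steps above (xr.DataArray.from_iris is the public equivalent of xr.convert.from_iris used there):

import xarray as xr

# Render each slice's task graph to a file without triggering computation.
slice1 = xr.DataArray.from_iris(cube).to_iris()[0, 0, 0, 0]
slice3 = xr.DataArray.from_iris(cube[0, 0, 0, 0]).to_iris()
slice1.core_data().visualize(filename='slice1.png')
slice3.core_data().visualize(filename='slice3.png')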

@AlexHilson
Contributor Author

Apologies, I haven't had much more of a chance to look at this, but I do have a reproducible example. It's a bit clunky, but hopefully it shows that you can no longer partially compute the dask array once to_iris() has been used.

import xarray as xr
import numpy as np
import dask.array as da
import iris
from dask import delayed

@delayed
def flakey_array(i):
    if i == 1:
        raise ValueError('I computed slice {}!'.format(i))
    else:
        return np.random.rand(1)

# array[0] should always work, array[1] should always fail
dask_arrays = da.stack([da.from_delayed(flakey_array(i), dtype=np.float32, shape=[1]) for i in [0, 1]])

darr = xr.DataArray(
    dask_arrays)

cube = iris.cube.Cube(
    dask_arrays)

try:
    _ = darr[0].data.compute()
    print('success (DataArray)')
except ValueError as e:
    print('failure (DataArray):', e)

try:
    _ = cube[0].core_data().compute()
    print('success (Cube)')
except ValueError as e:
    print('failure (Cube):', e)

    
cube_from_darr = darr.to_iris()
try:
    _ = cube_from_darr[0].core_data().compute()
    print('success (DataArray -> Cube)')
except ValueError as e:
    print('failure (DataArray -> Cube):', e)

    
darr_from_cube = xr.DataArray.from_iris(cube)
try:
    _ = darr_from_cube[0].data.compute()
    print('success (Cube -> DataArray)')
except ValueError as e:
    print('failure (Cube -> DataArray):', e)
    
    
darr_from_cube_from_darr = xr.DataArray.from_iris(cube_from_darr)
try:
    _ = darr_from_cube_from_darr[0].data.compute()
    print('success (DataArray -> Cube -> DataArray)')
except ValueError as e:
    print('failure (DataArray -> Cube -> DataArray):', e)

results:

success (DataArray)
success (Cube)
failure (DataArray -> Cube): I computed slice 1!
success (Cube -> DataArray)
failure (DataArray -> Cube -> DataArray): I computed slice 1!

INSTALLED VERSIONS
------------------
commit: None
python: 3.6.3.final.0
python-bits: 64
OS: Darwin
OS-release: 16.7.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: en_GB.UTF-8
LOCALE: en_GB.UTF-8

xarray: 0.10.3+dev12.gb9f40cc
pandas: 0.21.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
h5py: 2.7.1
Nio: None
zarr: None
bottleneck: 1.2.1
cyordereddict: None
dask: 0.17.1
distributed: 1.21.3
matplotlib: 2.1.0
cartopy: 0.15.1
seaborn: 0.8.1
setuptools: 36.5.0.post20170921
pip: 9.0.1
conda: 4.4.6
pytest: None
IPython: 6.2.1
sphinx: None

@shoyer
Member

shoyer commented May 4, 2018

OK, looks like there was a bug in to_iris() also. Thanks for the clear report!

Changing dataarray to dataarray.data on these two lines in to_iris() appears to fix the issue (the example code runs without errors):

xarray/xarray/convert.py

Lines 127 to 129 in b9f40cc

        masked_data = dask_ma.masked_invalid(dataarray)
    else:
        masked_data = np.ma.masked_invalid(dataarray)

@AlexHilson any interest in putting this into a PR? :)

@AlexHilson
Contributor Author

Yep I'll PR this along with a test for the earlier issue.

Interesting that the masked behaviour is different; might it be indicative of a deeper issue?

# docstrings for both of these methods say they expect an array like.

# works with DataArray
print(da.from_array(darr, chunks=1)[0].compute())

# fails with DataArray
print(da.ma.masked_array(darr, chunks=1)[0].compute())

@AlexHilson
Contributor Author

Looks like creating a masked array directly from a DataArray sets the dask chunksize to the maximum possible value:

import dask.array as da
import xarray as xr

dask_arrays = da.random.random(100000, chunks=10)
darr = xr.DataArray(dask_arrays)

# bad (chunksize too large)
print(da.ma.masked_invalid(darr))

# good (original chunksize)
print(da.ma.masked_invalid(darr.data))

output:

dask.array<masked_invalid, shape=(100000,), dtype=float64, chunksize=(100000,)>
dask.array<masked_invalid, shape=(100000,), dtype=float64, chunksize=(10,)>

I can still put in the workaround above but I'm not sure of a clean way to test it. I can assert that original.chunks == new.chunks, but that assumes Iris won't re-chunk the array itself (which is a pretty safe assumption I think, but doesn't seem ideal).
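For what it's worth, a minimal sketch of that chunk-based check, with the caveat from above that it assumes iris never re-chunks the data itself:

import dask.array as da
import numpy as np
import xarray as xr

darr = xr.DataArray(
    da.from_array(np.random.rand(4, 5), chunks=4), dims=['x', 'y'],
    coords=dict(x=[10, 20, 30, 40], y=range(5)))
cube = darr.to_iris()
# The converted cube should keep the original dask chunking
# (assumes iris leaves the chunks untouched).
assert darr.data.chunks == cube.core_data().chunks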

AlexHilson added a commit to AlexHilson/xarray that referenced this issue May 9, 2018
@shoyer
Member

shoyer commented May 9, 2018

da.ma.masked_invalid(darr) results in nested dask arrays, with the outer dask array holding an xarray.DataArray holding a dask array.

One way to check this would be to verify that the original dask array's tasks end up directly in the graph of the final dask array, e.g., assert set(darr.data.dask) <= set(darr.to_iris().core_data().dask).
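Expanded into a runnable sketch, reusing the small example from earlier in this issue:

import dask.array as da
import numpy as np
import xarray as xr

darr = xr.DataArray(
    da.from_array(np.random.rand(4, 5), chunks=4), dims=['x', 'y'],
    coords=dict(x=[10, 20, 30, 40], y=range(5)))
# Every task of the original dask graph should appear unchanged in the
# converted cube's graph; if to_iris wrapped the whole DataArray into a
# single opaque task, this containment check would fail.
assert set(darr.data.dask) <= set(darr.to_iris().core_data().dask)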

@AlexHilson
Contributor Author

I see - so if I understand correctly we end up in dask.array.core.asanyarray, which leads to return from_array(a, chunks=a.shape, getitem=getter_inline, asarray=False) (where a is our DataArray) and then the DataArray acts as a bottleneck in the task graph.

My instinct is to pull out this block from to_iris into a new to_masked function (or a private function?) that will explicitly maintain chunksize - feels like that function could be tested more cleanly than one that passes data through Iris. Or if you'd prefer the task graph test approach I can add that instead.

    # Create the right type of masked array (should be easier after #1769)
    if isinstance(dataarray.data, dask_array_type):
        from dask.array import ma as dask_ma
        # masked_invalid can be applied directly to the dataarray, but
        # results in a bad chunksize. So instead we use the internal
        # dask array (#2046)
        masked_data = dask_ma.masked_invalid(dataarray.data)
    else:
        masked_data = np.ma.masked_invalid(dataarray)

@shoyer
Member

shoyer commented May 10, 2018

@AlexHilson Sure, it would make sense to put a masked_invalid function in https://github.com/pydata/xarray/blob/master/xarray/core/duck_array_ops.py
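A minimal sketch of what such a helper could look like, mirroring the branch already shown in to_iris above (dask_array_type here is a stand-in for xarray's internal alias, and the exact dispatch mechanism used in duck_array_ops is an assumption):

import numpy as np

try:
    import dask.array
    # Assumed stand-in for xarray's internal tuple of dask array classes.
    dask_array_type = (dask.array.Array,)
except ImportError:
    dask_array_type = ()


def masked_invalid(data):
    # Keep dask-backed data lazy and preserve its original chunking by
    # operating on the dask array directly (see the chunksize discussion
    # earlier in this thread); fall back to numpy otherwise.
    if isinstance(data, dask_array_type):
        from dask.array import ma as dask_ma
        return dask_ma.masked_invalid(data)
    return np.ma.masked_invalid(data)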

AlexHilson added a commit to AlexHilson/xarray that referenced this issue May 10, 2018
shoyer pushed a commit that referenced this issue May 14, 2018
* TST: assert lazy array maintained by to_iris (#2046)

* Add masked_invalid array op, resolves to_iris rechunking issue (#2046)

* Fix dask_module in duck_array_ops.masked_invalid

* Really fix it

* Resolving to_iris dask array issues