Description
Describe the issue:
I'm trying to analyze arrays stored in Zarr (opened with Xarray) using Dask DataFrames, but every time I try to load the result into memory I get the following error: ValueError("cannot include dtype 'M' in a buffer"). Based on the full traceback, it looks like the problem is related to the shuffle algorithm used internally by Dask. Sorry that the example is not very small, but the error may or may not appear depending on the data size and the chunks.
Minimal Complete Verifiable Example:
# Reproducer: write a dask-backed DataArray with a datetime64[ns] coordinate
# to Zarr, reopen it as a dask DataFrame, and compute a reduction on it.
import pandas as pd
import xarray as xr
import numpy as np
import dask.array as da
from dask.distributed import Client
# Client() is created with no arguments; the traceback in this report goes
# through distributed's shuffle code.
client = Client()
# Placeholder: replace with a writable location for the Zarr store.
PATH = "YOUR_PATH"
# Coordinate "a": datetime64[ns] values from 2005-01-01 through 2005-01-10.
a = pd.date_range("2005-01-01", "2005-01-10").to_numpy(dtype="datetime64[ns]")
# Coordinate "b": the integers 0..49999.
b = list(range(50000))
# Array of ones with shape (len(a), len(b)); chunks=(-1, 6000) requests one
# chunk along the first axis and chunks of 6000 along the second.
data = da.ones(shape=(len(a), len(b)), chunks=(-1, 6000))
# Wrap the array as a Dataset holding a single variable named "data".
arr = xr.DataArray(data, dims=["a", "b"], coords={"a": a, "b": b}).to_dataset(name="data")
# mode="w" is passed so a store already present at PATH is replaced.
arr.to_zarr(PATH, mode="w")
# Reopen the store and convert the "data" variable to a dask DataFrame.
df = xr.open_zarr(PATH)["data"].to_dask_dataframe()
# This is the statement the traceback points at when the error is raised.
df.max().compute()
Anything else we need to know?:
Error:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_shuffle.py:105, in shuffle_barrier()
104 try:
--> 105 return get_worker_plugin().barrier(id, run_ids)
106 except Reschedule as e:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_worker_plugin.py:401, in barrier()
400 def barrier(self, shuffle_id: ShuffleId, run_ids: Sequence[int]) -> int:
--> 401 result = sync(self.worker.loop, self._barrier, shuffle_id, run_ids)
402 return result
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\utils.py:440, in sync()
439 if error is not None:
--> 440 raise error
441 else:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\utils.py:414, in f()
413 future = asyncio.ensure_future(awaitable)
--> 414 result = yield future
415 except Exception as exception:
File ~\.conda\envs\tensordb\Lib\site-packages\tornado\gen.py:767, in run()
766 try:
--> 767 value = future.result()
768 except Exception as e:
769 # Save the exception for later. It's important that
770 # gen.throw() not be called inside this try/except block
771 # because that makes sys.exc_info behave unexpectedly.
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_worker_plugin.py:368, in _barrier()
366 # Tell all peers that we've reached the barrier
367 # Note that this will call `shuffle_inputs_done` on our own worker as well
--> 368 return await shuffle_run.barrier(run_ids)
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_core.py:200, in barrier()
198 # TODO: Consider broadcast pinging once when the shuffle starts to warm
199 # up the comm pool on scheduler side
--> 200 await self.scheduler.shuffle_barrier(
201 id=self.id, run_id=self.run_id, consistent=consistent
202 )
203 return self.run_id
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\core.py:1256, in send_recv_from_rpc()
1255 try:
-> 1256 return await send_recv(comm=comm, op=key, **kwargs)
1257 finally:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\core.py:1040, in send_recv()
1039 assert exc
-> 1040 raise exc.with_traceback(tb)
1041 else:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\core.py:831, in _handle_comm()
830 if inspect.iscoroutine(result):
--> 831 result = await result
832 elif inspect.isawaitable(result):
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_scheduler_plugin.py:99, in barrier()
98 msg = {"op": "shuffle_inputs_done", "shuffle_id": id, "run_id": run_id}
---> 99 await self.scheduler.broadcast(
100 msg=msg,
101 workers=list(shuffle.participating_workers),
102 )
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\scheduler.py:6667, in broadcast()
6662 raise ValueError(
6663 "on_error must be 'raise', 'return', 'return_pickle', "
6664 f"or 'ignore'; got {on_error!r}"
6665 )
-> 6667 results = await All([send_message(address) for address in addresses])
6668 return {k: v for k, v in zip(workers, results) if v is not ERROR}
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\utils.py:257, in All()
256 try:
--> 257 result = await tasks.next()
258 except Exception:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\scheduler.py:6645, in send_message()
6644 try:
-> 6645 resp = await send_recv(
6646 comm, close=True, serializers=serializers, **msg
6647 )
6648 finally:
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\core.py:1042, in send_recv()
1041 else:
-> 1042 raise Exception(response["exception_text"])
1043 return response
Exception: ValueError("cannot include dtype 'M' in a buffer")
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[1], line 17
15 arr.to_zarr(PATH, mode="w")
16 df = xr.open_zarr(PATH)["data"].to_dask_dataframe()
---> 17 df.max().compute()
File ~\.conda\envs\tensordb\Lib\site-packages\dask_expr\_collection.py:477, in FrameBase.compute(self, fuse, **kwargs)
475 out = out.repartition(npartitions=1)
476 out = out.optimize(fuse=fuse)
--> 477 return DaskMethodsMixin.compute(out, **kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\dask\base.py:372, in DaskMethodsMixin.compute(self, **kwargs)
348 def compute(self, **kwargs):
349 """Compute this dask collection
350
351 This turns a lazy Dask collection into its in-memory equivalent.
(...)
370 dask.compute
371 """
--> 372 (result,) = compute(self, traverse=False, **kwargs)
373 return result
File ~\.conda\envs\tensordb\Lib\site-packages\dask\base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
657 postcomputes.append(x.__dask_postcompute__())
659 with shorten_traceback():
--> 660 results = schedule(dsk, keys, **kwargs)
662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~\.conda\envs\tensordb\Lib\site-packages\distributed\shuffle\_shuffle.py:111, in shuffle_barrier()
109 raise
110 except Exception as e:
--> 111 raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e
RuntimeError: shuffle_barrier failed during shuffle efac2437aea7c93cf10fff437617a503
Environment:
- Dask version: 2024.09.0
- Python version: 3.11
- Operating System: Windows 11
- Install method (conda, pip, source): pip
Additional Environment information from Xarray:
INSTALLED VERSIONS
commit: None
python: 3.11.6 | packaged by conda-forge | (main, Oct 3 2023, 10:29:11) [MSC v.1935 64 bit (AMD64)]
python-bits: 64
OS: Windows
OS-release: 10
machine: AMD64
processor: Intel64 Family 6 Model 165 Stepping 2, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
LOCALE: ('Spanish_Venezuela', '1252')
libhdf5: None
libnetcdf: None
xarray: 2024.9.0
pandas: 2.2.2
numpy: 1.26.4
scipy: 1.11.4
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
zarr: 2.18.2
cftime: None
nc_time_axis: None
iris: None
bottleneck: 1.3.7
dask: 2024.9.0
distributed: 2024.9.0
matplotlib: None
cartopy: None
seaborn: None
numbagg: 0.8.1
fsspec: 2024.6.1
cupy: None
pint: None
sparse: None
flox: 0.9.11
numpy_groupies: 0.10.2
setuptools: 68.2.2
pip: 23.3.1
conda: None
pytest: 7.4.3
mypy: None
IPython: 8.26.0
sphinx: None