
Diagnosing 'killed workers' error #1466

Closed
jbusecke opened this issue Oct 11, 2017 · 2 comments
@jbusecke

I am testing a dask.distributed setup on the analysis cluster at GFDL (with workers on batch-submitted jobs and the scheduler on an interactive login node), using xarray and Jupyter notebooks to analyze high-resolution climate model output.
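
For reference, the client side of such a setup is just a dask.distributed Client pointed at the scheduler. A minimal sketch; the address below is a placeholder for the login node, not the actual GFDL hostname:

from dask.distributed import Client

# Connect to the scheduler running on the interactive login node
# (placeholder address); subsequent xarray/dask operations then
# execute on the batch-submitted workers
client = Client('tcp://login-node:8786')
print(client)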

Right now I am not satisfied with the performance, and I am getting a number of 'killed worker' errors that pop up seemingly at random.

I am not really able to interpret the error messages, so I am not sure why this happens, and I wanted to ask if somebody could give me some advice on how to proceed in diagnosing the problem.

In particular, I am not even able to load a file using xr.open_dataset and then save it back to disk without the whole thing crashing. I'll give an example here:

I am loading the files like this:

import os
import xarray as xr

# Open the dataset lazily as dask arrays, chunked along time and depth
ds_hr = xr.open_dataset(filename, chunks={'time': 1, 'st_ocean': 1},
                        decode_times=False, engine='netcdf4')

# Writing back to disk triggers the computation, and the crash
ds_hr.to_netcdf(os.path.join(odir, 'test_single_year_full.nc'))

Below is the error output from the notebook when invoking .to_netcdf():

distributed.utils - ERROR - ("('isnull-where-store-where-c5ef3de6809a91487fa4247c600ce51b', 22, 0)", 'tcp://140.208.147.88:42755')
Traceback (most recent call last):
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/utils.py", line 229, in f
    result[0] = yield make_coro()
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/client.py", line 1254, in _gather
    traceback)
  File "/home/Julius.Busecke/code/miniconda/envs/standard/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('isnull-where-store-where-c5ef3de6809a91487fa4247c600ce51b', 22, 0)", 'tcp://140.208.147.88:42755')
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-7-d88c359d650b> in <module>()
----> 1 ds_hr.to_netcdf(os.path.join(odir,'test_single_year_full.nc'))

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/core/dataset.py in to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims)
    975         return to_netcdf(self, path, mode, format=format, group=group,
    976                          engine=engine, encoding=encoding,
--> 977                          unlimited_dims=unlimited_dims)
    978 
    979     def __unicode__(self):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/backends/api.py in to_netcdf(dataset, path_or_file, mode, format, group, engine, writer, encoding, unlimited_dims)
    571     try:
    572         dataset.dump_to_store(store, sync=sync, encoding=encoding,
--> 573                               unlimited_dims=unlimited_dims)
    574         if path_or_file is None:
    575             return target.getvalue()

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims)
    916                     unlimited_dims=unlimited_dims)
    917         if sync:
--> 918             store.sync()
    919 
    920     def to_netcdf(self, path=None, mode='w', format=None, group=None,

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in sync(self)
    334     def sync(self):
    335         with self.ensure_open(autoclose=True):
--> 336             super(NetCDF4DataStore, self).sync()
    337             self.ds.sync()
    338 

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/backends/common.py in sync(self)
    200 
    201     def sync(self):
--> 202         self.writer.sync()
    203 
    204     def store_dataset(self, dataset):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/xarray/backends/common.py in sync(self)
    177             import dask
    178             if LooseVersion(dask.__version__) > LooseVersion('0.8.1'):
--> 179                 da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
    180             else:
    181                 da.store(self.sources, self.targets)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, **kwargs)
    898     dsk = sharedict.merge((name, updates), *[src.dask for src in sources])
    899     if compute:
--> 900         Array._get(dsk, keys, **kwargs)
    901     else:
    902         from ..delayed import Delayed

~/code/miniconda/envs/standard/lib/python3.6/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
    104         get = get or _globals['get'] or cls._default_get
    105         dsk2 = optimization_function(cls)(ensure_dict(dsk), keys, **kwargs)
--> 106         return get(dsk2, keys, **kwargs)
    107 
    108     @classmethod

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
   1932         if sync:
   1933             try:
-> 1934                 results = self.gather(packed, asynchronous=asynchronous)
   1935             finally:
   1936                 for f in futures.values():

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1374             return self.sync(self._gather, futures, errors=errors,
   1375                              direct=direct, local_worker=local_worker,
-> 1376                              asynchronous=asynchronous)
   1377 
   1378     @gen.coroutine

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    546             return future
    547         else:
--> 548             return sync(self.loop, func, *args, **kwargs)
    549 
    550     def __str__(self):

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    239         e.wait(1000000)
    240     if error[0]:
--> 241         six.reraise(*error[0])
    242     else:
    243         return result[0]

~/code/miniconda/envs/standard/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/utils.py in f()
    227             yield gen.moment
    228             thread_state.asynchronous = True
--> 229             result[0] = yield make_coro()
    230         except Exception as exc:
    231             logger.exception(exc)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

~/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

~/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

~/code/miniconda/envs/standard/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1252                             six.reraise(type(exception),
   1253                                         exception,
-> 1254                                         traceback)
   1255                     if errors == 'skip':
   1256                         bad_keys.add(key)

~/code/miniconda/envs/standard/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ("('isnull-where-store-where-c5ef3de6809a91487fa4247c600ce51b', 22, 0)", 'tcp://140.208.147.88:42755')
@mrocklin
Member

https://stackoverflow.com/questions/46691675/what-do-killedworker-exceptions-mean-in-dask
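
In short, KilledWorker means the scheduler repeatedly lost workers while they were running that task, typically because the worker process itself died (commonly from running out of memory or being killed by the batch system), and the worker logs are the place to look. As a rough sketch of inspecting the workers the scheduler still sees from the client side (the scheduler address is a placeholder):

from dask.distributed import Client

# Attach to the already-running scheduler; the address is a placeholder
client = Client('tcp://scheduler-address:8786')

# Workers the scheduler currently knows about; a worker that was
# killed by the batch system will simply be absent from this dict
for addr, info in client.scheduler_info()['workers'].items():
    print(addr, info.get('memory_limit'))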

@jbusecke
Author

I think my problem was related to an xarray issue (pydata/xarray#1464) that was causing the worker processes to fail.
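
For anyone landing here with the same symptom: one way to separate a backend/xarray problem like that from a genuine distributed problem is to run the same write on dask's single-threaded scheduler, where a failing task raises an ordinary traceback instead of KilledWorker. A sketch using the current dask.config API, which postdates this issue:

import dask
import xarray as xr

ds = xr.open_dataset(filename, chunks={'time': 1, 'st_ocean': 1},
                     decode_times=False, engine='netcdf4')

# Run the write on the single-threaded scheduler so that any
# failure surfaces as a normal Python traceback in the notebook
with dask.config.set(scheduler='synchronous'):
    ds.to_netcdf('test_single_year_full.nc')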
