
"can't pickle thread.lock objects" when working with published dataframe #1556


Open
konrad-roze opened this issue Nov 14, 2017 · 11 comments

Comments

@konrad-roze

We're using the dask distributed scheduler with multiprocessing workers on an EC2 cluster.
dask 0.15.4 and distributed 1.19.3

I'm trying to publish a named dataset (a dataframe), then retrieve it and continue working on it. Basically:

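import dask.dataframe as dd

# client is a distributed.Client connected to the EC2 cluster
frame = dd.read_csv(url)  # url is an S3 path; other read_csv options elided
client.publish_dataset(ds_name=frame)  # publishes frame under the name 'ds_name'
ds = client.get_dataset('ds_name')
client.compute(ds)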

This results in a 'TypeError: can't pickle thread.lock objects' error.

I suppose this might be related to:
#780
dask/dask#1683
#539

I don't know how to work around this issue because read_csv() doesn't seem to accept a lock argument.
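The underlying failure can be reproduced with the standard library alone: `threading.Lock` objects cannot be pickled, so any object that holds one, directly or in an attribute, cannot be shipped to workers. A minimal sketch; `HoldsLock` and `try_pickle` are hypothetical names standing in for whatever object in the graph carries the lock. The message differs between Python 2 (`can't pickle thread.lock objects`) and Python 3, but it is a `TypeError` in both.

```python
import pickle
import threading


class HoldsLock(object):
    """Hypothetical stand-in for an object (e.g. a client or session) that owns a lock."""

    def __init__(self):
        self.name = "example"
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, else the TypeError message."""
    try:
        pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    except TypeError as exc:
        return str(exc)
    return None


print(try_pickle({"name": "example"}))  # plain data pickles fine -> None
print(try_pickle(threading.Lock()))     # a bare lock -> TypeError message
print(try_pickle(HoldsLock()))          # a lock nested in an attribute -> same error
```

The third case is the one that matters here: the lock does not have to be passed explicitly, it only has to be reachable from something that ends up in a task.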

Full traceback:
traceback.txt

@TomAugspurger
Member

TomAugspurger commented Nov 14, 2017

I'm not able to reproduce with this simple example:

In [1]: import dask.dataframe as dd

In [2]: from distributed import Client

In [4]: with open("file.csv", "w") as f: f.write("A,B\n1,2\n3,4\n5,6")

In [5]: client = Client()
In [6]: df = dd.read_csv("s3://dask-data/airline-data/1987.csv", storage_options={'anon': True})
In [7]: client.publish_dataset(ds_name=df)

In [9]: ds = client.get_dataset('ds_name')

In [10]: ds.compute()
Out[10]:
   A  B
0  1  2
1  3  4
2  5  6

Could you try adapting that example until you reproduce the failure?

  • Perhaps the local cluster created by default doesn't reproduce it? Try passing your scheduler address to `Client`.
  • Perhaps it's data specific? Try swapping out file.csv for your real file.

@konrad-roze
Author

You can reproduce it on a local cluster, but you need to load the CSV data from S3.
So the minimal change to your example would be:
df = dd.read_csv(_some_s3_url_)

@TomAugspurger
Member

Thanks, that's valuable information. I've updated the example.

@JR-Mayday

JR-Mayday commented Feb 14, 2018

Same issue here. Any update?

@mrocklin
Member

It would be useful to see a full traceback from a minimal example.

@JR-Mayday

A simple example that reads a file from S3 and persists it on the cluster. We use server-side encryption for the S3 bucket.

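    import dask.dataframe as dd
    from distributed import Client

    # get_dask_scheduler_file(), configure_session() and credential are internal helpers;
    # the import providing session is not shown
    client = Client(scheduler_file=get_dask_scheduler_file())

    boto_session = session.Session()
    boto_session = configure_session(boto_session, credential)
    df = dd.read_csv('s3://data.csv', storage_options={'botocore_session': boto_session})

    df = client.persist(df)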
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/client.py", line 2168, in persist
    loose_restrictions, resources=resources)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/client.py", line 1906, in _graph_to_futures
    'tasks': valmap(dumps_task, dsk3),
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/toolz-0.8.2-py2.7.egg/toolz/dicttoolz.py", line 84, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/worker.py", line 731, in dumps_task
    'args': pickle.dumps(task[1:])}
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 829, in dumps
    cp.dump(obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 233, in dump
    return Pickler.dump(self, obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 564, in save_instancemethod
    obj=obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 709, in save_reduce
    save(args)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 564, in save_instancemethod
    obj=obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 709, in save_reduce
    save(args)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 360, in save_function
    self.save_function_tuple(obj)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 436, in save_function_tuple
    save(f_globals)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
    save(state)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
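The traceback shows pickle recursing through nested dicts, tuples and instance state before it reaches the lock, which makes the offending object hard to spot. A small debugging helper can report the path to it. This is a sketch: `find_unpicklable` and `FakeSession` are hypothetical names, and it uses the standard `pickle` module rather than the cloudpickle that distributed uses, so results may differ for functions and closures.

```python
import pickle
import threading


def find_unpicklable(obj, path="obj", seen=None):
    """Return the path to the deepest nested value that fails to pickle, or None.

    Walks dicts, lists/tuples/sets and instance __dict__ attributes; a debugging
    aid, not an exhaustive audit of everything a serializer might touch.
    """
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return None  # already visited (guards against reference cycles)
    seen.add(id(obj))

    try:
        pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        return None  # this object and everything under it pickles fine
    except Exception:
        pass

    # Narrow down the culprit by recursing into containers and attributes.
    if isinstance(obj, dict):
        children = [("%s[%r]" % (path, k), v) for k, v in obj.items()]
    elif isinstance(obj, (list, tuple, set, frozenset)):
        children = [("%s[%d]" % (path, i), v) for i, v in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [("%s.%s" % (path, k), v) for k, v in vars(obj).items()]
    else:
        children = []

    for child_path, child in children:
        found = find_unpicklable(child, child_path, seen)
        if found is not None:
            return found
    # No child is to blame, so this object itself cannot be pickled.
    return path


class FakeSession(object):
    """Hypothetical stand-in for a session object that owns a lock."""

    def __init__(self):
        self.config = {"region": "us-east-1"}
        self._lock = threading.Lock()


task_args = {"path": "s3://bucket/data.csv", "options": {"session": FakeSession()}}
print(find_unpicklable(task_args))  # obj['options']['session']._lock
```

Running the helper on a task's arguments (or on the `storage_options` passed to `read_csv`) before submitting points directly at the attribute that carries the lock.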

@mrocklin
Member

My first guess is that the boto_session object is finding its way into the task, and that object isn't serializable.

@JR-Mayday

I can try other ways to pass the credentials, but this is the recommended approach inside our company. Do you think dask's SerializableLock could help? I see you solved similar issues for read_hdf().
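dask ships a `SerializableLock` (in `dask.utils`) for cases where dask itself creates the lock, as with `read_hdf`; it cannot replace a lock created privately inside a third-party object such as a boto session. The idea behind such a lock can be sketched with the standard library: pickle only a token, and map that token back to a shared lock on unpickling. `TokenLock` is a hypothetical, simplified illustration of that idea, not dask's actual implementation.

```python
import pickle
import threading
import uuid
import weakref


class TokenLock(object):
    """Simplified sketch of a picklable lock: only a token is serialized.

    Within one process, every copy with the same token shares one real lock.
    In another process, unpickling creates a fresh lock for that token.
    """

    _locks = weakref.WeakValueDictionary()  # token -> shared threading.Lock

    def __init__(self, token=None):
        self.token = token or uuid.uuid4().hex
        lock = TokenLock._locks.get(self.token)
        if lock is None:
            lock = threading.Lock()
            TokenLock._locks[self.token] = lock
        self.lock = lock

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self.lock.release()

    def __getstate__(self):
        return self.token  # drop the unpicklable lock, keep only the token

    def __setstate__(self, token):
        self.__init__(token)  # look up (or create) the lock for this token


lock = TokenLock()
clone = pickle.loads(pickle.dumps(lock))
print(clone.token == lock.token)  # True
print(clone.lock is lock.lock)    # True: same process, same underlying lock
```

This only helps when the code that creates the lock can be changed to use such a wrapper, which is why it fits dask's own readers but not a session object built by another library.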

@mrocklin
Member

My guess is that the boto library has a lock in it somewhere that we're not going to be able to touch. You could ask them upstream, but they'll probably say "why would you want to pass around session objects? This may be unsafe."

Alternatively, what I often see in production is that some other mechanism manages security, so that when workers look up the default credentials they already have them. For example, environment variables or .boto files may be pre-populated on each worker. Moving credentials around within a computational framework (like dask, hadoop, spark, ...) is sometimes considered unsafe.
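The pattern described above can be sketched as follows: the task carries only picklable data (a function reference and a path string), and each worker resolves credentials itself at execution time, here from environment variables. `read_partition` is a hypothetical task body, and the `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` names are assumed because they are the standard variables botocore reads; with dask the equivalent would be to pre-populate them (or a credentials file) on every worker and call `dd.read_csv('s3://...')` without passing a session object.

```python
import functools
import os
import pickle


def read_partition(path):
    """Hypothetical task body: look up credentials on the worker, at run time."""
    key = os.environ.get("AWS_ACCESS_KEY_ID")
    secret = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if key is None or secret is None:
        raise RuntimeError("credentials are not configured on this worker")
    # A real task would open `path` with these credentials; here we only
    # report what would be used, without echoing the secret itself.
    return {"path": path, "key": key, "has_secret": bool(secret)}


# The task sent to workers holds only a function reference and a string,
# so it pickles cleanly: no session object (and no lock) is captured.
task = functools.partial(read_partition, "s3://bucket/data.csv")
payload = pickle.dumps(task)

# Simulate a worker whose environment was pre-populated by deployment tooling
# (demo values only).
os.environ["AWS_ACCESS_KEY_ID"] = "example-key"
os.environ["AWS_SECRET_ACCESS_KEY"] = "example-secret"
print(pickle.loads(payload)())
```

In a real deployment the variables would be set by whatever provisions the workers (an EC2 instance role, the container environment, or a credentials file), not by the task itself.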

@JR-Mayday

Thanks for the suggestions! We are still at the proof-of-concept stage, so we'll think about other ways to handle the credentials.

@finity84

I seem to hit the same problem with a dataset created by read_parquet() on S3 files and then published, whether the credentials are configured through environment variables or through the key/secret parameters in storage_options. It works before publishing.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants