
Underlying support code of Dataframe is not thread safe #2440


Closed

jaguarviajero opened this issue Dec 6, 2012 · 12 comments
Labels
Bug, Multithreading (Parallelism in pandas)

Comments

@jaguarviajero

Hello,

I've run into a nasty bug as I try to work with dataframes and threads.

The problem is that one thread modifies a local dataframe by removing a column but in doing so somehow corrupts the dataframe of the other thread.

My input data is a dataframe with multiple data types (float and integer in the example). This dataframe is grouped by column AA, and each group is placed into a synchronised Queue (from the standard library). Two threads consume the items from the queue and place the results in a standard Python list (whose append method is atomic).

Each thread internally creates an instance of a FrameProcessor and then enters a job-processing loop until the input queue is empty. The FrameProcessor is a callable object: it takes a dataframe as input and processes it in two levels. The first level selects the rows of the input dataframe that match a condition and passes the resulting dataframe to a method that performs the second-level processing. The second level modifies the input dataframe by removing a column and builds a new dataframe from the results of some calculations.

The code breaks in two places:

    measurements = df[ df['BB'] == tag ]   # from method _processFirstLevel
    cc = df.pop('CC')                      # from method _processSecondLevel

and for different reasons (last line of trace shown):

    File "C:\Python26\lib\site-packages\pandas\core\internals.py", line 570, in _verify_integrity
        assert(len(self.items) == tot_items)
    AssertionError
    File "C:\Python26\lib\site-packages\pandas\core\internals.py", line 26, in __init__
        assert(len(items) == len(values))
    AssertionError
    File "C:\Python26\lib\site-packages\pandas\core\index.py", line 315, in __getitem__
        return arr_idx[key]
    IndexError: index out of bounds
    File "C:\Python26\lib\site-packages\numpy\lib\function_base.py", line 3336, in delete
        "invalid entry")
    ValueError: invalid entry

As far as I can tell, although two different objects (one per thread) are working on two different dataframes (one per group), they corrupt each other's local variables when the column is removed.

Maybe the problem is because of my implementation (the way I set up the threads, jobs, etc.). However, I suspect the problem lies within the deep support layers of the DataFrame class (BlockManager, etc.).
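
As a diagnostic, the internal state of each group can be dumped before and after the column removal. This is only a sketch: _data is a private attribute that held the BlockManager in pandas of this era, so it is an assumption about internals, not part of the report.

def dump_internals(frame, label):
    # BlockManager behind the DataFrame (private API in 0.9.x-era pandas)
    mgr = frame._data
    print label, 'manager items:', list(mgr.items)
    for block in mgr.blocks:
        # a corrupt frame shows a mismatch between a block's items
        # and the number of columns in its values
        print '  block items:', list(block.items), 'values shape:', block.values.shape

# e.g. dump_internals(measurements, 'before pop') inside _processSecondLevel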

I am using

  • Python 2.6.6
  • Pandas 0.9.0
  • NumPy 1.6.1
  • Windows 7 Professional 64-bit

Why is data corruption occurring in threads that work with no shared resources?
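
If the corruption really does come from shared internal state, enqueueing an independent copy of each group may sidestep it. This is only a workaround sketch (DataFrame.copy() copies the underlying data), not an explanation of the bug; in test_() below, the enqueueing loop would become:

# Workaround sketch: enqueue copies so the workers share no internal
# objects with the parent frame or with each other.
for name, group in groups:
    inbox.put(groups.get_group(name).copy())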

Below is the code you can use to reproduce the bug. Bear in mind that because of the threads, the bug may not happen on the first run, and you won't get the same effect every time.

import threading
import numpy as np
import pandas as pd
import Queue

class FrameProcessor(object):

    def __call__(self, *args, **kwargs):
        chunk = args[0]
        result = self._processFirstLevel(chunk)
        return result

    def _processFirstLevel(self, df):
        second_level_tags = list(df['BB'].unique())
        results = []
        while len(second_level_tags) > 0:
            tag = second_level_tags.pop()
            measurements = df[ df['BB'] == tag ]
            result = self._processSecondLevel(measurements)
            result['BB'] = tag
            results.append(result)

        result_Frame = pd.concat(results, axis=0)

        return result_Frame

    def _processSecondLevel(self, df):
        cc = df.pop('CC')
        result_row = {
            'cc_avg': cc.mean()
            ,'measurements_in_avg':len(df)}

        result = pd.DataFrame([result_row])

        return result

def test_():
    num_of_groups = 10
    group_size = 30
    sub_group_size = 5
    rows = group_size*num_of_groups
    a = []
    for x in range(num_of_groups):          # AA: num_of_groups blocks of group_size labels
        a.extend(np.repeat(x, group_size))
    a = np.array(a)
    b = np.array(np.tile(range(sub_group_size),rows/sub_group_size))
    c = np.random.randn(rows)
    p = np.random.random_integers(0,10,rows)
    q = np.random.randn(rows)
    x = np.vstack((a,b,c,p,q))

    dates = np.asarray(pd.date_range('1/1/2000', periods=rows))
    df = pd.DataFrame(x.T, index=dates, columns=['AA', 'BB', 'CC','P','Q'])

    results = []
    inbox = Queue.Queue()

    group_by_columns = ['AA']
    groups = df.groupby(group_by_columns)
    for name, group in groups:
        inbox.put(groups.get_group(name))


    def workerShell():
        processor = FrameProcessor()
        while True:
            try:
                job = inbox.get(False)
                result = processor(job)
                results.append(result)
                inbox.task_done()
                print '{0} job done. Jobs left: {1}'.format(id(processor),inbox.qsize())
            except Queue.Empty:
                break

    thread1 = threading.Thread(target=workerShell)
    thread2 = threading.Thread(target=workerShell)
    thread1.start()
    thread2.start()
    inbox.join()
    thread1.join()
    thread2.join()

    df = pd.concat(results, axis=0)

    return df

if __name__ == '__main__':

    print 'pandas',pd.__version__
    print 'numpy',np.__version__
    for i in range(5):
        print '--------------test:',i
        test_()
@ghost

ghost commented Dec 6, 2012

I can reproduce this on git master

Exception in thread Thread-10:
Traceback (most recent call last):
    block = make_block(values.T, columns, columns)
  File "/home/user1/src/pandas/pandas/core/internals.py", line 464, in make_block
    return klass(values, items, ref_items, ndim=values.ndim)
  File "/home/user1/src/pandas/pandas/core/internals.py", line 31, in __init__
    raise AssertionError('Wrong number of items passed')
AssertionError: Wrong number of items passed

I hope you're aware of the limitations of Python with regard to thread parallelism for CPU-intensive code (the GIL and so on). Process-based parallelism is often better in such cases, using the multiprocessing module or something like celery.

Here's a version using multiprocessing; it ran 2500 iterations of test_ without an exception on my machine with 2 workers, and another 1500 with 4 workers. Of course it may still be wrong.

Admittedly, this sidesteps the multithreading issues.

Edit: pandas git master (0.10.0dev), py2.7.3 on linux.

import numpy as np
import pandas as pd
import Queue # just for .Empty
from multiprocessing import Process, JoinableQueue, Manager

class FrameProcessor(object):

    def __call__(self, *args, **kwargs):
        chunk = args[0]
        result = self._processFirstLevel(chunk)
        return result

    def _processFirstLevel(self, df):
        second_level_tags = list(df['BB'].unique())
        results = []
        while len(second_level_tags) > 0:
            tag = second_level_tags.pop()
            measurements = df[ df['BB'] == tag ]
            result = self._processSecondLevel(measurements)
            result['BB'] = tag
            results.append(result)

        result_Frame = pd.concat(results, axis=0)

        return result_Frame

    def _processSecondLevel(self, df):
        cc = df.pop('CC')
        result_row = {
            'cc_avg': cc.mean()
            ,'measurements_in_avg':len(df)}

        result = pd.DataFrame([result_row])

        return result

def test_():
    manager = Manager()
    num_of_groups = 10
    group_size = 30
    sub_group_size = 5
    rows = group_size*num_of_groups
    a = []
    for x in range(num_of_groups):          # AA: num_of_groups blocks of group_size labels
        a.extend(np.repeat(x, group_size))
    a = np.array(a)
    b = np.array(np.tile(range(sub_group_size),rows/sub_group_size))
    c = np.random.randn(rows)
    p = np.random.random_integers(0,10,rows)
    q = np.random.randn(rows)
    x = np.vstack((a,b,c,p,q))

    dates = np.asarray(pd.date_range('1/1/2000', periods=rows))
    df = pd.DataFrame(x.T, index=dates, columns=['AA', 'BB', 'CC','P','Q'])

    results = manager.list()
    inbox = JoinableQueue()

    group_by_columns = ['AA']
    groups = df.groupby(group_by_columns)
    for name, group in groups:
        inbox.put(groups.get_group(name))

    def workerShell():
        processor = FrameProcessor()
        while True:
            try:
                job = inbox.get(False)
                result = processor(job)
                results.append(result)
                inbox.task_done()
                print '{0} job done. Jobs left: {1}'.format(id(processor),inbox.qsize())
            except Queue.Empty:
                break
            except Exception as e:
                print(e)

    thread1 = Process(target=workerShell)
    thread2 = Process(target=workerShell)
    thread1.start()
    thread2.start()
    inbox.join()
    thread1.join()
    thread2.join()

    df = pd.concat(results, axis=0)

    return df

if __name__ == '__main__':

    for i in range(10000):
        print '--------------test:',i
        test_()

@changhiskhan
Contributor

What needs to be done here? Is this a release blocker?

@ghost

ghost commented Dec 10, 2012

Is pandas meant to be thread-safe? If yes, this is a bug; if no, it probably needs to be better documented.

@changhiskhan
Contributor

We should make pandas objects thread-safe, but for 0.10, we should probably just add a short blurb in the docs and revisit after the release. What do you think?

@wesm
Member

wesm commented Dec 10, 2012

I will take a look to see what's wrong but it's not a release blocker

@ghost

ghost commented Dec 11, 2012

yep.

@wesm
Member

wesm commented Dec 16, 2012

I'm not able to reproduce the error on master, 0.9.1, or 0.9.0. Have tried running the script many times. Any suggestions?

@wesm
Member

wesm commented Dec 16, 2012

Oops. I just had to do it a LOT of times

@wesm
Member

wesm commented Dec 16, 2012

This is incredibly strange. If you put a pdb.set_trace() in the place where the main exception occurs, you will see it should never have happened in the first place. The problem appears to be related (as best I can tell) to the sharing of index objects between DataFrames:

> /home/wesm/code/pandas/pandas/core/internals.py(35)__init__()
-> raise AssertionError('Wrong number of items passed (%d vs %d)'
(Pdb) l
 30             nitems = len(items)
 31             nvalues = len(values)
 32             if nitems != nvalues:
 33                 import pdb
 34                 pdb.set_trace()
 35  ->             raise AssertionError('Wrong number of items passed (%d vs %d)'
 36                                      % (len(items), len(values)))
 37     
 38             self._ref_locs = None
 39             self.values = values
 40             self.ndim = ndim
(Pdb) !nitems
2
(Pdb) !nvalues
5
(Pdb) len(items)
5
(Pdb) 

This is super weird. Looks like a NumPy bug (np.delete causing some "bad state") to me.
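
A quick, purely hypothetical way to probe the index-sharing theory against the repro script's df (identity checks only; nothing here fixes anything, and _data is private API of this era):

# Sketch: do two 'independent' group frames share internal objects?
# If they do, an in-place column removal in one thread mutates state
# that is visible to the other thread.
g = df.groupby('AA')
names = list(g.groups)
f1 = g.get_group(names[0])
f2 = g.get_group(names[1])
print f1.columns is f2.columns            # shared columns Index?
print f1._data.items is f2._data.items    # shared items in the BlockManager?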

@wesm
Member

wesm commented Dec 16, 2012

Punting the problem to post 0.10. I really don't have time to chase this one down

@jaguarviajero
Author

Thank you all for looking into this

@jreback
Contributor

jreback commented Apr 23, 2018

dupe / covered in #2728

jreback closed this as completed Apr 23, 2018