3 votes

I'm having some trouble exchanging an object (a DataFrame) between two processes through a Queue.

The first process gets data from the queue, the second puts data into it. The put process is faster, so the get process should clear the queue by reading all the objects that have accumulated.

I get strange behaviour: my code works perfectly, as expected, but only for 100 rows in the DataFrame; with 1000 rows the get process always takes only one object.

import multiprocessing, time, sys
import pandas as pd

NR_ROWS = 1000
i = 0
def getDf():
    global i, NR_ROWS
    myheader = ["name", "test2", "test3"]                
    myrow1 =   [ i,  i+400, i+250]
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)
    i = i+1
    return df 


def f_put(q):
    print "f_put start"        

    while(1): 
        data = getDf()                
        q.put(data)
        print "P:", data["name"].iloc[0]         
        sys.stdout.flush()                    
        time.sleep(1.55)


def f_get(q):
    print "f_get start"    

    while(1):     
        data = pd.DataFrame()

        while not q.empty():
            data = q.get()
            print "get"

        if not data.empty:
            print "G:", data["name"].iloc[0] 
        else:
            print "nothing new"                       
        time.sleep(5.9)


if __name__ == "__main__":

    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=f_put, args=(q,))            
    p.start()
    while(1):
        f_get(q)

    p.join()

Output for a 100-row DataFrame; the get process takes all objects:

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get         # get-process takes all 4 objects from the queue
get
get
get
G: 3
P: 4
P: 5
P: 6
get
get
get
G: 6
P: 7
P: 8

Output for a 1000-row DataFrame; the get process takes only one object:

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get     <-- #!!! get-process takes ONLY 1 object from the queue!!!
G: 1
P: 4
P: 5
P: 6
get
G: 2
P: 7
P: 8
P: 9
P: 10
get
G: 3
P: 11

Any idea what I'm doing wrong, and how to pass the bigger DataFrame through as well?

I quickly tested your code, and it works as you describe even for N > 1000. Is it possible that you're using an old version of pandas and/or multiprocessing that gives this behaviour? (__version__: pandas 0.16.2, multiprocessing 0.70a1, python 2.7.10) - chris-sc
I have all packages up to date now and I still don't get the expected results. Try this: pastebin.com/bihSv93F. The first attempt is done manually and it works; the last item read is G: 2. Then I try the same with multiprocessing and it doesn't work. - Meloun
pandas: 0.16.2, multiprocessing: 0.70a1, python 2.7.10 - Meloun
When I use a big dictionary instead of a DataFrame I get the same behaviour. - Meloun
Yes. Have a look at my answer below. The problem is not limited to DataFrames per se; it affects all Python objects above some size threshold that is system dependent. - chris-sc

1 Answer

5 votes

At the risk of not being able to provide a fully working solution, here is what goes wrong.

First of all, it's a timing issue.

I tried your code again with larger DataFrames (10000 or even 100000 rows) and I start to see the same thing you do. This means the behaviour appears as soon as the size of the array crosses a certain threshold, which is system (CPU?) dependent.

I modified your code a bit to make it easier to see what happens. First, five DataFrames are put into the queue without any custom time.sleep. In the f_get function I added a counter (and a time.sleep(0), see below) to the while not q.empty() loop.

The new code:

import multiprocessing, time, sys                                                 
import pandas as pd                                                              

NR_ROWS = 10000                                                                  
i = 0                                                                            
def getDf():                                                                     
    global i, NR_ROWS                                                            
    myheader = ["name", "test2", "test3"]                                        
    myrow1 =   [ i,  i+400, i+250]                                               
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)                      
    i = i+1                                                                      
    return df                                                                    

def f_put(q):
    # put 5 DataFrames into the queue as fast as possible (no sleep)
    print "f_put start"
    j = 0                                                                        
    while(j < 5):                                                                
        data = getDf()                                                           
        q.put(data)                                                              
        print "P:", data["name"].iloc[0]                                         
        sys.stdout.flush()                                                       
        j += 1                                                                   

def f_get(q):
    # count how many items one pass of the inner q.empty() loop fetches
    print "f_get start"
    while(1):
        data = pd.DataFrame()                                                    
        loop = 0                                                                 
        while not q.empty():                                                     
            data = q.get()                                                  
            print "get (loop: %s)" %loop
            time.sleep(0)                                         
            loop += 1                                                            
        time.sleep(1.)                                                           

if __name__ == "__main__":                                                       

    q = multiprocessing.Queue()                                                  
    p = multiprocessing.Process(target=f_put, args=(q,))                         
    p.start()                                                                    
    while(1):                                                                    
        f_get(q)                                                                 
    p.join()

Now, if you run this for different numbers of rows, you will see something like this:

N=100:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

N=10000:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 0)
get (loop: 0)
get (loop: 0)

What does this tell us? As long as the DataFrame is small, your assumption that the put process is faster than the get process holds: we can fetch all 5 items within one pass of the while not q.empty() loop.

But as the number of rows increases, something changes: the while condition q.empty() evaluates to True (the queue appears empty) and the outer while(1) loop cycles.

This could mean that put is now slower than get and we have to wait. But even if we set the sleep time for the whole f_get loop to something like 15 seconds, we still get the same behaviour.

On the other hand, if we change the time.sleep(0) in the inner q.get() loop to 1,

while not q.empty():                                                  
    data = q.get()                                                    
    time.sleep(1)                                                     
    print "get (loop: %s)" %loop                                      
    loop += 1

we get this:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

This looks right! And it means that get actually does something strange: while it is still processing one get, the queue reports itself as empty, and only after the get is done does the next item become available.
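You can even see this effect without any second process. Here is a minimal single-process sketch (the DataFrame size and the 0.5 s pause are arbitrary assumptions; exact timings will vary by machine): put one large DataFrame and poll q.empty() immediately, then again after a short pause.

import multiprocessing, time
import pandas as pd

if __name__ == "__main__":
    q = multiprocessing.Queue()
    # a large object, roughly comparable to the NR_ROWS = 100000 case above
    df = pd.DataFrame([[1, 401, 251]]*100000, columns=["name", "test2", "test3"])

    q.put(df)
    print "empty right after put?", q.empty()   # often still True for a big object
    time.sleep(0.5)
    print "empty after 0.5 s?    ", q.empty()   # False once the item has actually arrived
    q.get()                                     # drain the queue so the program can exit cleanly

If the second print still shows True on your machine, increase the pause; the point is only that for large objects q.empty() can report empty for a while after put() has already returned.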

I'm sure there is a reason for that, but I'm not familiar enough with multiprocessing to see it.

Depending on your application, you could just add an appropriate time.sleep to your inner loop and see if that's enough.

Or, if you want to solve it properly (instead of working around it with time.sleep), you could look into multiprocessing and look for information on blocking, non-blocking or asynchronous communication - I think the solution will be found there.
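As a sketch of that direction (not a drop-in tested replacement for your f_get): instead of polling q.empty(), block on q.get with a timeout and treat the Queue.Empty exception as the signal that the queue is drained. The 0.5 s timeout is an arbitrary choice here.

import time
from Queue import Empty        # multiprocessing queues raise this exception on timeout
import pandas as pd

def f_get(q):
    print "f_get start"
    while(1):
        data = pd.DataFrame()
        while(1):
            try:
                # wait up to 0.5 s for the next item instead of polling q.empty()
                data = q.get(timeout=0.5)
                print "get"
            except Empty:
                break          # nothing arrived within the timeout: queue is drained

        if not data.empty:
            print "G:", data["name"].iloc[0]
        else:
            print "nothing new"
        time.sleep(5.9)

Because get now blocks, it simply waits for an item that is still being delivered instead of concluding too early that the queue is empty.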