
I'm relatively new to Python and am struggling to use the multiprocessing module for a CPU-intensive data conversion. I have a large dataset (~400,000 observations of ~300 variables) in CSV format, which I want to convert to a Tableau Data Extract (TDE) using Tableau's Python API. Writing a script to do the conversion is straightforward, but it takes about 15 minutes to complete because only one CPU core does the work (Tableau Desktop does the same conversion in about 90 seconds). I'd like to use all 8 of my cores to speed this up.

My initial idea was to split the data into 8 chunks, have 8 workers each produce a list of Tableau Row objects with the multiprocessing module, then combine the rows into a single TDE table. However, since the Tableau Row class is defined in a separate module (the Tableau API), I get pickling and pointer errors. The API is complex and itself draws on a number of other modules, so my attempts to reconstruct the necessary definitions in the main global namespace have all failed.

I've tried dill and PiCloud, but both attempts still result in pickling or pointer errors. Does anyone know of an effective way to serialize and/or parallelize a computation in Python that relies on methods and objects defined in an external package (without having to dig into the package and reinvent the wheel in your own program)?

Below is the working program I want to parallelize (I drew heavily from Brian Bickell's work, published here: http://www.interworks.com/blogs/bbickell/2012/12/06/introducing-python-tableau-data-extract-api-csv-extract-example):

from sys import argv
import os, csv, datetime, time
import dataextract as tde


## Functions

# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
    # Date format used below
    dateformat = '%Y-%m-%d %H:%M:%S.%f'

    if t == tdeTypes['INTEGER']:
        try:
            convert = int(val)
            row.setInteger(colnum, convert)
        except ValueError:
            # if the cast fails, add a null instead
            row.setNull(colnum)

    elif t == tdeTypes['DOUBLE']:
        try:
            convert = float(val)
            row.setDouble(colnum, convert)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['BOOLEAN']:
        try:
            convert = int(val)
            if convert > -1 and convert <= 1:
                row.setBoolean(colnum, convert)
            else:
                row.setNull(colnum)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['DATETIME']:
        try:
            d = datetime.datetime.strptime(val, dateformat)
            # setDateTime takes the time-of-day fields; setDate accepts only year, month, day
            row.setDateTime(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['CHAR_STRING']:
        row.setCharString(colnum, val)

    elif t == tdeTypes['UNICODE_STRING']:
        row.setString(colnum, val)

    else:
        print 'Error'

# define csv input      
inputFile = 'test1.csv'

## Parameters 
tdeFileName = 'tdetest1.tde'

startTime = time.clock()

# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14, 
            'CHAR_STRING': 15, 'UNICODE_STRING': 16}

## Define CSV Schema in dict, (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})

# Try to create the extract; if that fails because the file already exists, delete it and retry.
try:
    tdeFile = tde.Extract(tdeFileName)
except:
    os.system('del '+tdeFileName)
    os.system('del DataExtract.log')
    tdeFile = tde.Extract(tdeFileName)

# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter = '^')

print 'Reading records from %s' % (inputFile)

# Create TDE table definition
tdeTableDef = tde.TableDefinition() 

print 'Defined table schema:'

# Build TDE Table Def from csv schema dict
for index, item in enumerate(csvSchema):
    for k, v in item.items():
        print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys()[tdeTypes.values().index(v)])
        tdeTableDef.addColumn(k, v)

# Add table to extract
tdeTable = tdeFile.addTable("Extract",tdeTableDef)

print 'Writing records to %s' % (tdeFileName)

# iterate through rows and columns of csv -> add to tde
rownum = 0
for row in reader:
    if rownum == 0:
        # first line of the csv is the header
        header = row
    else:
        colnum = 0
        tdeRow = tde.Row(tdeTableDef)
        for col in row:
            # stop once we run past the columns defined in the schema
            if colnum+1 > len(csvSchema):
                break
            add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
            colnum += 1
        tdeTable.insert(tdeRow)
    rownum += 1

print '%i rows added in total in %f seconds' % (rownum-1, time.clock()-startTime)


Did you use dill/PiCloud together with Python's multiprocessing module? If so, multiprocessing relies on cPickle, so neither of the two serializers you mentioned would actually be used. If you use pathos.multiprocessing, which uses dill directly, you may have better luck. - Mike McKerns
Could you also show the error you get when you use dill? That might help someone find a path forward for you. - Mike McKerns
Thanks for the quick response, Mike, and for your awesome work on dill. - user3797878
When I tried dill, I cut out the part of the code where the rows are added to the Tableau table and instead appended each open row object to a list. I then tried to use dill to save that state (thinking I could run several scripts concurrently and load the state from all of them in another script). This resulted in the following error: ValueError: ctypes objects containing pointers cannot be pickled - user3797878
Oh… ctypes.pointers. Groan. dill can't handle those yet. Only some of the ctypes types work so far, and not the pointer types. - Mike McKerns
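
For reference, the error quoted above does not seem to be specific to Tableau or dill: the standard pickle module refuses any ctypes object that contains a pointer. Below is a minimal sketch that reproduces it with the standard library only. The helper name can_pickle is made up for this example, the assumption that the API's row objects hold pointers of this kind comes only from the error message, and the code uses Python 3 syntax rather than the Python 2 of the question.

```python
import ctypes
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except ValueError:
        # ctypes raises ValueError for objects that contain pointers
        return False


# A simple ctypes value, which carries no pointer.
plain_value = ctypes.c_int(5)

# A ctypes pointer, presumably similar to what the API's row objects hold.
pointer_value = ctypes.pointer(ctypes.c_int(5))

print(can_pickle(plain_value))
print(can_pickle(pointer_value))
```

Since multiprocessing moves arguments and results between processes through pickle, anything holding such a pointer cannot cross the process boundary as is.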

1 Answer


If you are failing to serialize at a ctypes.pointer type with both dill and PiCloud, then I think you are stuck. I don't know of a serializer that can handle those types. dill can handle some of the ctypes types, but not the pointer types. I'd suggest opening an issue for dill on GitHub; maybe magic will happen and a new serializable type will be added. With that in hand, I'd use pathos.multiprocessing, and it should work. Until then, you might have to look at a rewrite to get around the serialization issue. For example, the tools in dill.detect (from dill.detect import badobjects, baditems, badtypes, errors) might show you at what depth you need to do your rewrite. It may be as easy as modifying how something is imported. However, since there's a ctypes.pointer involved, I'm skeptical that it'll be easy.
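
As a rough illustration of what such a rewrite could look like (a sketch under assumptions, not something I have run against the Tableau API): keep every Row object in the parent process, and let the workers do only the string-to-value conversion, returning plain tuples that pickle without trouble. The names convert_value, convert_chunk, split_chunks and parallel_convert are made up for this example, the schema is reduced to a list of type names, and the code uses Python 3 syntax rather than the Python 2 of the question.

```python
import multiprocessing
from datetime import datetime
from functools import partial

DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def convert_value(raw, type_name):
    """Convert one CSV field to a plain Python value; None if the cast fails."""
    try:
        if type_name == "INTEGER":
            return int(raw)
        if type_name == "DOUBLE":
            return float(raw)
        if type_name == "DATETIME":
            d = datetime.strptime(raw, DATE_FORMAT)
            return (d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
        # Anything else is passed through as a string.
        return raw
    except ValueError:
        return None


def convert_chunk(chunk, schema):
    """Convert a list of CSV rows; the result holds only picklable built-ins."""
    return [tuple(convert_value(raw, t) for raw, t in zip(row, schema))
            for row in chunk]


def split_chunks(rows, n):
    """Split rows into at most n consecutive chunks of near-equal size."""
    size = max(1, -(-len(rows) // n))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def parallel_convert(rows, schema, workers=8):
    """Convert rows in worker processes and return them in the original order."""
    chunks = split_chunks(rows, workers)
    pool = multiprocessing.Pool(workers)
    try:
        converted = pool.map(partial(convert_chunk, schema=schema), chunks)
    finally:
        pool.close()
        pool.join()
    # Flatten the per-chunk results back into one list of row tuples.
    return [row for chunk in converted for row in chunk]
```

The parent would call parallel_convert from under an if __name__ == '__main__': guard, then loop over the returned tuples, call the matching setter (row.setInteger, row.setDouble, and so on) for each value that is not None, call row.setNull otherwise, and insert the row into the table. Whether this helps depends on how much of the 15 minutes goes to parsing versus time spent inside the API calls, which I have not measured.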