I'm relatively new to Python and am struggling to use the multiprocessing module to perform some CPU-intensive data conversion. I have a large chunk of data (~400,000 observations of ~300 variables) in CSV format, which I want to convert to a Tableau Data Extract using their Python API. Writing a script to do the conversion is straightforward, but it takes about 15 minutes to complete because only one CPU is doing the work (the same conversion takes only about 90 seconds in Tableau Desktop). I need to use all 8 of my cores to make this conversion go more quickly.
My initial idea was to split the data into 8 chunks, have 8 workers each produce lists of Tableau Rows with the multiprocessing module, and then combine the rows into a single tde Table. However, since the Tableau Row objects/classes are defined in a separate module (the Tableau API), I get pickling and pointer errors. The API is complex and draws on a number of other modules itself, so my attempts to reconstruct the necessary definitions in the main global namespace have all failed.
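To make the failure concrete, here is a stripped-down sketch of that approach (the one-column schema and the stand-in data are hypothetical simplifications of my real code, but the pattern is the one that breaks):

import multiprocessing
import dataextract as tde

# Stand-ins for the real schema and data (see the full script below).
csvRows = [['1'], ['2'], ['3'], ['4']]

tdeTableDef = tde.TableDefinition()
tdeTableDef.addColumn('LoanID', 7)   # 7 = INTEGER, per the type dict below

def convert_chunk(chunk):
    # Each worker builds one tde.Row per csv row and sends the list back.
    tdeRows = []
    for csvRow in chunk:
        tdeRow = tde.Row(tdeTableDef)
        tdeRow.setInteger(0, int(csvRow[0]))
        tdeRows.append(tdeRow)
    return tdeRows

pool = multiprocessing.Pool(2)
chunks = [csvRows[i::2] for i in range(2)]
# The Row objects wrap pointers from the Tableau C library, so pickling
# them to send between processes fails -- this is where the errors appear.
results = pool.map(convert_chunk, chunks)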
I've tried to use dill and PiCloud, but both attempts still result in pickling or pointer errors. Does anyone know of an effective way to serialize and/or multiprocess a computation in Python that relies on methods/objects defined in an external package (without having to dig into the package and reinvent the wheel in your own program)?
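For what it's worth, the problem reproduces even when serializing a single Row directly, outside of multiprocessing entirely. A minimal sketch (the single-column table is hypothetical, and the exact exception text varies):

import dill
import dataextract as tde

tdeTableDef = tde.TableDefinition()
tdeTableDef.addColumn('LoanID', 7)   # 7 = INTEGER, per the type dict below

tdeRow = tde.Row(tdeTableDef)
tdeRow.setInteger(0, 1)

# The Row wraps ctypes pointers into the Tableau C library, so this raises
# a serialization error rather than returning a byte string.
dill.dumps(tdeRow)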
Below is the working program I want to multiprocess (I drew heavily from Brian Bickell's work, published here: http://www.interworks.com/blogs/bbickell/2012/12/06/introducing-python-tableau-data-extract-api-csv-extract-example):
from sys import argv
import os, csv, datetime, time
import dataextract as tde

csv.field_size_limit(10000000)

## Functions

# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
    # Date format used below
    dateformat = '%Y-%m-%d %H:%M:%S.%f'
    if t == tdeTypes['INTEGER']:
        try:
            convert = int(val)
            row.setInteger(colnum, convert)
        except ValueError:
            # if the cast fails, just add a null
            row.setNull(colnum)
    elif t == tdeTypes['DOUBLE']:
        try:
            convert = float(val)
            row.setDouble(colnum, convert)
        except ValueError:
            row.setNull(colnum)
    elif t == tdeTypes['BOOLEAN']:
        try:
            convert = int(val)
            if -1 < convert <= 1:
                row.setBoolean(colnum, convert)
            else:
                row.setNull(colnum)
        except ValueError:
            row.setNull(colnum)
    elif t == tdeTypes['DATETIME']:
        try:
            d = datetime.datetime.strptime(val, dateformat)
            row.setDate(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
        except ValueError:
            row.setNull(colnum)
    elif t == tdeTypes['CHAR_STRING']:
        row.setCharString(colnum, val)
    elif t == tdeTypes['UNICODE_STRING']:
        row.setString(colnum, val)
    else:
        print 'Error: unsupported column type'
        row.setNull(colnum)

## Parameters

# define csv input and tde output
inputFile = 'test1.csv'
tdeFileName = 'tdetest1.tde'

startTime = time.clock()

# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14,
            'CHAR_STRING': 15, 'UNICODE_STRING': 16}

## Define CSV schema as a list of single-entry dicts (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})

# Try to create the extract; delete it and retry if it already exists.
try:
    tdeFile = tde.Extract(tdeFileName)
except:
    os.system('del ' + tdeFileName)
    os.system('del DataExtract.log')
    tdeFile = tde.Extract(tdeFileName)

# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter='^')

print 'Reading records from %s' % (inputFile)

# Create TDE table definition
tdeTableDef = tde.TableDefinition()

print 'Defined table schema:'

# Build the TDE table definition from the csv schema dicts
for index, item in enumerate(csvSchema):
    for k, v in item.items():
        print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys()[tdeTypes.values().index(v)])
        tdeTableDef.addColumn(k, v)

# Add table to extract
tdeTable = tdeFile.addTable("Extract", tdeTableDef)

print 'Writing records to %s' % (tdeFileName)

# Iterate through the rows and columns of the csv and add them to the tde
rownum = 0
for row in reader:
    if rownum == 0:
        header = row
    else:
        colnum = 0
        tdeRow = tde.Row(tdeTableDef)
        for col in row:
            if colnum + 1 > len(csvSchema):
                break
            add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
            colnum += 1
        tdeTable.insert(tdeRow)
        tdeRow.close()
    rownum += 1

print '%i rows added in total in %f seconds' % (rownum - 1, time.clock() - startTime)

tdeFile.close()
csvFile.close()
Are you using dill/PiCloud and python's multiprocessing module? If so, multiprocessing relies on cPickle, and thus either of the two serializers you mentioned won't be used. If you use pathos.multiprocessing, which uses dill directly, you may have better luck. – Mike McKerns

Could you post the errors you are seeing (from dill)? That might help someone find a path forward for you. – Mike McKerns

Ah, ctypes.pointers. Groan. dill can't do those yet. Only some of the ctypes types work as of yet, and not the pointer types. – Mike McKerns
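For anyone following that pathos suggestion, the usage would look roughly like the sketch below (the convert_chunk helper and the stand-in data are hypothetical). Note that, per the last comment, dill still can't serialize the ctypes pointers inside tde.Row objects, so the workers would have to hand back plain python values rather than Rows:

from pathos.multiprocessing import ProcessingPool

def convert_chunk(chunk):
    # Do the type conversions on plain python values; dill can ship
    # these back to the parent, unlike the ctypes-backed tde.Row objects.
    return [[int(v) if v.isdigit() else v for v in csvRow] for csvRow in chunk]

csvRows = [['1', 'a'], ['2', 'b'], ['3', 'c'], ['4', 'd']]   # stand-in data
chunks = [csvRows[i::2] for i in range(2)]                   # split into 2 chunks

pool = ProcessingPool(nodes=2)
print pool.map(convert_chunk, chunks)   # pathos pipes results through dill, not cPickle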