I want to use multiprocessing
on a large dataset to find the distance between two gps points. I constructed a test set, but I have been unable to get multiprocessing
to work on this set.
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby('co_nm').groups.items()
for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
if __name__ == '__main__':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, ['lat','lon'])
pool.close()
pool.join()
I am using Python 2.7.11 and Ipython 4.1.2 with Anaconda 2.5.0 64-bit on Windows7 Professional when this error occurs.
runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop') Traceback (most recent call last):
File "", line 1, in runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile execfile(filename, namespace)
File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile exec(compile(scripttext, filename, 'exec'), glob, loc)
File "C:/..../multiprocessing test.py", line 33, in pool.map(calc_dist, ['lat','lon'])
File "C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map return self.map_async(func, iterable, chunksize).get()
File "C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get raise self._value
TypeError: Failed to create Point instance from 1.
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value