The goal is to use dask.delayed
to parallelize some 'embarrassingly parallel' sections of my code. The code involves calling a python function which wraps a c-function using ctypes
. To understand the errors I was getting I wrote a very basic example.
The c-function:
double zippy_sum(double x, double y)
{
return x + y;
}
The python:
from dask.distributed import Client
client = Client(n_workers = 4)
client
import os
import dask
import ctypes
current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double
def zippy(x, y):
z = _zippy_sum(x, y)
return z
result = dask.delayed(zippy)(1., 2.)
result.compute()
The Traceback:
--------------------------------------------------------------------------- KeyError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func) 3286 with _cache_lock: -> 3287 result = cache_dumps[func] 3288 except KeyError:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in getitem(self, key) 1517 def getitem(self, key): -> 1518 value = super().getitem(key) 1519 self.data.move_to_end(key)
~/.edm/envs/evaxi3.6/lib/python3.6/collections/init.py in getitem(self, key) 990 return self.class.missing(self, key) --> 991 raise KeyError(key) 992 def setitem(self, key, item): self.data[key] = item
KeyError: function zippy at 0x11ffc50d0
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x) 40 if b"main" in result: ---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) 42 else:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol) 1147 cp = CloudPickler(file, protocol=protocol) -> 1148 cp.dump(obj) 1149 return file.getvalue()
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj) 490 try: --> 491 return Pickler.dump(self, obj) 492 except RuntimeError as e:
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj) 408 self.framer.start_framing() --> 409 self.save(obj) 410 self.write(STOP)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name) 565 else: --> 566 return self.save_function_tuple(obj) 567
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func) 779 state['kwdefaults'] = func.kwdefaults --> 780 save(state) 781 write(pickle.TUPLE)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 846 save(k) --> 847 save(v) 848 write(SETITEMS)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 851 save(k) --> 852 save(v) 853 write(SETITEM)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 495 if reduce is not None: --> 496 rv = reduce(self.proto) 497 else:
ValueError: ctypes objects containing pointers cannot be pickled
Unfortunately, I still do not understand the errors! I am just getting started with dask
and only have some basic experience with ctypes
. Does anyone have suggestions for how to tackle this, or even understanding what need to be tackled?
Thanks!