Using dask.distributed, I am trying to submit a function that is defined in another file named worker.py. On the workers I get the following error:

No module named 'worker'

However, I can't figure out what I'm doing wrong here.

Here is a sample of my code:

import queue
import worker

def run(self):
    dask_queue = queue.Queue()
    remote_queue = self.executor.scatter(dask_queue)
    map_queue = self.executor.map(worker.run, remote_queue)
    result = self.executor.gather(map_queue)

    # Load data into the queue
    for option in self.input.get_next_option():
        remote_queue.put([self.server, self.arg, option])

Here is the complete traceback obtained on the worker side:

distributed.core - INFO - Failed to deserialize b'\x80\x04\x95\x19\x00\x00\x00\x00\x00\x00\x00\x8c\x06worker\x94\x8c\nrun\x94\x93\x94.'
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/distributed/core.py", line 74, in loads
    return pickle.loads(x)
ImportError: No module named 'worker'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/distributed/worker.py", line 496, in compute_one
    task)
  File "/usr/local/lib/python3.5/dist-packages/distributed/worker.py", line 284, in deserialize
    function = loads(function)
  File "/usr/local/lib/python3.5/dist-packages/distributed/core.py", line 74, in loads
    return pickle.loads(x)
ImportError: No module named 'worker'

3 Answers

Edit: see MRocklin's comment for a cleaner solution.

If the code executed on a dask worker lives in an external module, that module must be importable from the worker's Python path: only a reference to the function (its module and name) is serialized from the client to the worker, not the module's code.
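
A minimal sketch of why this happens, using only the standard library (no dask involved, to keep it self-contained): `pickle`, which the traceback shows the worker calling, stores a module-level function as its module name plus attribute name, not its code. The module `worker_demo` and its `run` function are hypothetical stand-ins for the asker's worker.py.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical module standing in for the asker's worker.py.
MODULE_NAME = "worker_demo"
MODULE_SOURCE = "def run(x):\n    return x * 2\n"


def pickle_then_lose_module():
    """Pickle a function from a temporary module, then make the module
    unimportable (like a worker that never received the file) and try to
    unpickle it. Returns (payload, error or None)."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, MODULE_NAME + ".py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module(MODULE_NAME)
            payload = pickle.dumps(module.run)
        finally:
            # Forget both the loaded module and where it was found.
            sys.path.remove(tmp)
            sys.modules.pop(MODULE_NAME, None)

    try:
        pickle.loads(payload)
    except ImportError as exc:  # ModuleNotFoundError subclasses ImportError
        return payload, exc
    return payload, None


if __name__ == "__main__":
    payload, error = pickle_then_lose_module()
    # The payload holds the names, but none of the function body.
    print(MODULE_NAME.encode() in payload, b"return" in payload)
    print(type(error).__name__, error)
```

The unpickling side fails the same way the dask worker does: it has the names, but it cannot import the module they point to.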

Changing the PYTHONPATH of the worker environment so that the worker can import that module fixed the issue. A similar issue was reported on the dask issue tracker:

https://github.com/dask/distributed/issues/344
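
The PYTHONPATH fix can be sketched with plain subprocesses standing in for dask workers (an assumption for illustration: real workers are separate Python processes started by dask, but they resolve imports the same way). Each child receives the pickled function on stdin and can only unpickle it when the module's directory is on its PYTHONPATH. The module `worker_demo` is hypothetical.

```python
import importlib
import os
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical module standing in for the asker's worker.py.
MODULE_NAME = "worker_demo"
MODULE_SOURCE = "def run(x):\n    return x * 2\n"

# A fresh interpreter plays the role of a worker: it unpickles the
# function it receives on stdin and calls it.
CHILD_CODE = (
    "import pickle, sys\n"
    "func = pickle.loads(sys.stdin.buffer.read())\n"
    "print(func(21))\n"
)


def run_child(payload, pythonpath=None):
    """Run CHILD_CODE in a new process; return (returncode, stdout, stderr)."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)
    if pythonpath is not None:
        env["PYTHONPATH"] = pythonpath
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        input=payload,
        capture_output=True,
        env=env,
    )
    return proc.returncode, proc.stdout.decode().strip(), proc.stderr.decode()


def compare_with_and_without_pythonpath():
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, MODULE_NAME + ".py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            payload = pickle.dumps(importlib.import_module(MODULE_NAME).run)
        finally:
            sys.path.remove(tmp)
            sys.modules.pop(MODULE_NAME, None)
        # Both children run while the module file still exists on disk.
        without = run_child(payload)
        with_path = run_child(payload, pythonpath=tmp)
    return without, with_path


if __name__ == "__main__":
    without, with_path = compare_with_and_without_pythonpath()
    print("without PYTHONPATH: exit code", without[0])
    print("with PYTHONPATH: exit code", with_path[0], "output", with_path[1])
```

On a real cluster, the variable has to be set in the environment that starts each worker process; setting it only on the client machine is not enough.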

I faced a similar issue. I used functions from a Python module when building a dask graph, but the worker processes could not find that module.

The following errors appeared in the worker console (tasks.py contains the function used in the dask graph):

[ worker 10.0.2.4 ] : ModuleNotFoundError: No module named 'tasks'
[ worker 10.0.2.4 ] : distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x05tasks\x94\x8c\x06ogs_mk\x94\x93\x94.'
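
The logged payload can be decoded with the standard library's `pickletools` to confirm what the worker received: only the module name `tasks` and the attribute name `ogs_mk`, followed by a STACK_GLOBAL opcode that makes `pickle.loads` import `tasks` and look up `ogs_mk`. This is a small sketch; `describe_payload` and `referenced_global` are illustrative helpers (not part of dask), and the second one is simplified to handle a payload shaped like this one.

```python
import pickletools

# Payload copied from the worker log above (a protocol 4 pickle).
PAYLOAD = (
    b"\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00"
    b"\x8c\x05tasks\x94\x8c\x06ogs_mk\x94\x93\x94."
)


def describe_payload(payload):
    """List (opcode name, argument) for every opcode in the pickle."""
    return [(op.name, arg) for op, arg, _pos in pickletools.genops(payload)]


def referenced_global(payload):
    """Return (module, attribute) for the first STACK_GLOBAL, or None.
    Simplified: assumes both names were pushed as literal strings."""
    strings = []
    for name, arg in describe_payload(payload):
        if name in ("SHORT_BINUNICODE", "BINUNICODE", "BINUNICODE8"):
            strings.append(arg)
        elif name == "STACK_GLOBAL":
            # The module name is pushed first, then the attribute name.
            return strings[-2], strings[-1]
    return None


if __name__ == "__main__":
    pickletools.dis(PAYLOAD)
    print(referenced_global(PAYLOAD))
```

Nothing from the body of `ogs_mk` is in those 20 bytes, which is why the worker needs `tasks` to be importable locally.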

Using Client.upload_file (shown below) to send the local Python module to the workers resolved the issue.

client.upload_file('tasks.py')     ## Send local package to workers
results = client.get(dsk, 'root')  ## get the results

[LINUX ONLY]

If you are using Python 3.x, you may want to install pip for Python 3 and then run the command below:

sudo pip3 install dask

This will install dask along with its dependencies.