
I am running a set of pandas-like transformations on a dask dataframe, using the "distributed" setup on my own machine, so with 8 workers corresponding to my computer's 8 cores.

I have the default setup of a distributed client:

from dask.distributed import Client
c = Client()
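
As I understand it, this default is roughly equivalent to spelling the local cluster out explicitly, along the lines of the sketch below (the worker and thread counts are just what I see on my machine, not tuned values):

from dask.distributed import Client, LocalCluster

# Roughly what the default Client() gives me on an 8-core machine:
# 8 single-threaded worker processes (values are illustrative, not tuned)
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
c = Client(cluster)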

The process runs successfully with a small amount of data (1000 records), but when I scale it up only slightly to 7500 records, I get the following errors:

tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:58103, threads: 1>>
Traceback (most recent call last):
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 348, in catch_zombie
yield
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 387, in _get_pidtaskinfo
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
return self.callback()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/nanny.py", line 245, in memory_monitor
memory = psutil.Process(self.process.pid).memory_info().rss
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_common.py", line 337, in wrapper
return fun(self)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/__init__.py", line 1049, in memory_info
return self._proc.memory_info()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 330, in wrapper
return fun(self, *args, **kwargs)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 456, in memory_info
rawtuple = self._get_pidtaskinfo()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_common.py", line 337, in wrapper
return fun(self)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 387, in _get_pidtaskinfo
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 361, in catch_zombie
raise AccessDenied(proc.pid, proc._name)
psutil._exceptions.AccessDenied: psutil.AccessDenied (pid=17998)

This repeats multiple times as dask attempts to restart the computation block. After it has failed the number of times specified in the config file, there is finally a KilledWorker error, e.g. the one below. I've tried this with different lengths of data, and the KilledWorker is sometimes on a melt task, sometimes on an apply task.


KilledWorker                              Traceback (most recent call last)
<ipython-input-28-7ba288919b51> in <module>()
      1 #Optional checkpoint to view output
      2 with ProgressBar():
----> 3     output = aggdf.compute()
      4 output.head()

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    133         dask.base.compute
    134         """
--> 135         (result,) = compute(self, traverse=False, **kwargs)
    136         return result
    137 

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    331     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    332                     else (None, a) for a in args]
--> 333     results = get(dsk, keys, **kwargs)
    334     results_iter = iter(results)
    335     return tuple(a if f is None else f(next(results_iter), *a)

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
   1997                 secede()
   1998             try:
-> 1999                 results = self.gather(packed, asynchronous=asynchronous)
   2000             finally:
   2001                 for f in futures.values():

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1435             return self.sync(self._gather, futures, errors=errors,
   1436                              direct=direct, local_worker=local_worker,
-> 1437                              asynchronous=asynchronous)
   1438 
   1439     @gen.coroutine

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    590             return future
    591         else:
--> 592             return sync(self.loop, func, *args, **kwargs)
    593 
    594     def __repr__(self):

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    252             e.wait(1000000)
    253     if error[0]:
--> 254         six.reraise(*error[0])
    255     else:
    256         return result[0]

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/utils.py in f()
    236             yield gen.moment
    237             thread_state.asynchronous = True
--> 238             result[0] = yield make_coro()
    239         except Exception as exc:
    240             logger.exception(exc)

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None


~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1313                             six.reraise(type(exception),
   1314                                         exception,
-> 1315                                         traceback)
   1316                     if errors == 'skip':
   1317                         bad_keys.add(key)

~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ("('melt-b85b6b6b1aee5b5aaa8d24db1de65b8a', 0)", 'tcp://127.0.0.1:58108')
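
For reference, the retry count mentioned above is, as far as I can tell, the scheduler's allowed-failures setting; in more recent dask versions it can apparently be raised programmatically (before creating the Client) with something like:

import dask

# Hypothetical sketch for newer dask/distributed versions: allow a task to
# survive more worker deaths before the scheduler raises KilledWorker
dask.config.set({"distributed.scheduler.allowed-failures": 10})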

I'm not very familiar with the distributed or tornado packages, or with the underlying architecture of which processes are being created and killed. Is anyone able to point me in the right direction to debug or resolve this?

In the meantime I am switching to the default dask dataframe behaviour of multithreaded computation, which works successfully with a large amount of data.
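
To be concrete, by the default behaviour I mean something along these lines (assuming a reasonably recent dask; older releases used the get=dask.threaded.get keyword instead of scheduler):

# Without a distributed Client active, dask.dataframe uses the local threaded
# scheduler by default; it can also be requested explicitly per compute call:
output = aggdf.compute(scheduler='threads')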


1 Answer


It looks like your workers are dying for some reason; unfortunately it's not clear from this traceback what the cause is. You might consider setting up the cluster manually so that you have clearer access to the worker logs:

$ dask-scheduler  # run this in one terminal

$ dask-worker tcp://localhost:8786  # run this in another
# worker logs will appear here
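
Once those are running, you can point the client at that scheduler rather than letting it spawn its own local cluster:

from dask.distributed import Client
c = Client('tcp://localhost:8786')  # connect to the manually started scheduler

Whatever is killing the workers should then show up directly in the dask-worker terminal output.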