1
votes

When using futures on dask.distributed, is there a way to differentiate between pending futures that are currently being evaluated, and those that are still in the queue?

The reason is that I am submitting a large number of tasks (~8000) to a smaller set of workers (100), so not all tasks can be processed immediately. The tasks involve calling a third-party executable (via subprocess.check_output) which in some rare cases goes into an infinite loop.

Therefore, I would like to cancel futures that have been running for too long (using an arbitrary timeout). However, there doesn't seem to be a way to tell whether a future has been pending for a long time because its computation is taking longer than usual, or simply because it had to wait for a worker to become available.

My setup involves an SGE cluster running dask-scheduler as a job and dask-worker as a job array. I tried setting a timeout directly in the submitted Python function, using @timeout_decorator.timeout(60, use_signals=False) from the timeout_decorator package, but got the following error:

"daemonic processes are not allowed to have children"

Any help would be much appreciated.

2 Answers

0
votes

No, you cannot determine whether a task has started executing or not. Generally, we recommend putting this logic in the task itself, as you have tried to do with your timeout decorator.

I recommend instead trying the timeout= keyword of subprocess.check_output itself (available since Python 3.3). I suspect that this will be simpler and have a higher chance of working smoothly.
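As a minimal sketch of that suggestion, assuming Python 3.3 or later: the function name run_with_timeout and the stand-in command are hypothetical, and the real binary path and arguments would replace them. subprocess.check_output kills the child process itself before raising subprocess.TimeoutExpired, so no extra cleanup is needed here.

```python
import subprocess
import sys


def run_with_timeout(cmd, timeout_s):
    """Run cmd and return its combined stdout/stderr as bytes.

    Raises RuntimeError if cmd does not finish within timeout_s seconds.
    (run_with_timeout is a hypothetical helper name, not part of dask.)
    """
    try:
        return subprocess.check_output(
            cmd, stderr=subprocess.STDOUT, timeout=timeout_s
        )
    except subprocess.TimeoutExpired:
        # check_output has already killed the child process at this point.
        raise RuntimeError(
            "Command did not finish within %s seconds: %r" % (timeout_s, cmd)
        )


if __name__ == "__main__":
    # Stand-in for the third-party executable: a Python one-liner.
    cmd = [sys.executable, "-c", "print('done')"]
    print(run_with_timeout(cmd, timeout_s=10))
```

A function like this can be submitted to the workers as the task itself, so a hung executable surfaces as an exception on the corresponding future rather than blocking a worker indefinitely. A non-zero exit status still raises subprocess.CalledProcessError as usual.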

0
votes

For users running Python 2, the timeout= keyword is not available in subprocess.check_output.

I was able to get the desired effect by using subprocess.Popen instead, which returns immediately:

import subprocess
import shlex  # useful to split up arguments for subprocess
import time

p = subprocess.Popen(shlex.split('/path/to/binary arg1 arg2'),
                     stderr=subprocess.STDOUT)
for _ in range(60):  # wait for up to 60 seconds
    if p.poll() is not None:
        break  # process completed
    else:
        time.sleep(1.0)  # give it more time
if p.poll() is None:  # time is up and the process is still running
    p.kill()
    p.wait()  # reap the killed process so it does not linger as a zombie
    raise RuntimeError('Binary failed to complete in time.')