3 votes

I'm using a dask-distributed long-running task along the lines of this example: http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow, where a long-running worker task gets its inputs from a queue, as in the TensorFlow example, and delivers its results to an output queue. (I don't see the Channels used in that example in the most recent release of dask.)
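For context, the worker-side setup I'm assuming is roughly what the blog post does: plain Python queues attached to the worker object and a long-running loop that consumes from one and fills the other (a sketch; process_batch and the queue attribute names are placeholders for my actual TensorFlow code):

from queue import Queue
from dask.distributed import get_worker

def process_batch(batch):
    # placeholder for the actual TensorFlow step
    return batch

def setup_worker_queues():
    # run once per worker (e.g. with e.run(setup_worker_queues)):
    # attach plain Python queues to the worker object, as in the blog post
    worker = get_worker()
    worker.tensorflow_queue = Queue()
    worker.output_queue = Queue()

def long_running_task():
    # submitted once per worker as a long-running task: consume batches,
    # run the (placeholder) step, and push results onto the worker-local output queue
    worker = get_worker()
    while True:
        batch = worker.tensorflow_queue.get()
        worker.output_queue.put(process_batch(batch))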

I can see how to scatter a list and apply a map to generate a list of futures that push the input data into the worker's input queue.

from dask.distributed import get_worker

def transfer_dask_to_worker(batch):
    # runs on a worker: push the batch onto the worker-local input queue
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

data = [1, 2, 3, 4]

future_data = e.scatter(data)

tasks = e.map(transfer_dask_to_worker, future_data,
              workers=dask_spec['worker'], pure=False)

Now, if we wait for the worker to consume the tasks, all the results will be in the output queue on the worker, and we can pull them back with:

def transfer_worker_to_dask(arg):
    # runs on a worker: pop one result from the worker-local output queue
    worker = get_worker()
    return worker.output_queue.get()

results = e.map(transfer_worker_to_dask, range(len(tasks)))

This works fine as long as we manually handle the sequencing by waiting for all the worker tasks to complete before pulling the results back.
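Concretely, the manual sequencing looks something like this (a sketch: wait comes from dask.distributed, and I've added pure=False here because the retrieval has side effects):

from dask.distributed import wait

wait(tasks)  # block until every transfer_dask_to_worker future has finished
results = e.map(transfer_worker_to_dask, range(len(tasks)), pure=False)
output = e.gather(results)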

How do we link the output futures to be downstream from the inputs? Is there a way for the long-running tasks to create futures on the workers that can be gathered back to the client?

I tried letting transfer_dask_to_worker(batch) also query the output queue and return the result:

def transfer_dask_to_worker_and_return(batch):
    # push the batch, then block until one result is available and return it
    worker = get_worker()
    worker.tensorflow_queue.put(batch)
    return worker.output_queue.get()

This works for short lists but starts to fail with canceled futures at around 1000 items.

Thanks in advance.


1 Answer

2 votes

Note: that blog post was experimental. There are several approaches here; I wouldn't restrict myself to that pattern.

Let's start with this concrete question:

How do we link the output futures to be downstream from the inputs? Is there a way for the long-running tasks to create futures on the workers that can be gathered back to the client?

The simplest solution here is probably to scatter the local data and then put it into a Dask distributed Queue. If you have a function that your TensorFlow code calls when it produces a result, that function can scatter the local data into a future (this doesn't actually move the data; it just makes the Dask worker start tracking it) and then put that future into a distributed Queue. Placing the future in the Queue lets other clients and workers in Dask know the data exists, and pull it down if necessary:

from dask.distributed import Queue, get_client

results_q = Queue()

def tf_result_ready(result):
    # called on the worker when TensorFlow produces a result:
    # register the local data with Dask and publish the future on the queue
    future = get_client().scatter(result)
    results_q.put(future)

You can then sit in your client code and pull results from this queue as they become available:

for _ in range(n_blocks):
    future = results_q.get()
    # do stuff with future like submit or gather
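For example, the client might turn those futures into concrete values, or chain further work onto them without pulling the data back (a sketch; n_blocks is the number of results you expect, client is your distributed Client, and postprocess is a hypothetical follow-up function):

results = []
for _ in range(n_blocks):
    future = results_q.get()           # a Future pointing at data held on a worker
    results.append(future.result())    # pull the concrete value back to the client
    # or keep the data on the cluster and submit follow-up work instead:
    # client.submit(postprocess, future)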