In dask distributed, tasks are distributed over a cluster's nodes via the scheduler. I'm looking to introduce a per-node dependency on the tasks submitted to a node. Briefly, a compute operation that I'm seeking to perform needs to:
1. Preload data onto a GPU on each node.
2. Perform GPU compute on each node with other data in chunked dask arrays.
I also want to enqueue (1) and (2) multiple times on different datasets.
I've tried to set this up as a minimal example:
from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

# Start an empty cluster and add three named workers
cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers = [v['name'] for v in workers]

    print("Workers {}".format(workers))

    def init_worker():
        # Stand-in for the GPU preload: stash state on the worker
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker on each worker. Need pure=False to
    # ensure this happens multiple times
    init_futures = [client.submit(init_worker, pure=False,
                                  workers=[w])
                    for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        # Stand-in for the GPU compute: uses the per-worker state
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker on each chunk of A """
        name = 'increment-' + tokenize(A)
        dsk = {(name, i): (inc_worker, (A.name, i))
               for _, i in A.dask.keys()}

        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())
I want to find some way to make the increment-* tasks submitted to each worker dependent on the init-worker-* tasks submitted to each worker.
Put another way, I want to avoid waiting for the init_futures to complete in the client. The problem this introduces is that, while we know which init-worker-* tasks are associated with each worker, there is no obvious way to know beforehand the worker association of the increment-* tasks.
One possible approach:
- For each inc_worker call, spawn a local_client() that submits a task with the init-worker-* in get_worker().data as a dependency. I don't like this because the overhead seems quite large.
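Another option, sketched here under assumptions rather than as an established dask pattern: if I'm willing to choose the worker for each chunk myself, the init future can be passed as an argument to the compute task, and both can be pinned to the same worker with workers=[...]. The scheduler then treats the init task as a dependency, so nothing has to be waited on in the client. The names run_pinned and inc_chunk and the round-robin placement are hypothetical choices for illustration, and the chunks are plain NumPy arrays rather than blocks of a dask array.

```python
from distributed import Client, LocalCluster, get_worker
import numpy as np


def init_worker(value):
    # Stand-in for the GPU preload: stash state on the worker object.
    get_worker()._stuff = value
    return "OK"


def inc_chunk(chunk, init_status):
    # init_status is passed only so that the scheduler sees the init
    # task as a dependency of this task.
    assert init_status == "OK"
    return chunk + get_worker()._stuff


def run_pinned(client, chunks, value):
    """Hypothetical helper: one init task per worker, then one compute
    task per chunk, each pinned to the worker whose init it depends on."""
    addresses = sorted(client.scheduler_info()["workers"])
    init = {a: client.submit(init_worker, value, workers=[a], pure=False)
            for a in addresses}
    futures = []
    for i, chunk in enumerate(chunks):
        a = addresses[i % len(addresses)]  # assumed round-robin placement
        futures.append(client.submit(inc_chunk, chunk, init[a],
                                     workers=[a], pure=False))
    # Only the final results are gathered; init futures are never waited on.
    return client.gather(futures)
```

The cost is that the scheduler no longer decides where the chunks go. Also, a single _stuff attribute would be overwritten if several datasets were in flight at once, so that state would probably need to be keyed per dataset.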
Any suggestions on how to do this?
EDIT 1: Actually this works without waiting for the init_futures to complete, presumably because they're submitted to the worker schedulers before any of the increment-* tasks are submitted to the worker. It still feels like I'm making an assumption that may not always hold though...
EDIT 2: Mentioned that the 2 steps should be run multiple times on different datasets.
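On the ordering assumption in EDIT 1, one way to sidestep it might be to have the compute task initialise the per-worker state on first use, keyed by dataset, instead of relying on a separate init task having run first. The following is a minimal sketch in plain Python: worker_state, the _preloaded attribute and the dataset_id key are hypothetical names, and the worker is passed in explicitly so the logic can be exercised without a cluster. In an actual task it would presumably come from get_worker().

```python
import threading

# One lock per process; it serialises first-use initialisation when
# several task threads share the same worker object.
_init_lock = threading.Lock()


def worker_state(worker, dataset_id, preload):
    """Return the state preloaded for dataset_id on this worker,
    calling preload() only the first time it is requested."""
    with _init_lock:
        cache = getattr(worker, "_preloaded", None)
        if cache is None:
            cache = {}
            worker._preloaded = cache
        if dataset_id not in cache:
            cache[dataset_id] = preload()
        return cache[dataset_id]


def inc_chunk(chunk, dataset_id, preload, worker):
    # In a dask task, `worker` would be obtained via get_worker()
    # rather than passed as an argument (assumption for this sketch).
    offset = worker_state(worker, dataset_id, preload)
    return [x + offset for x in chunk]
```

With this shape the compute tasks no longer care which worker they land on or whether an init task ran before them, at the price of the first task on each worker paying the preload cost.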