I have a Dask-MPI cluster with 4 workers and a 3D grid dataset loaded into a Dask array, chunked into 4 blocks. My application requires that I run exactly one task per worker, preferably with one block per task. The trouble I'm having is getting the blocks distributed across the cluster in a reliable, reproducible way. Specifically, if I run array.map_blocks(foo), foo runs on the same worker for every block.
Client.rebalance() seems like it should do what I want, but it still leaves all or most blocks on the same worker. As a test, I rechunked the data into 128 blocks and ran again, which caused 7 or 8 of the blocks to move to a different worker. This suggests Dask is using a heuristic to decide when to move blocks automatically, but it doesn't give me a way to force an even block distribution.
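To see where the blocks actually land, I've been inspecting placement with Client.who_has on the persisted array's futures. Here's a minimal sketch of that diagnostic; it uses a small in-process LocalCluster and a toy array as stand-ins for the Dask-MPI cluster and the real volume:

import numpy as np
import dask.array as da
from dask.distributed import Client, futures_of

# A 4-worker local cluster stands in for the Dask-MPI one here; with
# dask-mpi I would pass scheduler_file='../scheduler.json' instead.
client = Client(processes=False, n_workers=4, threads_per_worker=1)

# Small stand-in volume, chunked into 4 blocks along the last axis
arr = da.from_array(np.arange(64, dtype=np.float32).reshape(4, 4, 4),
                    chunks=(4, 4, 1))
persisted = client.persist(arr)
client.rebalance()

# who_has maps each block's key to the workers currently holding it
placement = client.who_has(futures_of(persisted))
for key, workers in placement.items():
    print(key, '->', workers)

client.close()

With the real cluster, this consistently shows most or all keys on the same worker, matching what map_blocks does.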
Here's a simple test script I've been trying (connecting to a cluster with 4 workers/ranks).
# connect to the Dask scheduler
from dask.distributed import Client
client = Client(scheduler_file='../scheduler.json', set_as_default=True)
# load data into a NumPy array
import numpy as np
npvol = np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32)
npvol = npvol.reshape([128, 128, 128])
# convert the NumPy array to a Dask array with N blocks along the last axis
import dask.array as da
N = 4  # number of workers/ranks
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2] // N])
def test(ar):
    # report the MPI rank of the worker this block landed on
    from mpi4py import MPI
    rank = MPI.COMM_WORLD.Get_rank()
    return np.array([rank], ndmin=3, dtype=int)
client.rebalance()
# run one task per block and print which rank executed each one
print(client.persist(ar.map_blocks(test, chunks=(1, 1, 1))).compute())
Over a few dozen test runs, this code has returned a block on rank 3 once and placed all blocks on rank 0 every other time.
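One workaround I'm considering is to sidestep map_blocks placement entirely: split the volume into one NumPy block per worker, scatter each block to a specific worker with Client.scatter(..., workers=[addr]), and submit the per-block task pinned to that same worker. Here's a sketch with a local stand-in cluster and a toy task (the lambda just sums the block; in my case it would call into MPI):

import numpy as np
from dask.distributed import Client

# Local stand-in for the Dask-MPI cluster; the worker addresses are
# whatever the scheduler reports, not hard-coded ranks.
client = Client(processes=False, n_workers=4, threads_per_worker=1)

# One NumPy block per worker, split along the last axis
vol = np.arange(4 * 4 * 8, dtype=np.float32).reshape(4, 4, 8)
blocks = np.split(vol, 4, axis=2)

workers = list(client.scheduler_info()['workers'])  # worker addresses

futures = []
for block, addr in zip(blocks, workers):
    # pin both the data and the task that consumes it to one worker
    data = client.scatter(block, workers=[addr])
    futures.append(client.submit(lambda b: float(b.sum()), data, workers=[addr]))

results = client.gather(futures)
print(results)
client.close()

This does force one block per worker, but I'd still prefer a way to make map_blocks (or rebalance) do the distribution itself, since the explicit scatter gives up Dask's graph-level view of the array.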