
I have a Dask-MPI cluster with 4 workers and a 3D grid dataset loaded into a Dask array, chunked into 4 blocks. My application requires exactly one task per worker, preferably with one block per task. The trouble I'm having is getting the blocks distributed across the cluster in a reliable, reproducible way. Specifically, if I run array.map_blocks(foo), foo runs on the same worker for every block.

Client.rebalance() seems like it should do what I want, but it still leaves all or most blocks on the same worker. As a test, I rechunked the data into 128 blocks and ran again, which caused 7 or 8 of the blocks to move to a different worker. This hints that Dask uses a heuristic to decide when to move blocks automatically, but it doesn't give me a way to force an even block distribution.

Here's a simple test script I've been trying (connecting to a cluster with 4 workers/ranks).

# connect to the Dask scheduler
from dask.distributed import Client
client = Client(scheduler_file='../scheduler.json', set_as_default=True)


# load data into a numpy array
import numpy as np
npvol = np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32)
npvol = npvol.reshape([128, 128, 128])

# convert numpy array to a dask array, one chunk per worker
import dask.array as da
N = 4  # number of workers
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2] // N])


def test(block):
    from mpi4py import MPI
    rank = MPI.COMM_WORLD.Get_rank()
    return np.array([rank], ndmin=3, dtype=np.int64)

client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())

Over a few dozen test runs, this code returned a block on rank 3 once, and has had all blocks on rank 0 otherwise.


1 Answer


Since your total dataset is not that big, the initial call to from_array creates only one chunk, so it lands on one worker (you could have specified otherwise with chunks=). The subsequent rechunk prefers not to move data if it can avoid it.
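For instance, the chunking can be requested up front in from_array rather than via a later rechunk. A minimal sketch (using a zero-filled stand-in for the 128×128×128 float32 volume from the question):

```python
import numpy as np
import dask.array as da

# stand-in for the volume loaded from disk in the question
npvol = np.zeros((128, 128, 128), dtype=np.float32)

# ask for four chunks along the last axis at creation time
ar = da.from_array(npvol, chunks=(128, 128, 32))
print(ar.numblocks)  # (1, 1, 4)
```

This does not by itself control which worker gets which chunk, but it avoids the single-chunk starting point that makes everything land on one worker.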

Assuming your file can be reached by each of the workers, you would be better off loading the chunks in workers in the first place.

You would need a function like

import numpy as np

def get_chunk(fn, offset, count, shape, dtype):
    # read `count` elements starting at byte `offset` and reshape into a block
    with open(fn, 'rb') as f:
        f.seek(offset)
        return np.fromfile(f, dtype=dtype, count=count).reshape(shape)

and pass a different offset for each chunk:

import dask
parts = [da.from_delayed(dask.delayed(get_chunk)(fn, offset, count, shape, dtype), shape, dtype)
         for offset in [...]]
arr = da.concatenate(parts)
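The offsets follow from the chunk geometry. A sketch of computing them for the question's 128³ float32 volume (the split axis is my choice, not from the original answer): in a C-ordered raw file, slabs along the *first* axis are contiguous byte ranges, so splitting there keeps each seek-and-read simple, whereas slabs along the last axis would be interleaved.

```python
import numpy as np

# hypothetical parameters matching the question's volume
dtype = np.dtype('float32')
full_shape = (128, 128, 128)
n_chunks = 4

# split along axis 0 so each slab is one contiguous run of bytes
chunk_shape = (full_shape[0] // n_chunks, full_shape[1], full_shape[2])
count = int(np.prod(chunk_shape))          # elements per slab
offsets = [i * count * dtype.itemsize       # byte offset of each slab
           for i in range(n_chunks)]
print(offsets)
```

With axis-0 slabs, the concatenation at the end should also be along axis=0 (the default for da.concatenate).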

This is very similar to what is done automatically by the npy source in Intake, code: https://github.com/intake/intake/blob/master/intake/source/npy.py#L11