
I have a question pertaining to the scheduling/execution order of tasks in dask.distributed for the case of strong data reduction of a large raw dataset.

We are using dask.distributed for a code which extracts information from movie frames. Its specific application is crystallography, but quite generally the steps are:

  1. Read the frames of a movie, stored as a 3D array in an HDF5 file (or a few files which are concatenated), into a dask array. This step is obviously quite I/O-heavy.
  2. Group these frames into consecutive sub-stacks of typically 10 movie stills each, which are aggregated (summed or averaged) into a single 2D image.
  3. Run several computationally heavy analysis functions on each 2D image (e.g. finding the positions of certain features), returning a dictionary of results which is negligibly small compared to the movie itself.

We implement this using the dask.array API for steps 1 and 2 (the latter via map_blocks with a block/chunk size of one or a few aggregation sub-stacks), then convert the array chunks to dask.delayed objects (using to_delayed), which are passed to the function doing the actual data reduction (step 3). We take care to properly align the chunks of the HDF5 arrays, the dask computation and the aggregation ranges in step 2, so that the task graph of each final delayed object (the elements of tasks) is very clean. Here's the example code:

import numpy as np
import h5py
import dask.array as da
from dask import delayed
from dask.distributed import Client

def sum_sub_stacks(mov):
    # aggregation function: sum each group of 10 consecutive frames
    sub_stk = []
    for k in range(mov.shape[0] // 10):
        sub_stk.append(mov[k*10:k*10+10, ...].sum(axis=0, keepdims=True))
    return np.concatenate(sub_stk)

def get_info(mov):
    # reduction function
    results = []
    for frame in mov:
        results.append({
            'sum': frame.sum(),
            'variance': frame.var()
            # ...actually much more complex/expensive stuff
        })
    return results

# connect to dask.distributed scheduler
client = Client(address='127.0.0.1:8786')

# 1: get the movie
fh = h5py.File('movie_stack.h5', 'r')
movie = da.from_array(fh['/entry/data/raw_counts'], chunks=(100,-1,-1))

# 2: sum sub-stacks within movie
movie_aggregated = movie.map_blocks(sum_sub_stacks,
                                    chunks=(10,) + movie.chunks[1:],
                                    dtype=movie.dtype)

# 3: create and run reduction tasks
tasks = [delayed(get_info)(chk) 
         for chk in movie_aggregated.to_delayed().ravel()]

info = client.compute(tasks, sync=True)

The ideal scheduling of operations would clearly be for each worker to perform the 1-2-3 sequence on a single chunk and then move on to the next, which would keep the I/O load constant, the CPUs maxed out and memory usage low.

What happens instead is that all workers first try to read as many chunks as possible from the files (step 1), which creates an I/O bottleneck and quickly exhausts worker memory, causing thrashing to the local drives. In some cases the workers eventually move on to steps 2/3, which quickly frees up memory and properly uses all CPUs; in other cases workers get killed in an uncoordinated way, or the entire computation stalls. Intermediate cases also occur, where the surviving workers behave reasonably only for a while.

Is there any way to give hints to the scheduler to process the tasks in the preferred order described above, or are there other means to improve the scheduling behavior? Or is there something inherently stupid about this code/way of doing things?

1 Answer


First, there is nothing inherently stupid about what you are doing at all!

In general, Dask tries to reduce the number of temporaries it holds onto, and it balances this against parallelizability (the width of the graph versus the number of workers). Scheduling is complex, and on top of that Dask applies an optimization that fuses tasks together to make the graph more efficient. With lots of little chunks you may run into issues: https://docs.dask.org/en/latest/array-best-practices.html?highlight=chunk%20size#select-a-good-chunk-size
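
As a back-of-the-envelope sketch of picking a chunk size: aim for chunks of roughly 100 MB (per the best-practices link) that are also a whole multiple of the 10-frame aggregation window, so the map_blocks boundaries stay aligned with the sub-stacks. The frame shape, dtype and target size below are assumptions for illustration, not taken from the question:

```python
# assumed values for illustration only
frame_shape = (2048, 2048)   # hypothetical detector dimensions
itemsize = 2                 # e.g. uint16 raw counts
target_bytes = 100e6         # ~100 MB chunks, per the dask best practices

bytes_per_frame = frame_shape[0] * frame_shape[1] * itemsize
frames_per_chunk = int(target_bytes // bytes_per_frame)
# round down to a multiple of the 10-frame aggregation window so the
# map_blocks boundaries stay aligned with the sub-stacks
frames_per_chunk -= frames_per_chunk % 10
print(frames_per_chunk)  # -> 10 with the assumptions above
```

The resulting value would then go into the da.from_array call, e.g. chunks=(frames_per_chunk, -1, -1).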

Dask does have a number of optimization configurations which I would recommend playing with after considering other chunk sizes. I would also encourage you to read through the following issue, as there is a healthy discussion there around scheduling configurations.
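
As a sketch, the fusion behaviour can be tuned through dask's configuration system before triggering the computation. The config keys below are real dask options, but the values are purely illustrative, and client/tasks refer to the objects from your question:

```python
import dask

# illustrative values only: widen task fusion so that the read, the
# sub-stack aggregation and the reduction for one chunk are more likely
# to end up fused into a single task held by one worker
with dask.config.set({"optimization.fuse.ave-width": 4,
                      "optimization.fuse.max-width": 8}):
    info = client.compute(tasks, sync=True)
```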

Lastly, you might consider additional memory configuration of your workers, as you may want to control more tightly how much memory each worker should use.
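
For example, the worker memory thresholds can be set through dask's configuration (these are real distributed config keys; the fractions shown are just a starting point to tune for your workload):

```python
import dask

# fractions of the worker memory limit at which each action kicks in;
# the values here are illustrative defaults, not a recommendation
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # restart the worker
})
```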