1
votes

situation

I have got a large NetCDF file with some Earth System model data. I access the file and the data within via xarray and want to process one variable via dask distributed on several workers (some calculation). The xarray variable is chunked in a way that the processing can be performed on each chunk individually without information from other chunks. Number of chunks equals number of workers. After defining my calculation, I call compute(). The full chunked variable seems to be send to each worker (see example below). Instead, I would have expected that each worker gets only only chunk a performs his calculation on it.

I am missing something important but I don't know what.

minimal example

You find the file and a jupyter notebook in a dedicated GitHub repository: https://github.com/neumannd/issue_usage_dask

Two workers are started and each worker gets 350 MB of RAM. The size of my xarray dataset is 395.5 MB and it is split into two chunks of the size of 197.75 MB. It is split in a way that the operation (mean(dim='time')) can be performed individually. However, the workers crash because more than 95% of their memory is filled (332.5 MB) by data. That means that they do not only receive one chunk but the whole variable.

In this minimal example, it is not issue (I could increate the memory of the workers). But, if the dataset is 8 GB in size and each of four workers has to have 8 GB memory or if I work one n > 10 workers with a xarray dataset of x > 10 GB size, I occupy n * x > 100 GB of memory, which get quite bad for increasing n and x.

# load libraries
import xarray as xr
import dask
from dask.distributed import Client

# set path of data
data_file = "/home/k204221/large_data/more_netcdf/CMAQ_t_zoo_59_1.nc"

# initialize workers
client = Client(n_workers=2, threads_per_worker=1,
                memory_limit='350MB', dashboard_address=':8787')
client

enter image description here

# open data file
ds = xr.open_dataset(data_file, chunks = {"xt_ocean": 112, "yt_ocean": 242})
t_zoo = ds.t_zoo
t_zoo.data

enter image description here

# process data
zoo_mean_delay = t_zoo.mean(dim = 'time')
zoo_mean_delay
## output
# <xarray.DataArray 't_zoo' (st_ocean: 152, yt_ocean: 242, xt_ocean: 224)>
# dask.array<mean_agg-aggregate, shape=(152, 242, 224), dtype=float32, chunksize=(152, 242, 112)>
# Coordinates:
#   * xt_ocean  (xt_ocean) float64 8.283 8.383 8.483 8.583 ... 30.38 30.48 30.58
#   * yt_ocean  (yt_ocean) float64 53.86 53.91 53.96 54.01 ... 65.81 65.86 65.91
#   * st_ocean  (st_ocean) float64 0.25 0.7508 1.257 1.771 ... 263.0 265.0 267.0

# I tried to explicitely list the workers in the call of compute
#  hoping to let it work this way
zoo_mean_comp = zoo_mean_delay.compute(workers = ['localhost:39661', 'localhost:34049'])
## output
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# 
# KeyboardInterrupt
# 
# distributed.nanny - WARNING - Restarting worker

notes

  • I tried client.scatter(t_zoo) but it did not help.
  • Calling client.compute(zoo_mean_delay) instead of zoo_mean_delay.compute() did not help.
1
After looking at the workers' work load and the task graph in the scheduler it seems to me that one worker is collecting the whole data at certain steps of the calculation. Moreover (after reading the dask documentation on chunking), the sizes of the chunks seem to be inappropriate. - daniel.heydebreck

1 Answers

0
votes

Two workers are started and each worker gets 350 MB of RAM

However, the workers crash because more than 95% of their memory is filled (332.5 MB) by data

This is not enough RAM to effectively run the Numpy/Pandas/Xarray stack. Just running Python and importing these libraries takes up considerable memory. I wouldn't try running anything in under 2GB of RAM.