I'm working on an HPC system (NCAR's Cheyenne) and want to do a few things to a large NetCDF data set (the NCAR Large Ensemble). I'm running into memory errors despite reading through the Pangeo and dask/dask-distributed documentation.

The steps I want to take are:

  1. Load a large (global, ~1800 years of daily data), multi-file data set using xarray.open_mfdataset() into dask
  2. Take a 30-day rolling sum across the full time dimension at all grid points, ideally using dask-distributed since it's on the order of TB of data
  3. Calculate return intervals on these 30-day sums (i.e., running numpy.nanpercentile() or a dask DataFrame.quantile() function with a specified percentile value)
  4. Save the result as a 2D (lat x lon) NetCDF file

It's important for me to be able to load the full time series for any grid point at once, because the rolling sum and return interval calculations depend on the full time period.
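
For reference, steps 2 to 4 can be sketched on a small synthetic array. This is a minimal illustration only: the random data, the 4 x 5 grid, the 0.99 quantile, and the output filename are all placeholders, not values from the real data set.

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for PRECT (assumption): 2 years of daily values on a 4 x 5 grid.
rng = np.random.default_rng(0)
precip = xr.DataArray(
    rng.random((730, 4, 5)),
    dims=("time", "lat", "lon"),
    name="PRECT",
)

# Step 2: centered 30-day rolling sum along time.
# Positions where the 30-day window is incomplete come out as NaN.
rolling_sum = precip.rolling(time=30, center=True).sum()

# Step 3: a high quantile over the full time axis at every grid point.
# For float data, quantile() skips NaN values by default.
return_level = rolling_sum.quantile(0.99, dim="time")

# Step 4: write the 2D (lat x lon) result. Left commented out because it
# needs a NetCDF backend (netCDF4 or scipy); the filename is a placeholder.
# return_level.to_netcdf("return_level.nc")
```

On the real dask-backed data, I believe the quantile step needs the time dimension in a single chunk (for example via .chunk({'time': -1})), which is one reason the chunk sizes along lat and lon matter so much here.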

Here's the snippet of code I've been trying to do this with (up to step 2 above). I'm running it within a jupyter notebook, and I've specified $TMPDIR as my personal scratch directory (on this system, /glade/scratch/$USER/temp/):

import xarray
import dask_jobqueue
import dask.distributed
import glob

## specify location of precipitation files, use glob to get list of them
PRECT_root = '/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/daily/PRECT/'
filename_list = sorted(glob.glob(PRECT_root + 'b.e11.B1850C5CN.f09_g16.005.cam.h1.PRECT.*.nc'))

## import multi-file data set
mfds = xarray.open_mfdataset(filename_list, chunks={'lat':12, 'lon':12})
mfds_data = mfds['PRECT']

## start cluster object
cluster = dask_jobqueue.PBSCluster(cores=36, memory='100 GB', project='UCLA0022', queue='regular', walltime='06:00:00', local_directory='/glade/scratch/$USER/temp/')

## create 30 workers (I've tried up to 70)
cluster.start_workers(30)

## attach client
client = dask.distributed.Client(cluster)

## create a rolling 30-day sum
mfds_data_30day_rolling_object = mfds_data.rolling(time=30, center=True)
mfds_data_30day_rolling_sum = client.persist(mfds_data_30day_rolling_object.sum())

Even if I load a single file (not the full data set), the final line of code above appears to complete according to the dask dashboard, but eventually the job gets killed by the system. The HPC admins have emailed multiple times saying I'm using too much memory on the login nodes, yet I've carefully specified my $TMPDIR and local_directory, and I'm at a loss as to how to fix this.

A couple of questions I have:

  1. Is there a better approach for these calculations than the steps I'm taking? (It seems like building a lazy rolling window object, then calling sum/persist, would be a good way to go. I've also tried using scatter() and futures, but the memory issues persist.)
  2. When I look at the disk usage of my scratch directory (du -hs /glade/scratch/$USER/temp), it doesn't seem like the workers are writing anything to it during the sum() and persist(). In this case, where exactly is the memory being used? I have a feeling if I can learn this, I can resolve my issue.
    • More info: client.scheduler_info() confirms that my scratch directory has been appropriately set, and the worker-**** files are created in it, but the folder size remains fixed at 32 KB. Perhaps my thinking is off about what setting the local_directory does in the PBSCluster() function?
Can you add a repr of the mfds_data? You should not need to call client.persist. Have you been able to watch the distributed dashboard? Are you able to tell if workers are particularly memory stressed at some point? - jhamman

1 Answer

There are a few questions here. I'll try to point you along some good paths for future reading:

  • Your nodes likely don't have local storage. Your tmp directory is either RAM or a network file system, and writing to either will cause problems. Instead, you want Dask workers to pause execution or kill themselves if they run out of memory, rather than spill to disk. This will make your IT folks much happier. More information on this is here: http://docs.dask.org/en/latest/setup/hpc.html#no-local-storage

  • To figure out what is taking up memory, you can look at the dashboard. Memory use is evident in the progress bars (solid rather than transparent color means the result is being held in memory) and also in the info pages for the individual workers. There is a YouTube video on this page: http://docs.dask.org/en/latest/diagnostics-distributed.html

  • You might also want to look at the total number of bytes of the mfds_data_30day_rolling_object.sum() result to make sure that it's reasonable to persist in memory. The Xarray docs can probably help here.
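
To make the first point concrete, here is a minimal sketch of the worker memory settings described on the linked HPC page, written as Dask configuration keys. The variable names are just for illustration.

```python
import dask

# Memory policy for workers on nodes without local disk.
# The values follow the "no local storage" section of the Dask HPC docs.
no_spill_settings = {
    "distributed.worker.memory.target": False,   # don't spill to disk
    "distributed.worker.memory.spill": False,    # don't spill to disk
    "distributed.worker.memory.pause": 0.80,     # pause execution at 80% memory use
    "distributed.worker.memory.terminate": 0.95, # restart the worker at 95% use
}

# Apply the settings in the current Python process and read one back.
dask.config.set(no_spill_settings)
pause_fraction = dask.config.get("distributed.worker.memory.pause")
```

Note that dask.config.set only changes the process it runs in. Workers started through the batch system read their own configuration, so in practice the same keys would normally go in a YAML file under ~/.config/dask/ on the shared file system.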
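
For the last point, a back-of-the-envelope size check needs only the array shape and dtype. The sketch below uses assumed numbers, not values read from the files: 1800 years of 365 days, a 192 x 288 grid, and 4-byte floats. On the real object, xarray reports the same quantity through the nbytes attribute without loading any data.

```python
def array_nbytes(shape, itemsize):
    """Return the size in bytes of a dense array with the given shape."""
    n_elements = 1
    for length in shape:
        n_elements *= length
    return n_elements * itemsize


# All of these are assumptions for illustration; check them against the real data.
N_TIME = 1800 * 365      # ~1800 years of daily data, no leap days
N_LAT, N_LON = 192, 288  # assumed grid size
ITEMSIZE = 4             # bytes per value, assuming float32

# Size of the whole (time, lat, lon) array.
total_bytes = array_nbytes((N_TIME, N_LAT, N_LON), ITEMSIZE)

# Size of one chunk that spans all of time and a 12 x 12 patch of grid points.
chunk_bytes = array_nbytes((N_TIME, 12, 12), ITEMSIZE)

print(f"whole array: {total_bytes / 1e9:.1f} GB")
print(f"one full-time 12 x 12 chunk: {chunk_bytes / 1e6:.1f} MB")
```

Comparing the first number with the total memory requested for the workers gives a quick sense of whether persisting the rolling sum is realistic.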