I'm working on an HPC system (NCAR's Cheyenne) and want to do a few things to a large NetCDF data set (the NCAR Large Ensemble). I'm having issues with memory errors despite reading through the Pangeo and dask/dask-distributed documentation.
The steps I want to take are:
- Load a large (global, ~1800 years of daily data), multi-file data set using xarray.open_mfdataset() into dask
- Take a 30-day rolling sum across the full time dimension at all grid points, ideally using dask-distributed since it's on the order of TB of data
- Calculate return intervals on these 30-day sums (i.e., running numpy.nanpercentile() or a dask DataFrame.quantile() function with a specified percentile value)
- Save the result as a 2D (lat x lon) NetCDF file
It's important for me to be able to load the full time series for any grid point at once, because the rolling sum and return interval calculations depend on the full time period.
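As a point of reference for steps 2 and 3, here is a minimal NumPy sketch of the per-block computation on a small synthetic array. It is not the dask/xarray workflow itself: the helper names (rolling_sum_time, block_percentile), the 99th percentile, and the synthetic data are all illustrative assumptions. It uses a trailing window rather than center=True; for a percentile over the whole series this should not matter, since the set of complete 30-day sums is the same and only their time labels shift.

```python
import numpy as np


def rolling_sum_time(block, window=30):
    """Trailing rolling sum along axis 0 (time) of a (time, lat, lon) block.

    The first window - 1 steps are NaN because their window is incomplete.
    Assumes the input itself contains no NaNs (a cumulative sum would
    propagate them to every later time step).
    """
    csum = np.cumsum(block, axis=0, dtype=np.float64)
    out = np.full(block.shape, np.nan, dtype=np.float64)
    out[window - 1] = csum[window - 1]
    out[window:] = csum[window:] - csum[:-window]
    return out


def block_percentile(block, q=99.0, window=30):
    """Percentile over time of the rolling sums: returns a 2D (lat, lon) array."""
    sums = rolling_sum_time(block, window)
    return np.nanpercentile(sums, q, axis=0)


# Synthetic stand-in for one spatial block: 400 days on a 3 x 4 grid.
rng = np.random.default_rng(0)
block = rng.random((400, 3, 4))

result = block_percentile(block, q=99.0, window=30)
print(result.shape)
```

Because each grid point is handled independently along time, the same function could in principle be applied block by block to spatial chunks that each hold the full time series.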
Here's the snippet of code I've been trying to do this with (up to step 2 above). I'm running it within a Jupyter notebook, and I've specified $TMPDIR as my personal scratch directory (on this system, /glade/scratch/$USER/temp/):
import xarray
import dask_jobqueue
import dask.distributed
import glob
## specify location of precipitation files, use glob to get list of them
PRECT_root = '/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/daily/PRECT/'
filename_list = sorted(glob.glob(PRECT_root + 'b.e11.B1850C5CN.f09_g16.005.cam.h1.PRECT.*.nc'))
## import multi-file data set
mfds = xarray.open_mfdataset(filename_list, chunks={'lat':12, 'lon':12})
mfds_data = mfds['PRECT']
## start cluster object
cluster = dask_jobqueue.PBSCluster(cores=36, memory='100 GB', project='UCLA0022', queue='regular', walltime='06:00:00', local_directory='/glade/scratch/$USER/temp/')
## create 30 workers (I've tried up to 70)
cluster.start_workers(30)
## attach client
client = dask.distributed.Client(cluster)
## create a rolling 30-day sum
mfds_data_30day_rolling_object = mfds_data.rolling(time=30, center=True)
mfds_data_30day_rolling_sum = client.persist(mfds_data_30day_rolling_object.sum())
Even if I load a single file (not the full data set), the final line of code above appears to complete according to the dask dashboard, but eventually the job gets killed by the system. The HPC admins have emailed multiple times saying I'm using too much memory on the login nodes, yet I've carefully specified my $TMPDIR and local_directory, and I'm at a loss as to how to fix this.
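For context on the chunking used above, here is a rough sketch of how many chunks chunks={'lat':12, 'lon':12} implies. The grid size (192 x 288, the nominal f09 grid) and the file count are assumptions, not values taken from the post, and chunks_per_file is a hypothetical helper. Each chunk corresponds to at least one task in the graph, and that graph is built in the process running the notebook, so the count gives a feel for graph size before any rolling operation is added.

```python
import math


def chunks_per_file(nlat, nlon, lat_chunk=12, lon_chunk=12):
    """Number of spatial chunks for one file when time is left unchunked."""
    return math.ceil(nlat / lat_chunk) * math.ceil(nlon / lon_chunk)


# Assumed values, not taken from the post:
NLAT, NLON = 192, 288  # nominal f09 (0.9 x 1.25 degree) grid
N_FILES = 20           # hypothetical number of files matched by glob

per_file = chunks_per_file(NLAT, NLON)
total = per_file * N_FILES
print(per_file, total)  # 384 7680
```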
A couple of questions I have about this:
- Is there a better approach for these calculations than the steps I'm taking? (It seems like building a lazy rolling window object, then calling sum/persist, would be a good way to go. I've also tried using scatter() and futures, but the memory issues persist.)
- When I look at the disk usage of my scratch directory (du -hs /glade/scratch/$USER/temp), it doesn't seem like the workers are taking up any memory during the sum() and persist(). In this case, where exactly is the memory being used? I have a feeling if I can learn this, I can resolve my issue.
  - More info: client.scheduler_info() confirms that my scratch directory has been appropriately set, and the worker-**** files are created in it, but the folder size remains fixed at 32 KB. Perhaps my thinking is off about what setting the local_directory does in the PBSCluster() function?
- More info: repr of the mfds_data? You should not need to call client.persist. Have you been able to watch the distributed dashboard? Are you able to tell if workers are particularly memory stressed at some point? - jhamman