I am quite new to Dask, so this might be really obvious... I am trying to run a distributed Dask setup with one node for the scheduler and enough worker nodes to fit the data in memory -- in this particular case, 15 workers. I can bring up the cluster just fine, and I can also load some data and analyze it.
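For reference, the scheduler and workers are started on the remote nodes with the standard dask-scheduler and dask-worker commands, and I connect from my client machine roughly like this (the address below is a placeholder, not my actual setup):

from dask.distributed import Client

# Placeholder scheduler address; in my setup this points at the scheduler node.
client = Client('tcp://scheduler-host:8786')
print(client)  # should report the 15 registered workers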
I have copied the data to the worker nodes, but it is not available on my client machine, so I delay loading it like this:
import dask
import dask.dataframe as dd
from dask import delayed
def load_data(path):
    # dd.read_csv only builds a lazy dataframe; the CSVs live on the workers.
    return dd.read_csv(path)
I can then do some simple analysis just fine:
# compute() here just runs load_data on the cluster and hands back the
# still-lazy dask dataframe built from the workers' local CSV files.
taxi_df_2016 = delayed(load_data)('/tmp/2016/*.csv').compute()
# This triggers the distributed read and returns the mean as a plain number.
taxi_df_2016['fare_amount'].mean().compute()
...which returns a value as expected.
But when I want to persist the data in memory, the scheduler just dies, printing Killed on the console. Running
taxi_df_2016_pers = taxi_df_2016.compute().persist()
shows this on the console of the scheduler node:
distributed.scheduler - INFO - Register tcp://10.0.0.17:40385
distributed.scheduler - INFO - Register tcp://10.0.0.6:42847
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.17:40385
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.6:42847
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.9:44627
distributed.scheduler - INFO - Register tcp://10.0.0.7:44419
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.9:44627
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.7:44419
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.16:41907
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.16:41907
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.18:41879
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.18:41879
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.13:32993
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.13:32993
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.8:33265
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.8:33265
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.14:33851
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.14:33851
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.10:44653
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.10:44653
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.19:40201
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.19:40201
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.5:42207
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.5:42207
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.15:36087
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.15:36087
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.12:32827
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.12:32827
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.11:35405
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.11:35405
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Clear task state
Killed
When I look at the dashboard, I see that all 173 partitions are loaded successfully, and the memory gets used, but sometime after that, the scheduler dies.
Any thoughts on how I can debug this?

One compute() too many: taxi_df_2016_pers = taxi_df_2016.persist() works just fine. Wondering why the extra compute() hard crashes the scheduler and how I could get more debugging/logging information in situations like these. - Daniel Schneider
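For completeness, here is the working variant next to the crashing one (same paths as above; the comments are my understanding of the compute()/persist() semantics, not something I have verified against the scheduler internals):

import dask.dataframe as dd
from dask import delayed

def load_data(path):
    return dd.read_csv(path)

# compute() on the delayed call runs load_data on the cluster and hands back
# the still-lazy dask dataframe.
taxi_df_2016 = delayed(load_data)('/tmp/2016/*.csv').compute()

# Works: persist() materializes the partitions in the workers' memory and
# returns a new dask dataframe that stays distributed across the cluster.
taxi_df_2016_pers = taxi_df_2016.persist()

# Crashes: the extra compute() gathers and concatenates every partition into
# a single in-memory pandas dataframe. My guess is that this is what leads to
# the Killed (out-of-memory) above, but I have not confirmed it.
# taxi_df_2016_pers = taxi_df_2016.compute().persist()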