0 votes

I am quite new to Dask, so this might be really obvious. I am trying to run a distributed Dask setup with one scheduler node and enough worker nodes to fit the data in memory; in this particular case I am using 15 workers. I can bring up the cluster just fine, and I can also load some data and analyze it.

I have copied the data to the worker nodes, but it is not available on my client machine, so I defer the loading of the data like this:

import dask
import dask.dataframe as dd
from dask import delayed

def load_data(path):
    # builds the lazy dask DataFrame on whichever worker runs this
    return dd.read_csv(path)
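
(For completeness: the client is connected to the scheduler in the usual way; the address below is a placeholder for whatever tcp://host:port my scheduler actually prints at startup.)

from dask.distributed import Client

# Placeholder scheduler address -- substitute the real one.
client = Client('tcp://10.0.0.4:8786')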

I can then do some simple analysis just fine:

taxi_df_2016 = delayed(load_data)('/tmp/2016/*.csv').compute()
taxi_df_2016['fare_amount'].mean().compute()

... and this returns a value as expected.
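
As far as I understand, the outer compute() here only ships back the lazy dask DataFrame (a task graph plus metadata), not the rows themselves; a quick sanity check along those lines:

# delayed(load_data)(...).compute() runs dd.read_csv on a worker and
# returns the *lazy* dask DataFrame to the client, not the actual data.
print(type(taxi_df_2016))        # a dask DataFrame, not pandas
print(taxi_df_2016.npartitions)  # 173 partitions in this case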

But when I try to persist the data in memory, the scheduler just dies, printing Killed on its console. Running

taxi_df_2016_pers = taxi_df_2016.compute().persist()

produces this on the console of the scheduler node:

distributed.scheduler - INFO - Register tcp://10.0.0.17:40385
distributed.scheduler - INFO - Register tcp://10.0.0.6:42847
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.17:40385
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.6:42847
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.9:44627
distributed.scheduler - INFO - Register tcp://10.0.0.7:44419
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.9:44627
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.7:44419
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.16:41907
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.16:41907
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.18:41879
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.18:41879
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.13:32993
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.13:32993
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.8:33265
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.8:33265
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.14:33851
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.14:33851
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.10:44653
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.10:44653
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.19:40201
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.19:40201
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.5:42207
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.5:42207
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.15:36087
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.15:36087
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.12:32827
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.12:32827
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.0.11:35405
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.0.11:35405
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Clear task state
Killed

When I look at the dashboard, I can see that all 173 partitions load successfully and memory fills up as expected, but some time after that the scheduler dies.

Any thoughts on how I can debug this?

1 – I guess I figured out what I did wrong: I had one compute() too many; taxi_df_2016_pers = taxi_df_2016.persist() works just fine. I am still wondering why the extra compute() hard-crashes the scheduler, and how I could get more debugging/logging information in situations like these. – Daniel Schneider
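
For anyone finding this later, a sketch of the working pattern, with my best guess at the failure mode in the comments (persist() keeps the partitions in distributed worker memory and hands back a lazy handle, while the extra compute() first materializes everything as a single pandas DataFrame):

# Working pattern. My guess at the crash: compute() concatenates all 173
# partitions into one pandas DataFrame and gathers it off the workers;
# moving that much data appears to overwhelm the scheduler node, and
# "Killed" is the usual signature of the Linux OOM killer.
taxi_df_2016 = delayed(load_data)('/tmp/2016/*.csv').compute()  # lazy handle
taxi_df_2016_pers = taxi_df_2016.persist()  # materialize on the workers
taxi_df_2016_pers['fare_amount'].mean().compute()  # only a scalar comes back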

1 Answer

0 votes