I have the following project tree
.
└── src
    └── dask_test
        ├── helpers
        │   ├── commandline.py
        │   ├── data
        │   │   ├── dataset0.json
        │   │   ├── dataset1000.json
        │   │   ├── dataset300.json
        │   │   ├── dataset5000.json
        │   │   ├── dataset500.json
        │   │   ├── events_to_be_used_final_without_google.nl.json
        │   │   ├── http-malware_modified.log
        │   │   └── public_suffix_list.json
        │   ├── datetime.py
        │   ├── datetime.pyc
        │   ├── __init__.py
        │   ├── __init__.pyc
        │   ├── math.py
        │   ├── math.pyc
        │   ├── pipeline.py
        │   ├── queues.py
        │   ├── search.py
        │   ├── services.py
        │   ├── sklearn.py
        │   ├── splunk_format.py
        │   ├── splunk.py
        │   └── sqlalchemy.py
        ├── __init__.py
        ├── __init__.pyc
        ├── main.py
        └── riskanalysis
            ├── iaccess
            │   ├── __init__.py
            │   └── metrics
            │       ├── base.py
            │       ├── __init__.py
            │       └── profile
            │           └── __init__.py
            ├── __init__.py
            └── metrics
                └── __init__.py
At the top of my main.py I import an object from `dask_test.helpers.datetime` like this:
from dask_test.helpers.datetime import TimeWindow
to use later in my main. In my main file, I have defined some functions and apply them to a dask DataFrame like this:
dataframe = transformation1(dataframe)
dataframe = transformation2(dataframe)
dataframe = transformation3(dataframe)
dataframe = transformation4(dataframe)
Each transformation function takes the dask DataFrame and, using apply, adds a new column to it, like so:
def transformation1(dataframe):
    dataframe['new_column'] = dataframe.apply(make_sequence)
    return dataframe
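For reference, here is a minimal standalone version of this pattern. make_sequence is just a hypothetical stand-in here, and I pass axis=1 and meta, which dask's DataFrame.apply expects for row-wise functions:

import pandas as pd
import dask.dataframe as dd

def make_sequence(row):
    # stand-in for the real row-wise logic
    return '{}-{}'.format(row['a'], row['b'])

def transformation1(dataframe):
    # row-wise apply; meta declares the name and dtype of the new column
    dataframe['new_column'] = dataframe.apply(
        make_sequence, axis=1, meta=('new_column', 'object'))
    return dataframe

ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2], 'b': [3, 4]}), npartitions=2)
print(transformation1(ddf).compute())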
Computing with dask.distributed and a LocalCluster works correctly:
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
dataframe = client.persist(dataframe)  # persist returns the persisted collection
But when I start dask-scheduler and dask-worker processes instead, I get the following message:
return pickle.loads(x)
ImportError: No module named dask_test.helpers.datetime
- First question: doesn't LocalCluster use pickling as well?
- Do all modules need to contain picklable objects in order to work correctly with dask distributed?
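For the distributed run, main.py connects to the scheduler like this, and a quick check with client.run (which executes a function on every worker) shows whether each worker can import the package. This is a sketch; the scheduler address is an assumption:

from distributed import Client

client = Client('tcp://127.0.0.1:8786')  # assumed scheduler address

def try_import():
    # runs inside each worker process
    try:
        import dask_test.helpers.datetime  # noqa
        return 'ok'
    except ImportError as exc:
        return str(exc)

print(client.run(try_import))  # maps each worker address to its result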
EDIT:
Importing the datetime module and cloudpickle, it seems that datetime is picklable:
from dask_test.helpers import datetime
import cloudpickle
cloudpickle.dumps(datetime) # this works
datetime_module = cloudpickle.loads(cloudpickle.dumps(datetime)) # this also works
EDIT 2: After some more investigation I saw this in the log files:
distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame
DataFrame
In my main file, I first create a pandas DataFrame and then use the from_pandas method to turn it into a dask DataFrame.
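Roughly like this (the column contents and the npartitions value are made up for illustration):

import pandas as pd
import dask.dataframe as dd

pandas_dataframe = pd.DataFrame({'time_bucket': ['1500000000.0-1500003600.0']})
dataframe = dd.from_pandas(pandas_dataframe, npartitions=4)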
EDIT 3: I found what the issue is, but I cannot understand why. In my datetime module I have defined a class called TimeWindow to deal with periods of time. My data JSON file has a field with timestamps of the form
timestamp_since-timestamp_until
I apply a function on the dataframe to add a column that transforms the above into a TimeWindow object, like this:
def convert_to_time_window(item):
    since = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[0]))
    until = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[1]))
    return my_datetime.TimeWindow(tm_since=since, tm_until=until)
and apply it on the DataFrame (this is a pandas DataFrame; I do this before creating the dask DataFrame):
dataframe['tw'] = dataframe['time_bucket'].apply(convert_to_time_window)
Without it, the workers work fine. But the TimeWindow class and its instances are cloudpickle-able.
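This is the round trip I used to check (the timestamp string is a made-up sample):

import cloudpickle
from dask_test.helpers import datetime as my_datetime

# convert_to_time_window is the function shown above
tw = convert_to_time_window('1500000000.0-1500003600.0')
blob = cloudpickle.dumps(tw)        # pickling the instance works
restored = cloudpickle.loads(blob)  # and so does unpickling, in this process

Note that this round trip runs in my driver process, where dask_test is importable; the failing pickle.loads in the traceback runs on the workers.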