I'm working with datashader and dask, but I'm running into problems when I try to plot while a cluster is running. To make it more concrete, here is an example (embedded in a Bokeh plot):

import holoviews as hv
import pandas as pd
import dask.dataframe as dd
import numpy as np
from holoviews.operation.datashader import datashade
import datashader.transfer_functions as tf
from dask.distributed import Client, LocalCluster

#initialize the client/cluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
dask_client = Client(cluster)


def datashade_plot():
    hv.extension('bokeh')
    #create some random data (in the actual code this is a parquet file with millions of rows, this is just an example)
    delta = 1/1000
    x = np.arange(0, 1, delta)
    y = np.cumsum(np.sqrt(delta)*np.random.normal(size=len(x)))
    df = pd.DataFrame({'X':x, 'Y':y})

    #create dask dataframe
    points_dd = dd.from_pandas(df, npartitions=3)

    #create plot
    points = hv.Curve(points_dd)
    return datashade(points)

dask_client.submit(datashade_plot).result()

This raises a:

TypeError: can't pickle weakref objects

My theory is that this happens because the datashade operation can't be distributed across the cluster. Sorry if this is a noob question; I'd be very grateful for any advice you could give me.

1 Answer

I think you want to go the other way: rather than submitting the plotting function to the cluster, pass datashader a dask dataframe (instead of a pandas dataframe) and let it distribute the aggregation for you:

>>> import datashader
>>> from dask import dataframe as dd
>>> import multiprocessing as mp
>>> dask_df = dd.from_pandas(df, npartitions=mp.cpu_count())
>>> dask_df = dask_df.persist()  # persist returns a new collection; keep the reference
...
>>> cvs = datashader.Canvas(...)
>>> agg = cvs.points(dask_df, ...)

XREF: https://datashader.org/user_guide/Performance.html
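
Applied to your holoviews example, a minimal sketch (untested against your real parquet data) would look roughly like this: keep the LocalCluster running, build the dask dataframe, and call datashade directly in the main process instead of submitting the plotting function via client.submit; dask then dispatches the aggregation to the workers on its own.

# sketch: same random-walk example, with datashade called in the main process
# and the LocalCluster workers doing the aggregation
import numpy as np
import pandas as pd
import dask.dataframe as dd
import holoviews as hv
from holoviews.operation.datashader import datashade
from dask.distributed import Client, LocalCluster

hv.extension('bokeh')

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)  # registers the cluster as dask's default scheduler

delta = 1/1000
x = np.arange(0, 1, delta)
y = np.cumsum(np.sqrt(delta)*np.random.normal(size=len(x)))
points_dd = dd.from_pandas(pd.DataFrame({'X': x, 'Y': y}), npartitions=4).persist()

shaded = datashade(hv.Curve(points_dd))  # aggregation runs on the dask workers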