
In the context of the Dask distributed scheduler with a LocalCluster: can somebody help me understand the dynamics of having a mapping function with a large heap footprint?

For example, consider a Dask DataFrame `ddf` and the `map_partitions` operation:

def mapper():
    resource = ...  # load some large resource, e.g. ~50 MB

    def inner(pdf):
        return pdf.apply(lambda x: ..., axis=1)

    return inner

mapper_fn = mapper()  # closure holding ~50 MB on the heap
ddf.map_partitions(mapper_fn)

What happens here? Will Dask serialize `mapper_fn` and send it to every task? Say I have n partitions, so n tasks.

Empirically, I've observed that with 40 tasks and a 50 MB mapper, it takes about 70 seconds for the tasks to start working; the cluster seems to just sit there with full CPU while the dashboard shows nothing. What is happening here? What are the consequences of having large (heap) functions in the Dask distributed scheduler?


1 Answer


Dask serializes non-trivial functions with cloudpickle and includes the serialized function in every task, so a 50 MB closure gets shipped with each of your n tasks. This is highly inefficient. We recommend that you not do this, but instead pass data explicitly:

resource = ...  # load the resource once, outside the mapped function

ddf.map_partitions(func, resource=resource)

This will be far more efficient.