
Based on the streamz documentation, one could leverage a dask distributed cluster in the following way:

from distributed import Client
client = Client('tcp://localhost:8786')  # connect to a scheduler that has distributed workers

def increment(x):   # runs remotely on the cluster workers
    return x + 1

def write(x):       # runs in the local process
    print(x)

from streamz import Stream
source = Stream()
(source.scatter()       # scatter local elements to the cluster, creating a DaskStream
       .map(increment)  # map a function remotely
       .buffer(5)       # allow five futures to stay on the cluster at any time
       .gather()        # bring results back to the local process
       .sink(write))    # call write locally

for x in range(10):
    source.emit(x)

Conceptually, it isn't clear why the dask distributed client doesn't have to be passed as a parameter when instantiating Stream(). More specifically, how does Stream() know which scheduler to attach to?

What would you do if you had two schedulers whose workers run on unrelated nodes, e.g.:

from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')

How does one create two streams for client_1 and client_2, respectively?

1 Answer


The basic rule in Dask is: if a distributed client is defined, use it for any Dask computations. If there is more than one distributed client, use the most recently created one that is still alive.
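To make that rule concrete, here is a minimal sketch using two throwaway in-process clusters in place of real remote schedulers; it shows that dask.distributed.default_client() returns the most recently created client that is still alive:

from dask.distributed import Client, default_client

# Two local, in-process clusters stand in for real remote schedulers.
client_1 = Client(processes=False)
client_2 = Client(processes=False)

assert default_client() is client_2  # most recently created client wins

client_2.close()
assert default_client() is client_1  # falls back to the surviving client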

Streamz does not explicitly let you choose which client to use when you .scatter(), it uses dask.distributed.default_client() to pick one. You may wish to raise an issue with them to allow a client= keyword. The workflow doesn't even fit a context-based approach. For now, if you wanted to have simultaneous multiple streamz working with data on different Dask clusters, you would probably have to manipulate the state of dask.distributed.client._global_clients.