Based on the streamz documentation, one can leverage a Dask distributed cluster in the following way:
from distributed import Client
from streamz import Stream

client = Client('tcp://localhost:8786')  # connect to a scheduler that has distributed workers

def increment(x):  # placeholder helpers (assumed) so the snippet is runnable
    return x + 1

def write(x):
    print(x)

source = Stream()
(source.scatter()       # scatter local elements to the cluster, creating a DaskStream
       .map(increment)  # map a function remotely
       .buffer(5)       # allow five futures to stay on the cluster at any time
       .gather()        # bring results back to the local process
       .sink(write))    # call write locally

for x in range(10):
    source.emit(x)
Conceptually, it isn't clear why the Dask distributed client doesn't have to be passed in as a parameter when instantiating Stream(). More specifically, how does Stream() know which scheduler to attach to?
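For illustration, this is roughly the explicit wiring I would have expected to need (reusing source, increment, and write from the snippet above). The client= keyword below is hypothetical and not taken from the streamz API:

(source.scatter(client=client)  # hypothetical keyword: explicitly bind the stream to a client
       .map(increment)
       .buffer(5)
       .gather()
       .sink(write))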
What would you do if you had two schedulers whose workers run on unrelated nodes, for example:
from distributed import Client

client_1 = Client('tcp://1.2.3.4:8786')      # scheduler for the first cluster
client_2 = Client('tcp://10.20.30.40:8786')  # scheduler for the second, unrelated cluster
How does one create two streams for client_1 and client_2, respectively?
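Naively, I would try something like the sketch below (the two-pipeline structure is my guess, not something taken from the streamz docs), but since neither scatter() nor gather() takes a client, it is not obvious which scheduler each pipeline would actually use:

from distributed import Client
from streamz import Stream

client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')

def increment(x):  # placeholder helpers, as above
    return x + 1

def write(x):
    print(x)

# Guess: one independent pipeline per cluster.
source_1 = Stream()
source_1.scatter().map(increment).buffer(5).gather().sink(write)

source_2 = Stream()
source_2.scatter().map(increment).buffer(5).gather().sink(write)

# Unclear: do these scatter()/gather() calls target client_1, client_2,
# or simply whichever client happens to be the current default?
for x in range(10):
    source_1.emit(x)
    source_2.emit(x)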