I have launched many simulations with Dask Distributed:

from time import sleep
from distributed import Client, as_completed

def simulation(x):
    """ Proxy function for simulation """
    sleep(60 * 60 * 24)  # wait one day
    return hash(x)

def save(result):
    with open("result", "w") as f:
        print(result, file=f)

if __name__ == "__main__":
    client = Client("localhost:8786")
    futures = client.map(simulation, range(1000))

    for future in as_completed(futures):
        result = future.result()
        save(result)

However, this code has a bug: open("result", "w") should be open(str(result), "w"), so every result overwrites the same file. I'd like to correct that mistake, then re-process the client's futures.

However, I do not know of a way to do that without stopping the Python process with a keyboard interrupt and then re-submitting the jobs to the Dask cluster. I don't want to do that because these simulations have already taken a couple of days.

I want to access all the futures the client has and save all the existing results. How do I make that happen?

1 Answer

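client.has_what is the method you're looking for. It returns a mapping from each worker to the keys that worker holds, and you can rebuild futures from those keys: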

from distributed import Client, Future, as_completed

if __name__ == "__main__":
    client = Client("localhost:8786")
    # has_what() maps each worker address to the keys it currently holds
    futures = [Future(key) for keys in client.has_what().values() for key in keys]

    for future in as_completed(futures):
        ...
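
To fill in the loop body, here is a minimal sketch of how the recovered futures could be saved with the corrected file name. The helper names unique_keys and recover_and_save, and the directory parameter, are hypothetical and not part of the original code. It assumes the keys returned by has_what() can be passed straight to Future, and it de-duplicates them in case a result is replicated on more than one worker.

```python
from pathlib import Path


def unique_keys(has_what):
    """Flatten a {worker: [keys]} mapping into an ordered, de-duplicated list."""
    return list(dict.fromkeys(key for keys in has_what.values() for key in keys))


def save(result, directory="."):
    """Write one result to its own file, named after the result (the bug fix)."""
    path = Path(directory) / str(result)
    with open(path, "w") as f:
        print(result, file=f)
    return path


def recover_and_save(client, directory="."):
    """Rebuild futures from the keys held on the workers and save each result.

    Assumption: only results currently held by a worker show up in has_what().
    """
    # Imported lazily so the helpers above can be used without a cluster.
    from distributed import Future, as_completed

    futures = [Future(key, client=client) for key in unique_keys(client.has_what())]
    for future in as_completed(futures):
        save(future.result(), directory)
```

From a fresh Python process you would then connect with Client("localhost:8786") and call recover_and_save(client). Simulations that are still running will not appear until they finish, so it may need to be run again later.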