
I have a simple task which is scheduled by dask-scheduler and is running on a worker node.

My requirement is to be able to stop the task on demand, whenever the user wants.


2 Answers


You will have to build this into your task, perhaps by explicitly checking a distributed Variable object in a loop.

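from dask.distributed import Client, Variable

client = Client("tcp://scheduler-address:8786")  # hypothetical address of your dask-scheduler

stop = Variable()  # shared flag stored on the scheduler
stop.set(False)

def my_task():
    while True:
        if stop.get():
            return
        else:
            # do one small chunk of work here, then loop to re-check the flag
            pass

future = client.submit(my_task)
# ... wait until the user asks to stop ...
stop.set(True)
future.result()  # returns once my_task has seen the flag and exited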

You will need something explicit like this. Tasks are normally run in separate threads. As far as I know there is no way to interrupt a thread (though I would be happy to learn otherwise).
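The same cooperative idea can be tried locally without a cluster. Below is a minimal sketch that uses the standard library's threading.Event in place of the distributed Variable (the names worker, stop_event and results are hypothetical). The thread is never interrupted from outside; it checks the flag between small units of work and returns once the flag is set.

```python
import threading
import time


def worker(stop_event: threading.Event, results: list) -> None:
    """Do work in small steps, checking the stop flag between steps."""
    steps = 0
    while not stop_event.is_set():
        steps += 1          # stand-in for one small unit of real work
        time.sleep(0.01)
    results.append(steps)   # report how far we got before stopping


stop_event = threading.Event()
results: list = []
t = threading.Thread(target=worker, args=(stop_event, results))
t.start()

time.sleep(0.2)   # let the worker run for a while
stop_event.set()  # request a stop; the thread exits at its next check
t.join(timeout=5)

print("thread alive:", t.is_alive(), "steps completed:", results[0])
```

The granularity of the work between checks determines how quickly a stop request takes effect: a step that blocks for minutes delays the stop by minutes.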


@MRocklin, thanks for your suggestion. Here is the machinery I've built around explicitly stopping a running task. The task launches the real work (sleep.py) as a child process and polls the shared Variable once a second; when the flag flips, it terminates the child and returns. The code below is not refactored, but please trace the logic behind it. Thanks, Manoranjan (I will mark your answer as helpful.) :) Keep up the good work.

import subprocess
import sys
import time

from dask.distributed import Client, Variable


def my_task(proc):
    print("my_task..")
    print("proc argument::", proc)
    p = None  # handle to the child process, created on the first iteration
    while True:
        flag = stop.get()
        print("stop.get()::", flag)
        if flag:
            print("event triggered for stopping the live task..")
            if p is not None:
                p.terminate()  # ask the child process to exit
                p.wait()       # reap it so it does not linger as a zombie
            return 100
        if p is None:
            # Run the long work as a separate OS process so it can be killed.
            # sleep.py is resolved relative to the worker's working directory;
            # sys.executable is the worker's Python interpreter.
            # A list argument is required with shell=False on POSIX.
            p = subprocess.Popen([sys.executable, "sleep.py"])
            print("subprocess p::", p, " type::", type(p))
        time.sleep(1)  # poll the stop flag once per second


if __name__ == '__main__':
    clienta = Client("192.168.1.2:8786")
    # my_task reads this module-level name; it is serialized along with
    # the function when the task is submitted.
    stop = Variable("name-xx", client=clienta)
    stop.set(False)
    future = clienta.submit(my_task, 10)
    print("future::waiting for 3 sec..in client side", future)
    time.sleep(3)
    print("future after sleeping for 3 sec", future)
    stop.set(True)
    # Block until the task notices the flag, kills the child and returns 100.
    print("future result::", future.result())
    print("child process should be stopped by now..")
    print("over!")
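The key idea above is that an OS process, unlike a thread, can be terminated from outside. A minimal local sketch of that pattern follows, assuming a threading.Event stands in for the distributed Variable and a short `python -c` sleep stands in for sleep.py (run_killable, stop_event and outcome are hypothetical names). The polling thread watches the flag and terminates the child when it is set.

```python
import subprocess
import sys
import threading
import time


def run_killable(stop_event: threading.Event, poll_interval: float = 0.05) -> int:
    """Run long work in a child process and kill it when stop_event is set.

    Returns the child's exit code. The child is a hypothetical stand-in
    that just sleeps for 60 seconds.
    """
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    try:
        while child.poll() is None:         # child still running
            if stop_event.is_set():
                child.terminate()           # SIGTERM on POSIX, TerminateProcess on Windows
                break
            time.sleep(poll_interval)
        return child.wait()                 # reap the child and collect its exit code
    finally:
        if child.poll() is None:            # safety net if something above raised
            child.kill()
            child.wait()


stop_event = threading.Event()
outcome = {}
t = threading.Thread(target=lambda: outcome.update(code=run_killable(stop_event)))
start = time.monotonic()
t.start()
time.sleep(0.3)        # let the child start
stop_event.set()       # request a stop
t.join(timeout=10)
elapsed = time.monotonic() - start
print("exit code:", outcome["code"], "elapsed: %.1fs" % elapsed)
```

On POSIX the exit code of a process stopped by terminate() is negative (the signal number), so a non-zero code here indicates the child was killed rather than finishing its 60-second sleep.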