My project computes a map in parallel using the pmap function from Julia's Distributed module.

Mapping a given element could take a few seconds, or it could take essentially forever. I want to impose a time limit on each individual map task.

If a map task finishes in time, great, return the result of the computation. If the task doesn't complete by the time limit, stop computation when the time limit has been reached, and return some value or message indicating a timeout occurred.

A minimal example follows. First, the modules are imported and the worker processes are launched:

num_procs = 1
using Distributed
if num_procs > 1
    # The main process counts as one process and can run `pmap` itself,
    # so only num_procs-1 workers are added:
    addprocs(num_procs-1)
end

Next, the mapping task is defined for all the worker processes. The mapping task should timeout after 1 second:

@everywhere import Random
@everywhere begin
    """
    Compute stuff for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapses, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=1)

        # < Insert some sort of timeout code? >

        # This block of code simulates a long computation.
        # (pretend the computation time is unknown)
        t0 = time()  # start time of the simulated computation
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
        end

        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end

The function that executes the parallel map (pmap) is defined on the main process. Each map task takes a random time of up to 2 seconds to complete, but should time out after 1 second.

function myParallelMapping(num_tasks = 20, max_runtime=2)    
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end

print(myParallelMapping())

How should this time-limited parallel map be implemented?

2 Answers

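You could put something like this inside your pmap body, where time_limit is the time limit in seconds (it is not defined in the snippet, so set it beforehand):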

pmap(runtimes) do runtime
  t0 = time()
  task = @async waitForTimeUnlessTimeout(runtime)
  while !istaskdone(task) && time()-t0 < time_limit
      sleep(1)
  end
  istaskdone(task) && (return fetch(task))
  error("time over")
end

Also note that (runtime)->waitForTimeUnlessTimeout(runtime) is the same as just passing waitForTimeUnlessTimeout.
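
Because the snippet above signals a timeout with error(...), a single timed-out element would abort the whole pmap call. A minimal sketch of one way around that, assuming pmap's on_error keyword is used to turn the exception into a placeholder value. The function maybeTimeout is a hypothetical stand-in for the timed task, not part of the original code:

```julia
using Distributed

# Hypothetical stand-in for a task that throws when it runs out of time.
maybeTimeout(x) = x > 2 ? error("time over") : x

# on_error receives the exception; whatever it returns takes the place
# of the failed element in the result.
results = pmap(maybeTimeout, 1:4; on_error = e -> "TimeOut")
```

Here the function is passed to pmap by name, as noted above. If worker processes have been added with addprocs, maybeTimeout would need to be defined with @everywhere so the workers can see it.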

Following @Fredrik Bagge's very helpful answer, here is a full working example with some extra explanation.

num_procs = 8
using Distributed
if num_procs > 1
    addprocs(num_procs-1)
end

@everywhere import Random
@everywhere begin
    function waitForTime(wait_time)
        # This code block simulates a long computation.
        # Pretend the computation time is unknown.
        t0 = time()
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
            yield() # CRITICAL: hands control back to the scheduler so the timeout check can run.
            # If you comment out yield(), you will see that the timeout doesn't work!
        end

        return round(wait_time, digits=2)
    end
end

function myParallelMapping(num_tasks = 16, max_runtime=2, time_limit=1)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # Compute the mapping tasks in parallel. The do-block syntax (see
    # "do block" in the Julia documentation) is just syntactic sugar for
    # passing an anonymous function as pmap's first argument.
    return pmap(runtimes) do runtime
                  t0 = time()
                  task = @async waitForTime(runtime)
                  while !istaskdone(task) && time()-t0 < time_limit
                      # releases computation to waitForTime
                      sleep(0.1)
                      # nothing past here will run until waitForTime calls yield()
                      # *and* 0.1 seconds have passed.
                  end
                  # equivalent to: if istaskdone(task); return fetch(task); end
                  istaskdone(task) && (return fetch(task))
                  return "TimeOut"
                  # Throwing instead, e.g. `error("TimeOut")`, halts pmap unless pmap
                  # is given the `on_error` keyword argument. See the pmap documentation.
              end
end

The output is

julia> print(myParallelMapping())

       Any["TimeOut", "TimeOut", 0.33, 0.35, 0.56, 0.41, 0.08, 0.14, 0.72, 
           "TimeOut", "TimeOut", "TimeOut", 0.52, "TimeOut", 0.33, "TimeOut"]

Note that there are two tasks per process in this example. The original task (the "time checker") is checking every 0.1 seconds if the other task has completed computation. The other task (created with @async) is computing something, periodically calling yield() to release control to the time checker; if it doesn't call yield(), time checking cannot occur.
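
One caveat, as far as I can tell: when the time checker returns "TimeOut", the @async task is not actually stopped. It keeps running in the background on that worker until its own loop finishes. If the computation has a loop you control, a simpler alternative is to let it check the deadline itself. The following is only a sketch under that assumption; waitForTimeOrDeadline is a hypothetical variant of waitForTime, not part of the code above:

```julia
# Hypothetical variant of waitForTime that enforces the time limit itself.
function waitForTimeOrDeadline(wait_time, time_limit=1)
    t0 = time()
    deadline = t0 + time_limit
    x = 0.0
    while time() - t0 < wait_time
        # Cooperative check: stop computing once the deadline has passed.
        time() > deadline && return "TimeOut"
        x += rand() - 0.5
    end
    return round(wait_time, digits=2)
end

fast = waitForTimeOrDeadline(0.05, 0.5)  # finishes in time, returns 0.05
slow = waitForTimeOrDeadline(2.0, 0.1)   # gives up after about 0.1 s, returns "TimeOut"
```

With this approach no second task or yield() is needed, and the computation really does stop at the limit. To use it with pmap it would have to be defined inside an @everywhere block like waitForTime. It only works when the long-running code can be modified to check the clock periodically.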