5
votes

What is the correct way, in Clojure, to do parallel processing when each job of the processing can occur in utter isolation and may generate a list of additional jobs that need to be evaluated?

My actual problem is a nutritional calculation problem, but I will put this in the form of Chess which shares the same problem space traits as my calculation.

Assume, for instance, that I am trying to find all of the moves to Checkmate in a game of Chess. When searching through the board states, I would start out with 20 possible states, each representing a different possible opening move. Each of those will need to be evaluated, accepted or rejected, and then for each accepted move, a new list of jobs would be created representing all of the possible next moves. The jobs would look like this:

initial: '([] proposed-move)
accepted: '([move] proposed-response)
          '([move move] proposed-response)

The number of states to evaluate grows as a result of each computation, and each state can be evaluated in complete isolation from all of the others.

A solution I am playing with goes as such:

; a list of all final solutions, each of which is a sequence of moves
(def solutions (agent []))
; a list of all jobs pending evaluation
(def jobs (agent []))

Given these definitions, I would have a Java thread pool, and each thread would request a job from the jobs agent (and wait for that request to be fulfilled). It would then run the calculation, generating a list of solutions and a list of possible solutions. Finally, it would send the solutions to the solutions agent, and the possible solutions to the jobs agent.

Is using a combination of agents and threads the most idiomatic way to go in this case? Can I even get data out of the job queue in the way I am proposing?

Or should my jobs be a java.util.concurrent.LinkedBlockingQueue, as described in Producer consumer with qualifications?

Or, maybe a bunch of futures instead of the thread pool? - Savanni D'Gerinel
This looks like a very stateful implementation. I'd suggest doing it in a more functional way and relying on the core functions to get around the concurrency issue. - Paul Lam
This is a graph searching question, not a Clojure question; you're glossing over the most important part. Once you've decided on the search algorithm, heuristics, etc., then come back and ask how it might be implemented in Clojure. - Alex Taggart

3 Answers

3
votes

You can do this with the following approach:

  • Repeated applications of pmap (which provides parallel processing of all elements in a collection)
  • The function used in pmap returns a list of zero, one or multiple elements, which will then be processed in the next iteration
  • The results get recombined with concat
  • You repeat the processing of the list for as many times as you like, perhaps storing the result in an atom.

Example code could be something like the following

(def jobs (atom '(1 10 100)))

(defn process-element [value]
  (if (< (rand) 0.8)
    [(inc value)]
    []))

(defn do-processing []
  (swap! jobs 
         (fn [job-list] (apply concat (pmap process-element job-list)))))

(while (seq @jobs)
  (prn @jobs)
  (do-processing))

Which could produce output like:

(1 10 100)
(2 11 101)
(3 12 102)
(4 13 103)
(5 14 104)
(6 15 105)
(7 106)
(107)
(108)
(109)
nil

Note that you need to be a bit careful to make sure your algorithm terminates! In the example this is guaranteed by the elements dying off over time, but if your search space is growing then you will probably want to apply a time limit instead of just using a (while ... ) loop.
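One way to apply such a time limit is sketched below. The name run-with-deadline and its return shape are assumptions made for illustration, not part of the answer above. The deadline is only checked between generations, so a single slow generation can still overrun it.

```clojure
;; Hypothetical helper (not from the answer above): a time-limited variant
;; of the pmap loop.
(defn run-with-deadline
  "Repeatedly expands `jobs` with `step` (a fn from one job to a seq of new
  jobs) until no jobs remain or `timeout-ms` milliseconds have elapsed.
  Returns a map with the jobs still pending and whether the deadline was hit."
  [step jobs timeout-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop [pending (vec jobs)]
      (cond
        ;; Nothing left to do: the search finished on its own.
        (empty? pending)
        {:pending [] :timed-out? false}

        ;; Out of time: hand back whatever has not been processed yet.
        (>= (System/currentTimeMillis) deadline)
        {:pending pending :timed-out? true}

        ;; Otherwise process one generation in parallel. vec forces the
        ;; lazy pmap/concat result before the next deadline check.
        :else
        (recur (vec (apply concat (pmap step pending))))))))
```

For example, (run-with-deadline process-element [1 10 100] 5000) would run the toy example for at most roughly five seconds and report any jobs still pending when it stopped.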

2
votes

Your approach with agents and threads seems quite close to (what I see as) idiomatic Clojure.

The only thing I would change to make it more "Clojure-like" would be to use pmap to iterate over the queue that is stored in an agent. Using pmap instead of your own thread pool saves you the effort of managing the pool: pmap runs its work on futures backed by Clojure's own thread pool and limits the number of tasks in flight based on the number of available processors. It may also let you take advantage of sequence chunking.
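A minimal sketch of that idea follows, assuming the jobs are plain numbers. The names expand-job, expand-generation and step! are hypothetical and exist only for illustration. Each call to step! replaces the agent's queue with the next generation of jobs, computed with pmap.

```clojure
;; The pending jobs live in an agent, as in the question.
(def jobs (agent [0 1 2]))

(defn expand-job
  "Hypothetical stand-in for the real calculation: a job yields zero or
  more follow-up jobs."
  [n]
  (if (< n 3)
    [(inc n)]
    []))

(defn expand-generation
  "Processes every job in job-list in parallel and returns the combined
  vector of follow-up jobs."
  [job-list]
  (vec (apply concat (pmap expand-job job-list))))

(defn step!
  "Sends one generation of work to the agent, waits for it to finish and
  returns the new queue. send-off is used because the action blocks while
  pmap's futures complete."
  []
  (send-off jobs expand-generation)
  (await jobs)
  @jobs)
```

Calling (step!) repeatedly until it returns an empty vector walks the whole search. As with any growing search space, a real loop would need its own stopping condition.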

0
votes

You could also use channels. Maybe something like this:

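(require '[clojure.core.async :refer [chan go <! >! >!!]])

;; process-job-into-solutions, acceptable?, generate-new-jobs and initial-job
;; are placeholders for your own domain logic.
(def jobs (chan))
(def solutions (chan))
(def accepted-solutions (atom []))

;; Take each job and publish its candidate solutions from a separate go block.
(go (loop [job (<! jobs)]
      (when job
        (go (doseq [solution (process-job-into-solutions job)]
              (>! solutions solution)))
        (recur (<! jobs)))))

;; Record acceptable solutions and feed their follow-up jobs back into jobs.
;; Rejected solutions are skipped, but the loop keeps consuming.
(go (loop [solution (<! solutions)]
      (when solution
        (when (acceptable? solution)
          (swap! accepted-solutions conj solution)
          (doseq [new-job (generate-new-jobs solution)]
            (>! jobs new-job)))
        (recur (<! solutions)))))

(>!! jobs initial-job)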