1 vote

For my Mandelbrot explorer project, I need to run several expensive jobs, ideally in parallel. I decided to try chunking the jobs and running each chunk in its own thread, and ended up with something like

;; assumes (:require [clojure.core.async :refer [chan thread >!! <!! close!]])
(defn point-calculator [chunk-size points]
  (let [out-chan (chan (count points))        ; buffer sized so puts never block
        chunked  (partition chunk-size points)]

    ;; fire off one worker thread per chunk
    (doseq [chunk chunked]
      (thread
        (let [processed-chunk (expensive-calculation chunk)]
          (>!! out-chan processed-chunk))))

    out-chan))

Where points is a list of [real, imaginary] coordinates to be tested, and expensive-calculation is a function that takes a chunk and tests each point in it. Each chunk can take a long time to finish (potentially a minute or more, depending on the chunk size and the number of jobs).
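
Something like this simplified stand-in captures the shape of expensive-calculation (the body here is hypothetical, just to make the example concrete; the real one is more involved):

;; Hypothetical stand-in: classify each [real imaginary] point by iterating
;; z <- z^2 + c and counting iterations until |z| escapes (or a cap is hit).
(defn expensive-calculation [chunk]
  (mapv (fn [[cr ci]]
          (loop [zr 0.0, zi 0.0, i 0]
            (if (or (= i 1000) (> (+ (* zr zr) (* zi zi)) 4.0))
              i
              (recur (+ (- (* zr zr) (* zi zi)) cr)
                     (+ (* 2.0 zr zi) ci)
                     (inc i)))))
        chunk))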

On my consumer end, I'm using

(loop []
  (when-let [proc-chunk (<!! result-chan)]
    ; Do stuff with chunk
    (recur)))

To consume each processed chunk. Right now this blocks after the last chunk has been consumed, since the channel is still open.

I need a way to close the channel when all the jobs are done. This is proving difficult because of the asynchronicity of the producer loop: I can't simply put a close! after the doseq, since that loop doesn't block, and I can't close when the last-indexed job finishes, since the completion order is indeterminate.

The best idea I could come up with was maintaining an (atom #{}) of jobs and disj-ing each job as it finishes. Then I could either check the set's size in the loop and close! when it reaches 0, or attach a watch to the atom and check there, roughly as in the sketch below.
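
Concretely, the sort of thing I have in mind (rough sketch; the pending set and names are hypothetical):

(defn point-calculator [chunk-size points]
  (let [out-chan (chan (count points))
        chunked  (partition chunk-size points)
        pending  (atom (set chunked))]       ; jobs that haven't finished yet
    (doseq [chunk chunked]
      (thread
        (>!! out-chan (expensive-calculation chunk))
        ;; drop this job from the set; close the channel when the last one is done
        ;; (assumes the chunks are distinct values)
        (when (empty? (swap! pending disj chunk))
          (close! out-chan))))
    out-chan))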

This seems very hackish though. Is there a more idiomatic way of dealing with this? Does this scenario suggest I'm using async incorrectly?


2 Answers

1 vote

I would take a look at the take function from core.async. This is what its documentation says:

"Returns a channel that will return, at most, n items from ch. After n items have been returned, or ch has been closed, the return channel will close. "

So this leads to a simple fix: instead of returning out-chan, you can just wrap it in take:

(clojure.core.async/take (count chunked) out-chan)

That should work. I would also recommend rewriting your example from blocking puts/takes (>!!, <!!) to parking ones (>!, <!), and from thread to go / go-loop, which is more idiomatic core.async usage; see the sketch below.
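
For example, applied to the original function it could look roughly like this (just a sketch; thread is kept for the blocking calculation, and the consumer side is shown as a go-loop with a parking take):

;; assuming (:require [clojure.core.async :as async :refer [chan thread go-loop >!! <!]])
(defn point-calculator [chunk-size points]
  (let [out-chan (chan (count points))
        chunked  (partition chunk-size points)]
    (doseq [chunk chunked]
      (thread
        (>!! out-chan (expensive-calculation chunk))))
    ;; the returned channel closes itself after (count chunked) results
    (async/take (count chunked) out-chan)))

;; consumer: exits when the returned channel closes
(go-loop []
  (when-let [proc-chunk (<! result-chan)]
    ;; do stuff with chunk
    (recur)))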

1 vote

You may want to use async/pipeline (or pipeline-blocking) to control the parallelism, and async/onto-chan to close the input channel automatically after all the chunks have been copied onto it.

E.g. the example below shows a 16x improvement in elapsed time when the parallelism is set to 16.

;; assumes (:require [clojure.core.async :refer [chan go-loop pipeline-blocking onto-chan <! <!!]])
(defn expensive-calculation [pts]
  (Thread/sleep 100)                     ; simulate a slow job
  (reduce + pts))

(time
 (let [points     (take 10000 (repeatedly #(rand 100)))
       chunk-size 500
       inp-chan   (chan)
       out-chan   (chan)]
   ;; consumer: exits when out-chan closes
   (go-loop [] (when-let [res (<! out-chan)]
                 ;; do stuff with chunk
                 (recur)))
   ;; 16 worker threads; out-chan closes once inp-chan closes and drains
   (pipeline-blocking 16 out-chan (map expensive-calculation) inp-chan)
   ;; onto-chan copies all chunks onto inp-chan and then closes it
   (<!! (onto-chan inp-chan (partition-all chunk-size points)))))