For my Mandelbrot explorer project, I need to run several expensive jobs, ideally in parallel. I decided to try chunking the jobs and running each chunk in its own thread, and ended up with something like:
(defn point-calculator [chunk-size points]
  (let [out-chan (chan (count points))
        chunked (partition chunk-size points)]
    (doseq [chunk chunked]
      (thread
        (let [processed-chunk (expensive-calculation chunk)]
          (>!! out-chan processed-chunk))))
    out-chan))
Where points is a list of [real, imaginary] coordinates to be tested, and expensive-calculation is a function that takes the chunk, and tests each point in the chunk. Each chunk can take a long time to finish (potentially a minute or more depending on the chunk size and the number of jobs).
On my consumer end, I'm using
(loop []
  (when-let [proc-chunk (<!! result-chan)]
    ; Do stuff with chunk
    (recur)))
to consume each processed chunk. Right now, this blocks forever once the last chunk has been consumed, since the channel is still open.
I need a way of closing the channel when the jobs are done. This is proving difficult because of the asynchronicity of the producer loop. I can't simply put a close! after the doseq, since the loop doesn't block, and I can't just close when the last-indexed job is done, since the order of completion is indeterminate.
The best idea I could come up with was maintaining an (atom #{}) of jobs, and disj-ing each job as it finishes. Then I could either check the set size in the loop and close! when it's 0, or attach a watch to the atom and check there.
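To make the atom idea concrete, here is a rough sketch of what I have in mind. The job ids (chunk indices), the pending name, and the toy expensive-calculation are placeholders I made up for illustration; I also used partition-all here so a trailing partial chunk isn't dropped, and sized the buffer by chunk count, both of which are assumptions rather than what my real code does.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!! close!]])

;; Hypothetical stand-in for the real per-chunk work.
(defn expensive-calculation [chunk]
  (mapv (fn [[re im]] (+ (* re re) (* im im))) chunk))

(defn point-calculator [chunk-size points]
  (let [chunked  (partition-all chunk-size points)
        n        (count chunked)
        ;; One buffer slot per chunk, so a put never blocks a worker.
        out-chan (chan (max 1 n))
        ;; Set of job ids that have not finished yet.
        pending  (atom (set (range n)))]
    ;; Nothing to do: close right away so the consumer doesn't hang.
    (when (zero? n)
      (close! out-chan))
    (doseq [[id chunk] (map-indexed vector chunked)]
      (thread
        (>!! out-chan (expensive-calculation chunk))
        ;; swap! returns the value it installed, so only the job that
        ;; removes the last id sees an empty set and closes the channel.
        (when (empty? (swap! pending disj id))
          (close! out-chan))))
    out-chan))
```

Since each job puts its result before removing its id, the channel is only closed after every result has been put. A closed channel still hands out its buffered values and then returns nil, so the when-let consumer loop would terminate on its own.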
This seems very hackish though. Is there a more idiomatic way of dealing with this? Does this scenario suggest I'm using async incorrectly?