9 votes

I am trying to figure out how to use Clojure to efficiently apply a simple operation to a large sequence in parallel. I would like the parallel solution to take advantage of the multiple cores on my machine to achieve some speedup.

I am attempting to use pmap in combination with partition-all to reduce the overhead of creating a future for every item in the input seq. Unfortunately, partition-all forces the complete evaluation of each partition, which causes an OutOfMemoryError on my machine.
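For reference, a minimal sketch (not part of my benchmark) showing that partition-all realizes each chunk eagerly even though the outer sequence stays lazy:

;; Each chunk yielded by partition-all is fully realized, so very
;; large chunk sizes force large allocations up front.
(take 1 (partition-all 5 (range)))  ;=> ((0 1 2 3 4))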

(defn sum [vs]
  (reduce + vs))

(def workers
  ;; parallelism level: available processors plus two (mirrors pmap's default)
  (+ 2 (.. Runtime getRuntime availableProcessors)))

(let [n  80000000
      vs (range n)]
  (time (sum vs))                                                   ; serial
  (time (sum (pmap sum (partition-all (long (/ n workers)) vs)))))  ; parallel

How can I apply sum to a large input set and beat the performance of the serial implementation?

Solution

Thanks to @Arthur Ulfeldt for pointing out the reducers library. Here is the solution using reducers. This code shows the expected performance improvement when running on a multi-core machine. (NOTE: I have changed vs to be a function so that each test realizes a fresh sequence, which makes the timing more accurate.)

(require '[clojure.core.reducers :as r])

(let [n  80000000
      vs #(range n)]
  (time (reduce + (vs)))   ; serial reduce
  (time (r/fold + (vs))))  ; fold via reducers
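For reference, r/fold also has a four-argument form taking an explicit partition size plus separate combine and reduce functions; this sketch is illustrative and not part of the solution above:

(require '[clojure.core.reducers :as r])

;; 512 is r/fold's default partition size, shown explicitly here.
(r/fold 512  ; partition size
        +    ; combinef: merges partial sums
        +    ; reducef: sums within a partition
        (vec (range 1000000)))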
Are the chunks the wrong way around? (partition-all workers vs) creates (/ n workers) sequences of length workers. Don't you want (partition-all (long (/ n workers)) vs)? – A. Webb

@A.Webb, thank you for your correction. I will amend the question. This fix helps make the parallel version a bit faster, but it still cannot beat the serial implementation, and it still runs out of memory on very large inputs. – John Atwood

1 Answer

9 votes

When using pmap I have found that fairly large chunks are required to overcome the switching and future overhead; try a chunk size of 10,000 for an operation as fast as +. The potential gains are bounded by the overhead of generating the chunks, so there is an optimal chunk size that balances the available cores against the time required to make the chunks. In this case, with + as the workload, I was unable to make this faster than the single-threaded option.
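As a minimal sketch of that chunking approach (chunked-psum is a hypothetical name, not from this answer), with chunk-size as the knob to experiment with:

(defn chunked-psum
  "Sums vs in parallel; chunk-size is the tuning knob discussed above."
  [chunk-size vs]
  (reduce + (pmap #(reduce + %) (partition-all chunk-size vs))))

;; e.g. (chunked-psum 10000 (range 1000000))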

If you're interested in doing this without pmap, and potentially using fork/join, check out the new(ish) reducers library.

The OOM situation comes from the first test realizing the lazy sequence produced by (range n), whose head is then retained so it can be passed to the second test; holding the head keeps all of the realized elements from being garbage collected.
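To illustrate the retention problem (a sketch, not from the original answer), compare binding the sequence directly against wrapping it in a function, as the solution above does:

(let [vs (range 80000000)]   ; head of vs is retained across both uses -> OOM risk
  [(reduce + vs) (reduce + vs)])

(let [vs #(range 80000000)]  ; a fresh lazy seq per call -> chunks can be GC'd
  [(reduce + (vs)) (reduce + (vs))])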

If I make the + function much slower by defining a slow+ function and using that, the difference between a single thread, pmap over chunks, and reducers with fork/join becomes visible:

user> *clojure-version*                                                             
{:major 1, :minor 5, :incremental 0, :qualifier "RC15"}
(require '[clojure.core.reducers :as r]) 

(def workers
  (+ 2 (.. Runtime getRuntime availableProcessors)))

(defn slow+
  "Like +, but does busy-work on each call so the workload dominates the overhead."
  ([] 0)
  ([x] x)
  ([x y] (reduce + (range 100000)) (+ x y)))

(defn run-test []
  (let [n 8000]
    (time (reduce slow+ (range n)))                                                          ; one thread
    (time (reduce slow+ (pmap #(reduce slow+ %) (partition-all (* workers 100) (range n))))) ; pmap over chunks
    (time (r/fold slow+ (vec (range n))))))  ; reducers; vec because r/fold parallelizes vectors via fork/join

user> (run-test)
"Elapsed time: 28655.951241 msecs" ; one thread
"Elapsed time: 6975.488591 msecs"  ; pmap over chunks
"Elapsed time: 8170.254426 msecs"  ; using reducer