10 votes
(require '[clojure.core.reducers :as r])

(def data (into [] (take 10000000 (repeatedly #(rand-int 1000)))))

(defn frequencies [coll]
  (reduce (fn [counts x]
    (merge-with + counts {x 1}))
    {} coll))

(defn pfrequencies [coll]
  (r/reduce (fn [counts x]
    (merge-with + counts {x 1}))
    {} coll))


user=> (time (do (frequencies data) nil))
"Elapsed time: 29697.183 msecs"

user=> (time (do (pfrequencies data) nil))
"Elapsed time: 25273.794 msecs"

user=> (time (do (frequencies data) nil))
"Elapsed time: 25384.086 msecs"

user=> (time (do (pfrequencies data) nil))
"Elapsed time: 25778.502 msecs"

Can anyone show me an example with a significant speedup?

I'm running on Mac OSX 10.7.5 with Java 1.7 on an Intel Core i7 (2 cores, http://ark.intel.com/products/54617).

Comments:

You should use fold, not reduce; r/reduce is almost the same as core reduce. – Ankur
And even a fold version on 2 cores will probably still be much slower than the clojure.core/frequencies version, which uses transients. – A. Webb
@Ankur When I try r/fold (omitting the {} seed argument), I get this error: ArityException Wrong number of args (0) passed to: user$pfrequencies$fn clojure.lang.AFn.throwArity (AFn.java:437) – Michiel Borkent
@A.Webb Yeah, but that's not the point. I was just trying to see if I could get some difference using reducers. – Michiel Borkent
@MichielBorkent Post your fold version. – A. Webb

3 Answers

19 votes

You called it pfrequencies, which, along with your parallel-processing tag on the question, suggests you think that something is using multiple threads here. That is not the case, and neither is it the "main" goal of the reducers library.

The main thing reducers buy you is that you don't need to allocate many intermediate cons cells for your lazy sequences. Before reducers were introduced, frequencies would allocate 10000000 cons cells to create a sequential view of the vector for reduce to use. Now that reducers exist, vectors know how to reduce themselves without creating such temporary objects. But that feature has been backported into clojure.core/reduce, which behaves exactly like r/reduce (ignoring some minor features that are irrelevant here). So you are just benchmarking your function against an identical clone of itself.
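To make this concrete, here is a small REPL sketch (my addition, not part of the original answer) showing that on a vector the two reduces agree. Note that r/reduce with no init calls (f) with no arguments to produce one, which is also why the seedless r/fold in the comments above threw an ArityException:

```clojure
(require '[clojure.core.reducers :as r])

;; On a vector, clojure.core/reduce and r/reduce take the same
;; internal-reduce path over the vector's tree, with no lazy-seq
;; allocation. With no init supplied, r/reduce uses (+) as init.
(reduce + (vec (range 1000)))    ;; => 499500
(r/reduce + (vec (range 1000)))  ;; => 499500
```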

The reducers library also includes the notion of a fold, which can do some work in parallel, and then later merge together the intermediate results. To use this, you need to provide more information than reduce needs: you must define how to start a "chunk" from nothing; your function must be associative; and you must specify how to combine chunks. A. Webb's answer demonstrates how to use fold correctly, to get work done on multiple threads.

However, you're unlikely to get any benefit from folding: in addition to the reason he notes (you give up transients, compared to clojure.core/frequencies), building a map is not easily parallelizable. If the bulk of the work in frequencies were addition (as it would be in something like (frequencies (repeat 1e6 1))), then fold would help; but most of the work is in managing the keys in the hashmap, which really has to be single-threaded eventually. You can build maps in parallel, but then you have to merge them together; since that combination step takes time proportional to the size of the chunk, rather than constant time, you gain little by doing the chunks on a separate thread anyway.
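To illustrate the degenerate case where folding does pay off, here is a sketch of my own (not from the answer): every element maps to the same key, so each chunk reduces to a one-entry map and the combine step merges tiny maps in roughly constant time.

```clojure
(require '[clojure.core.reducers :as r])

;; All elements share a single key, so almost all the work is
;; addition, and combining chunks merges one-entry maps.
(def ones (into [] (repeat 1000000 1)))

(r/fold (fn combinef
          ([] {})
          ([a b] (merge-with + a b)))
        (fn reducef [m x]
          (merge-with + m {x 1}))
        ones)
;; => {1 1000000}
```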

5 votes

A fold version of your frequencies function would look something like

(defn pfrequencies [coll]
  (r/fold
    (fn combinef
      ([] {})
      ([x y] (merge-with + x y)))
    (fn reducef [counts x]
      (merge-with + counts {x 1}))
    coll))

On 2 cores, it will likely be much slower than clojure.core/frequencies, which uses transients. On 4 cores it is about 2x faster than the first implementation, but still slower than clojure.core/frequencies.

You might also experiment with

(defn p2frequencies [coll]
  (apply merge-with + (pmap clojure.core/frequencies (partition-all 512 coll))))
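A quick sanity check (my addition, not from the answer) that the chunked pmap version agrees with a single global count:

```clojure
(defn p2frequencies [coll]
  (apply merge-with + (pmap clojure.core/frequencies (partition-all 512 coll))))

;; Merging the per-chunk frequency maps must reproduce the
;; frequencies of the whole collection.
(let [data (vec (repeatedly 10000 #(rand-int 100)))]
  (= (clojure.core/frequencies data) (p2frequencies data)))
;; => true
```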

4 votes

Some serious food for thought in the answers here. In this specific case a map should not be needed, since the result domain can be easily predicted and stored in a vector, using the value itself as the index. So, a naive implementation of the problem would be something like:

(defn freqs
  [coll]
  (reduce (fn [counts x] (assoc counts x (inc (get counts x))))
          (vec (int-array 1000 0))
          coll))

(defn rfreqs
  [coll]
  (r/fold
    (fn combinef
      ([] (vec (int-array 1000 0)))
      ([v1 v2] (mapv + v1 v2)))
    (fn reducef [counts x]
      (assoc counts x (inc (get counts x))))
    coll))

Here the combinef is a simple element-wise addition over the 1000 slots of the intermediate vectors, which should be cheap.

This gives the reducer version a speedup of about 2-3x over the normal version, especially on bigger (10x-100x) datasets. Some twiddling with the partition size of r/fold (the optional 'n' parameter) can serve as fine-tuning; (* 16 1024) seemed optimal with a data size of 1e8 (which needs at least a 6GB JVM).

You could even use transients in both versions, but I didn't notice much improvement.
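For reference, a transient variant of freqs could look something like this (my own sketch, not the answer's code, assuming the same fixed 0-999 domain):

```clojure
;; Mutate a transient vector during the reduce and freeze it
;; once at the end. nth works on transient vectors, so we use it
;; for the read side instead of get.
(defn freqs-t [coll]
  (persistent!
    (reduce (fn [counts x] (assoc! counts x (inc (nth counts x))))
            (transient (vec (int-array 1000 0)))
            coll)))
```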

I know this version isn't appropriate for generic usage, but it shows the speed improvement that is possible without the hash-management overhead.