
I am new to Spark and reasonably new to Clojure (although I really like what Clojure can do so far). I am currently trying to parse JSON in Clojure using sparkling, and I am having trouble with the basics of transforming data and getting it back in a form I can understand and debug. I use dummy data in the example below, but my actual data is over 400GB.

As an example, I first tried splitting each line of my JSON input (each line is a full record) by commas so that I would have a list of keys and values (for eventual conversion to keyword and value maps). In Scala (for which it is easier to find Spark examples) with dummy data, this works fine:

val data = sc.parallelize(Array("a:1,b:2", "a:3,b:4"))
val keyVals = data.map(line => line.split(","))
keyVals.collect()

This returns Array[Array[String]] = Array(Array(a:1, b:2), Array(a:3, b:4)), which is at least a reasonable starting point for key-value mapping.

However, when I run the following in Clojure with sparkling:

(def jsony-strings (spark/parallelize sc ["a:1,b:2" "a:3,b:4"]))

(def jsony-map (->> jsony-strings
                    (spark/map (fn [l] (string/split l #",")))))

(spark/collect jsony-map)

I get the usual concurrency spaghetti from the JVM, the crux of which seems to be:

2018-08-24 18:49:55,796  WARN serialization.Utils:55 - Error deserializing object (clazz: gdelt.core$fn__7437, namespace: gdelt.core)

java.lang.ClassNotFoundException: gdelt.core$fn__7437 

This is an error I seem to get pretty much any time I try to do something more complex than counts.
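From searching around, it sounds like sparkling may need the application namespace to be AOT-compiled, so that the generated class files for anonymous functions (like gdelt.core$fn__7437) actually exist when the executors try to deserialize them. If that's right, I assume the fix is something like this in project.clj (a guess on my part, untested; the dependency versions are placeholders for whatever you already use):

(defproject gdelt "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]      ; placeholder version
                 [gorillalabs/sparkling "2.1.2"]]   ; placeholder version
  ;; AOT-compile the namespace so the generated fn classes
  ;; (e.g. gdelt.core$fn__7437) exist as .class files for the workers
  :aot [gdelt.core])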

Can someone please point me in the right direction?


I guess I should note that my Big Problem is processing lots and lots of lines of JSON in a bigger-than-memory (400GB) dataset. I will be using the JSON keys to filter, sort, calculate, etc., and Spark pipelines looked like a good fit for both rapid parallel processing and convenient handling of these operations. But I am certainly open to considering other alternatives for processing this dataset.
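For concreteness, the shape of pipeline I am aiming for is roughly this (an untested sketch; the path and the :country key are made up, and it uses clojure.data.json for the per-line parsing since that is what I had in mind):

(ns gdelt.core
  (:require [sparkling.core :as spark]
            [clojure.data.json :as json]))

;; one JSON record per line -> keyword maps -> filter on a key
(defn filtered-records [sc path]
  (->> (spark/text-file sc path)                       ; e.g. a 400GB file on HDFS
       (spark/map #(json/read-str % :key-fn keyword))  ; line -> {:a 1, :b 2, ...}
       (spark/filter #(= "US" (:country %)))))         ; :country is a made-up key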

Note that I wouldn't mind using clojure.data.json eventually to do the parsing, but I thought it would be best to take this one step at a time. - Matt
I suspect that you have stale class files. Maybe a lein clean and restart helps. - Svante
Thanks Svante. I did so and it didn't change the outcome. - Matt

1 Answer


You should use Cheshire for this:

(require '[cheshire.core :refer [parse-string parsed-seq]])

;; parse some json
(parse-string "{\"foo\":\"bar\"}")
;; => {"foo" "bar"}

;; parse some json and get keywords back
(parse-string "{\"foo\":\"bar\"}" true)
;; => {:foo "bar"}

I like to use a shortcut for the second case, since I always want to convert string keys into Clojure keywords:

(is= {:a 1 :b 2} (json->edn "{\"a\":1, \"b\":2}"))

It is just a simple wrapper with (I think) an easier-to-remember name:

(require '[cheshire.core :as cc])

(defn json->edn
  "Shortcut to cheshire.core/parse-string"
  [arg]
  (cc/parse-string arg true))   ; true => keywordize keys

(defn edn->json
  "Shortcut to cheshire.core/generate-string"
  [arg]
  (cc/generate-string arg))
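Round-tripping at the REPL then looks like:

(json->edn "{\"a\":1, \"b\":2}")   ; => {:a 1, :b 2}
(edn->json {:a 1 :b 2})            ; => "{\"a\":1,\"b\":2}"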

Update: Note that Cheshire can work with lazy streams:

;; parse a stream lazily (keywords option also supported)
(parsed-seq (clojure.java.io/reader "/tmp/foo"))
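So for a bigger-than-memory file you can process one record at a time; a minimal sketch (the path and the :foo key are placeholders):

(require '[clojure.java.io :as io])

(with-open [rdr (io/reader "/tmp/foo")]
  (doseq [record (parsed-seq rdr true)]   ; true => keywordize keys
    (println (:foo record))))             ; doseq doesn't retain the head, so memory stays bounded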