1
votes

I'm new to Spark and Scala. I have a Mongo collection with documents like this:

{
    "_id": "doc_1",
    "posts": {
        "a": { "total": 1 },
        "b": { "total": 2 }
    }
}

I'm loading this into a Spark RDD like this:

val rc = ReadConfig(Map("collection" -> "my_collection"), Some(ReadConfig(sparkSession)))
val rdd = MongoSpark.load(sparkContext, rc)

I would like to use flatMap (or another suitable function) to flatten out the posts subdocuments into a new RDD like this:

|--------|---------|-------|
| doc_id | post_id | total |
|--------|---------|-------|
| doc_1  | a       | 1     |
| doc_1  | b       | 2     |
| doc_2  | ...     | ...   |
|--------|---------|-------|

(I'm using an RDD rather than a DataFrame because the documents are large and this appears to use less memory).

The signature of flatMap is flatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]. Each object in the RDD is an org.bson.Document from the Mongo connector, so I want to write something like:

val newRdd = rdd.flatMap( { x: org.bson.Document => { x.posts }})

But this gives:

value posts is not a member of org.bson.Document

I've done a lot of Googling. Seems like this should be simple but I can't figure it out. Can you point me in the right direction?

1
Did you try the explode function? - koiralo

1 Answer

3
votes

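This isn't JavaScript :) Scala is statically typed, so you can only access members that the class actually declares. org.bson.Document has no member called posts, which is why x.posts does not compile.

Document does have a get method, which you can use to look up a field by key: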

case class Post (///...
val newRdd = rdd.flatMap { x: org.bson.Document =>
  val posts = x.get("posts", classOf[org.bson.Document]) // get takes a Class, hence classOf; do something with posts
}

Replace the // do something comment with your own transformation; it has to return a collection, because flatMap expects a TraversableOnce.
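
To make the transformation step concrete, here is a minimal sketch of one way to flatten the posts subdocuments into rows. It relies on org.bson.Document implementing java.util.Map[String, Object], so the helper is written against the plain Java map interface and can be tried without Spark or Mongo. The names PostRow, FlattenPosts, and Demo are hypothetical, and the sketch assumes that total is stored as a numeric type and that Scala 2.11 or 2.12 is in use (on 2.13, scala.jdk.CollectionConverters replaces JavaConverters).

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical row type for the flattened output (not part of the connector).
final case class PostRow(docId: String, postId: String, total: Int)

object FlattenPosts {
  // Returns the value as a string-keyed map if it is a map; null or any other type gives None.
  private def asMap(value: AnyRef): Option[JMap[String, AnyRef]] = value match {
    case m: JMap[_, _] => Some(m.asInstanceOf[JMap[String, AnyRef]])
    case _             => None
  }

  // Emits one row per entry in "posts"; documents without posts, and posts
  // without a numeric "total", produce no rows instead of throwing.
  def flatten(doc: JMap[String, AnyRef]): List[PostRow] = {
    val docId = String.valueOf(doc.get("_id"))
    for {
      posts           <- asMap(doc.get("posts")).toList
      (postId, value) <- posts.asScala.toList
      post            <- asMap(value)
      // Assumption: totals fit in an Int; larger values would be truncated.
      total           <- Option(post.get("total")).collect { case n: Number => n.intValue }
    } yield PostRow(docId, postId, total)
  }
}

object Demo {
  // Builds the sample document from the question using plain Java maps.
  def sampleDoc(): JMap[String, AnyRef] = {
    def post(total: Int): JMap[String, AnyRef] = {
      val m = new JLinkedHashMap[String, AnyRef]()
      m.put("total", Int.box(total))
      m
    }
    val posts = new JLinkedHashMap[String, AnyRef]()
    posts.put("a", post(1))
    posts.put("b", post(2))
    val doc = new JLinkedHashMap[String, AnyRef]()
    doc.put("_id", "doc_1")
    doc.put("posts", posts)
    doc
  }

  def main(args: Array[String]): Unit =
    FlattenPosts.flatten(sampleDoc()).foreach(println)
}
```

With the RDD from the question, the call would then look like rdd.flatMap(doc => FlattenPosts.flatten(doc)), which should give an RDD[PostRow] with one element per post. Skipping malformed entries rather than failing is a design choice made for this sketch; throwing on unexpected shapes may suit you better if the collection is supposed to be uniform.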