0 votes

I'm a total newbie to Spark & Scala, so it would be great if someone could explain this to me. Let's take the following JSON:

    {
        "id": 1,
        "persons": [{
            "name": "n1",
            "lastname": "l1",
            "hobbies": [{
                "name": "h1",
                "activity": "a1"
            },
            {
                "name": "h2",
                "activity": "a2"
            }]
        },
        {
            "name": "n2",
            "lastname": "l2",
            "hobbies": [{
                "name": "h3",
                "activity": "a3"
            },
            {
                "name": "h4",
                "activity": "a4"
            }]
        }]
    }

I'm loading this JSON into an RDD via sc.parallelize(file.json) and into a DataFrame via sqlContext.read.json(file.json). So far so good: this gives me an RDD and a DataFrame (with schema) for the JSON above, but I want to create another RDD/DataFrame from the existing one that contains all distinct "hobbies" records. How can I achieve something like that? The only thing I get from my operations is multiple WrappedArrays for hobbies, but I cannot go deeper into them nor assign them to a DataFrame/RDD.

The SQLContext code I have so far:

    val jsonData = sqlContext.read.json("path/file.json")
    jsonData.registerTempTable("jsonData") // gives me the schema for the whole file
    val hobbies = sqlContext.sql("SELECT persons.hobbies FROM jsonData") // sub-schema for hobbies
    hobbies.show()

That leaves me with

    +--------------------+
    |             hobbies|
    +--------------------+
    |[WrappedArray([a1...|
    +--------------------+

What I expect is more like:

    +--------------------+-----------------+
    |                name|         activity|
    +--------------------+-----------------+
    |                  h1|               a1|
    |                  h2|               a2|
    |                  h3|               a3|
    |                  h4|               a4|
    +--------------------+-----------------+
Can you please post the code you have written - it makes it easier for us to understand what you have tried and thus where the issues arise. - Glennie Helles Sindholt

1 Answer

2 votes

I loaded your example into the DataFrame hobbies exactly as you did and worked with it. You could run something like the following:

    val distinctHobbies = hobbies.rdd
      .flatMap { row => row.getSeq[List[Row]](0).flatten }
      .map(row => (row.getString(0), row.getString(1)))
      .distinct
    val dhDF = distinctHobbies.toDF("activity", "name")

This essentially flattens your nested hobbies arrays, transforms each hobby struct into a tuple, and runs distinct on the returned tuples. We then turn the result back into a DataFrame with the correct column aliases. Because this goes through the underlying RDD, there may be a more efficient way to do it using just the DataFrame API.
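For example, a DataFrame-only version could explode the nested arrays twice, once for persons and once for each person's hobbies. A minimal sketch, assuming the jsonData DataFrame from your question and Spark 1.4 or later:

    import org.apache.spark.sql.functions.explode
    import sqlContext.implicits._  // for the $"..." column syntax

    // jsonData is the DataFrame read from "path/file.json" in the question
    val flatHobbies = jsonData
      .select(explode($"persons").as("person"))        // one row per person
      .select(explode($"person.hobbies").as("hobby"))  // one row per hobby
      .select($"hobby.name", $"hobby.activity")
      .distinct()

    flatHobbies.show()

This keeps everything inside the DataFrame API so Catalyst can optimize it, but I haven't benchmarked it against the RDD version.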

Regardless, when I run it on your example, I see:

    scala> val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
    distinctHobbies: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[121] at distinct at <console>:24

    scala> val dhDF = distinctHobbies.toDF("activity", "name")
    dhDF: org.apache.spark.sql.DataFrame = [activity: string, name: string]

    scala> dhDF.show
    ...
    +--------+----+
    |activity|name|
    +--------+----+
    |      a2|  h2|
    |      a1|  h1|
    |      a3|  h3|
    |      a4|  h4|
    +--------+----+
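One small note: if you run this in a standalone application rather than the spark-shell, the toDF call on an RDD of tuples needs import sqlContext.implicits._ in scope.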