Is there any way I can convert a Spark DataFrame to a Dataset[Map[String,Any]], so that I can perform a map-side operation on each row once it is converted to a Map? The schema of the file is mostly fluid, so it is not really possible to define a case class at compile time and use a product encoder like dataframe.as[MyClass].
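
For contrast, the product-encoder approach would look roughly like this if the schema were fixed (a sketch; the case class fields are made up from the example below):

// Only possible when the schema is known at compile time, which is not the case here.
case class MyClass(field1: String, field2: String, field3: Long)

import spark.implicits._
val typed: Dataset[MyClass] = dataframe.as[MyClass]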

The complexity here is that the data can be nested and can contain Maps and Lists.

Example data, represented as JSON:

{
    "field1": "Sally",
    "field2": "Green",
    "field3": 27,
    "subObject": {
        "subField": "Value"
    },
    "fieldArray": ["A","B","C"],
    "accounting": [
        {
            "firstName": "John",
            "lastName": "Doe",
            "nestedSubField": {
                "x": "y"
            },
            "age": [11,2,33]
        },
        {
            "firstName": "Mary",
            "lastName": "Smith",
            "age": [11,2,33]
        }
    ],
    "sales": [
        {
            "firstName": "Sally",
            "lastName": "Green",
            "age": 27
        },
        {
            "firstName": "Jim",
            "lastName": "Galley",
            "age": 41
        }
    ]
}

When this data is loaded into a DataFrame, we get the following schema.

DataFrame schema:

  root
     |-- accounting: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: array (nullable = true)
     |    |    |    |-- element: long (containsNull = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |    |    |-- nestedSubField: struct (nullable = true)
     |    |    |    |-- x: string (nullable = true)
     |-- field1: string (nullable = true)
     |-- field2: string (nullable = true)
     |-- field3: long (nullable = true)
     |-- fieldArray: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- sales: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- age: long (nullable = true)
     |    |    |-- firstName: string (nullable = true)
     |    |    |-- lastName: string (nullable = true)
     |-- subObject: struct (nullable = true)
     |    |-- subField: string (nullable = true)
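
For reference, a minimal sketch of how the example could be loaded to produce this schema (the path is hypothetical; the multiLine option is needed because the example document spans multiple lines):

val dataframe = spark.read
  .option("multiLine", "true")
  .json("/path/to/example.json")

dataframe.printSchema() // prints the schema shown above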

Is there any way we can convert this DataFrame to a Map[String,Any] that looks something like the following? (I have formatted it a little for readability.)

Map(
    accounting -> List(
            Map(
                firstName -> John, 
                lastName -> Doe, 
                nestedSubField -> Map(x -> y), 
                age -> List(11, 2, 33)
            ),
            Map(
                firstName -> Mary, 
                lastName -> Smith, 
                age -> List(11, 2, 33)
            )
        ),
    fieldArray -> List(A, B, C),
    subObject -> Map(subField -> Value), 
    field1 -> Sally, 
    field3 -> 27, 
    sales -> List(
            Map(
                firstName -> Sally,
                lastName -> Green,
                age -> 27
            ),
            Map(
                firstName -> Jim,
                lastName -> Galley,
                age -> 41
            )
        ),
    field2 -> Green
)

Currently I am achieving this as follows (JsonUtils is a wrapper on top of the Jackson API):

// Round-trip: serialize each row to a JSON string, then parse it back into a Map with Jackson.
// Note: Dataset[Map[String,Any]] needs an explicit encoder in scope, e.g. Encoders.kryo[Map[String, Any]].
val dataframeAsJsonDataset: Dataset[String] = dataframe.toJSON
val result: Dataset[Map[String, Any]] = dataframeAsJsonDataset.map(each => JsonUtils.fromJson(each, classOf[Map[String, Any]]))

The above approach is clumsy and performs badly: every row is serialized to a JSON string and then immediately parsed back into objects. Any suggestion on this would be really helpful.
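
For context, the kind of map-side operation intended on the converted rows might look like this (a sketch; the field access and the Seq match are assumptions about what Jackson produces):

import spark.implicits._ // provides the Encoder[Int] for the result

// Count how many "accounting" entries each record carries.
val accountingCounts: Dataset[Int] = result.map { record =>
  record.get("accounting") match {
    case Some(entries: Seq[_]) => entries.size
    case _                     => 0
  }
}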

Comments:

"RDD is very bad" – Remis Haroon - رامز

"We do not need to get it out as an RDD. Getting it out as a Dataset should be fine, i.e. Dataset[Map[String,Any]]. Updating the question to avoid any confusion." – tpuli

1 Answer

If your schema is going to evolve, data quality may become an issue. That calls for custom parsing of the data with appropriate error checks, using something like spark-records.

Alternatively, if you want to do this with minimal code (and minimal safety), read the data as text and then parse it with, say, json4s, e.g.:

import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s._
import org.json4s.jackson.JsonMethods

// Map[String, Any] has no built-in Spark encoder, so fall back to Kryo.
implicit val mapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]

spark
  .createDataset(
    sc.textFile(pathToJson).map { json =>
      JsonMethods.parse(json) match {
        case obj: JObject => obj.values // recursively unwraps to plain Scala values
        case other        => sys.error(s"expected a JSON object but got: $other")
      }
    }
  )
  .toDF("my_map")
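
Note that sc.textFile assumes one JSON document per line; a pretty-printed file like the example in the question would either have to be compacted to one object per line first, or read whole via sc.wholeTextFiles before parsing.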