Is there any way I can convert a Spark DataFrame to a Dataset[Map[String,Any]], so that I can run a map-side operation on each row once it has been converted to a Map? The schema of the input files is fluid, so it is not possible to define a case class at compile time and use a product encoder via dataframe.as[MyClass].
The complexity here is that the data can be nested, and it can contain maps and lists.
Example data, represented as JSON:
{
  "field1": "Sally",
  "field2": "Green",
  "field3": 27,
  "subObject": {
    "subField": "Value"
  },
  "fieldArray": ["A", "B", "C"],
  "accounting": [
    {
      "firstName": "John",
      "lastName": "Doe",
      "nestedSubField": {
        "x": "y"
      },
      "age": [11, 2, 33]
    },
    {
      "firstName": "Mary",
      "lastName": "Smith",
      "age": [11, 2, 33]
    }
  ],
  "sales": [
    {
      "firstName": "Sally",
      "lastName": "Green",
      "age": 27
    },
    {
      "firstName": "Jim",
      "lastName": "Galley",
      "age": 41
    }
  ]
}
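For reference, this is roughly how the data gets loaded; a minimal sketch, where the SparkSession named spark and the input path are assumptions of mine:

// Minimal sketch of the load step. Assumes a SparkSession named `spark`;
// the path is hypothetical. multiLine is needed because each JSON object
// spans multiple lines.
val dataframe = spark.read
  .option("multiLine", "true")
  .json("/path/to/sample.json")

dataframe.printSchema()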
When this data is loaded into a DataFrame, we get the following schema:
root
|-- accounting: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- age: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- nestedSubField: struct (nullable = true)
| | | |-- x: string (nullable = true)
|-- field1: string (nullable = true)
|-- field2: string (nullable = true)
|-- field3: long (nullable = true)
|-- fieldArray: array (nullable = true)
| |-- element: string (containsNull = true)
|-- sales: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- age: long (nullable = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
|-- subObject: struct (nullable = true)
| |-- subField: string (nullable = true)
Is there any way we can convert each row of this DataFrame to a Map[String,Any], so that it looks as follows (formatted for readability)?
Map(
  accounting -> List(
    Map(
      firstName -> John,
      lastName -> Doe,
      nestedSubField -> Map(x -> y),
      age -> List(11, 2, 33)
    ),
    Map(
      firstName -> Mary,
      lastName -> Smith,
      age -> List(11, 2, 33)
    )
  ),
  fieldArray -> List(A, B, C),
  subObject -> Map(subField -> Value),
  field1 -> Sally,
  field3 -> 27,
  sales -> List(
    Map(
      firstName -> Sally,
      lastName -> Green,
      age -> 27
    ),
    Map(
      firstName -> Jim,
      lastName -> Galley,
      age -> 41
    )
  ),
  field2 -> Green
)
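For context, the kind of map-side operation I want to run on each converted row looks something like this; the transformation and the function name are purely hypothetical, and are only meant to show why I need the row as a plain Map:

// Hypothetical per-row transformation: promote the fields of the nested
// "subObject" map to the top level. Illustrative only.
def promoteSubObject(row: Map[String, Any]): Map[String, Any] =
  row.get("subObject") match {
    case Some(sub: Map[_, _]) =>
      (row - "subObject") ++ sub.map { case (k, v) => s"subObject_$k" -> v }
    case _ => row
  }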
Currently I am achieving this as follows (JsonUtils is a thin wrapper on top of the Jackson API):
val dataframeAsJsonDataset: Dataset[String] = dataframe.toJSON
val result: Dataset[Map[String, Any]] =
  dataframeAsJsonDataset.map(each => JsonUtils.fromJson(each, classOf[Map[String, Any]]))
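For completeness: since Spark ships no built-in Encoder for Map[String, Any], I have a Kryo-based encoder in implicit scope (an assumption about my setup; yours may differ), which is itself part of the overhead:

import org.apache.spark.sql.{Encoder, Encoders}

// Spark's implicits provide no encoder for Map[String, Any], so the
// Dataset above only compiles with a generic (Kryo) encoder in scope.
implicit val mapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]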
The above approach is ugly and performs poorly, because every row is first serialized to a JSON string and then immediately parsed back into a Map. Any suggestions would be really helpful.
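One direction I have been experimenting with is converting each Row directly via its schema, skipping the JSON round trip entirely. This is an untested sketch (it relies on the mapEncoder above), so I am not sure it covers every type combination:

import org.apache.spark.sql.Row

// Untested sketch: recursively convert a Row into Map[String, Any].
// Spark hands back structs as Row, arrays as Seq, and maps as Map.
def rowToMap(row: Row): Map[String, Any] =
  row.schema.fieldNames.map(name => name -> convert(row.getAs[Any](name))).toMap

def convert(value: Any): Any = value match {
  case r: Row                        => rowToMap(r)
  case s: Seq[_]                     => s.map(convert).toList
  case m: scala.collection.Map[_, _] => m.map { case (k, v) => convert(k) -> convert(v) }.toMap
  case other                         => other
}

val direct: Dataset[Map[String, Any]] = dataframe.map(row => rowToMap(row))

I have not benchmarked this yet, so I cannot say whether it is actually faster than the JSON round trip.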