
I am trying to query a Spark SQL DataFrame with complex types, where the function should itself be able to create an expression that generates the column for nested complex datatypes. Say:

case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
  an_array: Array[Int], a_map: Map[String, String], 
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])

val df = sc.parallelize(Seq(
  Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
         Array(
           ArrayElement("foo", 1, Array(1.0, 2.0)),
           ArrayElement("bar", 2, Array(3.0, 4.0)))),
  Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
         Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
               ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
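
For reference, the schema Spark infers for this DataFrame should look roughly like the following (nullability flags may differ slightly by Spark version):

df.printSchema
// root
//  |-- an_array: array (nullable = true)
//  |    |-- element: integer (containsNull = false)
//  |-- a_map: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: string (valueContainsNull = true)
//  |-- a_struct: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |-- an_array_of_structs: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- foo: string (nullable = true)
//  |    |    |-- bar: integer (nullable = false)
//  |    |    |-- vals: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)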

(example taken from Querying Spark SQL DataFrame with complex types)

For extracting a value from the map type, the query could be:

df.select($"a_map.foo").show

Now, if I have

case class Record(
  an_array: Array[Int], a_map_new: Map[String, Array[ArrayElement]], 
  a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])

instead of Map[String, String], how do I create a UDF that takes the name (or the index, in the case of an array) and generates the result for that nested element of the complex datatype? Say, for example, I now want to query vals[0] contained in a_map_new.
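
For a single hard-coded key and index, the plain column API would presumably allow something like this (a sketch; df_new here is just an illustrative name for a DataFrame built from the second Record definition), but I want this built generically from the schema info:

df_new.select(
  $"a_map_new".getItem("foo")   // value for key "foo": array<struct>
    .getItem(0)                 // first ArrayElement
    .getField("vals")           // its vals array
    .getItem(0)                 // vals[0]
).show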

@user8371915 edits made. - Vishal
Thanks. Do you want a generic solution? You cannot return Row from a udf, so you'd need a mapping to an external object. Also, should it work for a complete map, or a single key? - Alper t. Turker
@user8371915 I am looking for a generic solution: say the final value is a struct, I'll provide only the name of a column in that struct, and all the other complexity before it should be derived from the schema info of the DataFrame. - Vishal
I don't think it is possible, and for sure not with udf. Personally I'd choose a strongly typed Dataset for an operation like this one. - Alper t. Turker
Can you clarify "vals[0] contained in a_map_new" with an appropriate example? - Ramesh Maharjan

1 Answer


In this case, where you have well-defined record types, I'd recommend using a strongly typed Dataset:

val result = df.as[Record].map(_.a_map_new.mapValues(_.headOption))

result.printSchema
// root
//  |-- value: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: struct (valueContainsNull = true)
//  |    |    |-- foo: string (nullable = true)
//  |    |    |-- bar: integer (nullable = false)
//  |    |    |-- vals: array (nullable = true)
//  |    |    |    |-- element: double (containsNull = false)
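
If you want vals(0) itself rather than the whole first struct, the same typed approach extends naturally (a sketch; firstVals is just an illustrative name, and headOption keeps it null-safe when the arrays are empty):

val firstVals = df.as[Record].map(
  _.a_map_new.mapValues(_.headOption.flatMap(_.vals.headOption)))
// firstVals: Dataset[Map[String, Option[Double]]] -- roughly map<string, double> in SQL terms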

With udf the obstacle is its asymmetric nature (see the sketch after this list):

  • It receives the internal type Row.
  • It should return an external type.
  • It has to have a statically defined return type.
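
To make that asymmetry concrete, here is a non-generic sketch (firstVal and df_new are my own illustrative names, assuming df_new was built from the second Record definition): the structs arrive as Rows, fields are pulled out by name, and the return type (here Option[Double]) has to be fixed when the udf is defined:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{udf, lit}

val firstVal = udf((m: Map[String, Seq[Row]], key: String) =>
  m.get(key)
    .flatMap(_.headOption)                             // first ArrayElement, seen as a Row
    .flatMap(_.getAs[Seq[Double]]("vals").headOption)) // its vals(0), if any

df_new.select(firstVal($"a_map_new", lit("foo"))).show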

Any generic solution, which would return a struct, would have to know how to map to external types. I guess you could design something like this (pseudocode):

def f[U, T](mapper: Row => T) = udf((map: Map[U, Row]) => map.mapValues(mapper))