
I have a UDF that processes JSON and returns dynamic results per row. In my case I need it to validate the data and return the validated data.

The schema is flexible for each row, which means I cannot create a case class for every case (some of my data can be nested).

I've tried returning a tuple from my UDF, but I had no luck with that either (I needed to convert from a list to a tuple, and I didn't find an elegant solution for that).

The data types that I'm returning are String, Integer, Double and DateTime, in varying order.

I've also tried using map on the DataFrame, but I'm having issues with my schema.

import spark.implicits._

def processData(row_type: String) = {
  /*
  Completely different output per row type: a Tuple/List/Array of
  elements with types Integer, String, Double, DateType.
  */

  // pseudo-code starts here

  if (row_type == "A")
    (1, "second", 3)
  else
    (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

Results

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

How should I approach this problem? We have different processing results per row_type, and all the row_types are set dynamically. I can create a schema for each row_type, but I cannot make the same UDF return results with different schemas.

Is using map the only approach here?

1 Answer


A Spark Dataset is a columnar data structure and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows have to have the same general structure) and known upfront (if you use a UDF, it has to return a well-defined SQL type).

You can achieve some flexibility by:

  • Defining a schema which represents a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a row contains the field foo, it is always represented using the same SQL type). A minimal sketch of this and the MapType option follows this list.
  • Using collection types (MapType, ArrayType) to represent fields with variable size. All values and / or keys have to be of the same type.
  • Reshaping raw data to the point where it is actually representable with a fixed schema. Spark includes json4s as a dependency, which provides a set of tools for merging, diffing and querying JSON data; it can be used to apply relatively complex transformations if needed.
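
To make the first two options concrete, here is a minimal sketch (the Parsed fields, the map keys and the row types "A"/"B" are invented for illustration; spark.implicits._ is assumed to be in scope, as in the question):

import org.apache.spark.sql.functions.udf

// Option 1: one case class covering the superset of all fields; the ones a given
// row type does not produce stay null (None).
case class Parsed(i: Option[Int], s: Option[String], d: Option[Double], extra: Option[Int])

val supersetUDF = udf((row_type: String) =>
  if (row_type == "A") Parsed(Some(1), Some("second"), Some(3.0), None)
  else Parsed(Some(1), Some("second"), Some(3.0), Some(4))
)

// Option 2: a map for the variable part; all values must share one type (Double here).
val mapUDF = udf((row_type: String) =>
  if (row_type == "A") Map("i" -> 1.0, "d" -> 3.0)
  else Map("i" -> 1.0, "d" -> 3.0, "extra" -> 4.0)
)

val typed = Seq(("A", 1), ("B", 2)).toDF("row_type", "b")
typed.select(supersetUDF($"row_type").alias("parsed"), mapUDF($"row_type").alias("fields"))
  .printSchema()
// parsed is a struct with nullable i, s, d, extra; fields is a map<string,double>.

Every row now fits one fixed schema, and downstream code only has to check which fields are null or which keys are present.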

If this is not practical I would recommend keeping the JSON field "as is" and parsing it only on demand to extract specific values. You can use get_json_object and explicit type casting. This allows for testing different scenarios:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

without assuming a single document structure.
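
For example, with a toy DataFrame (the JSON shapes below are made up just to show the fallback behaviour; spark.implicits._ is assumed to be in scope):

import org.apache.spark.sql.functions.{coalesce, get_json_object}
import org.apache.spark.sql.types.DoubleType

val docs = Seq(
  """{"bar": 1.0}""",
  """{"foo": {"bar": 2.0}}""",
  """{"foobar": {"foo": {"bar": 3.0}}}"""
).toDF("json_col")

val bar = coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

docs.select(bar.alias("bar")).show()
// Each row picks up bar from whichever path actually exists in that document.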

You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, it suggests a serious design or data-modeling problem. Even if you can store the parsed data, it will be really hard to work with.
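
For completeness, a minimal sketch of the binary-encoder route (Seq[Any] here is just a stand-in for "completely random output"; spark is assumed to be an existing SparkSession):

import org.apache.spark.sql.{Dataset, Encoders}

// Everything is serialized with Kryo into a single binary column, so Spark SQL
// cannot see inside the values: you get storage, not queryability.
val parsed: Dataset[Seq[Any]] = spark.createDataset(Seq(
  Seq[Any](1, "second", 3.0),
  Seq[Any](1, "second", 3.0, 4)
))(Encoders.kryo[Seq[Any]])

parsed.printSchema()   // a single binary column named "value"

// Any real work has to happen on the Scala side, e.g. by pattern matching,
// which is exactly why this gets hard to work with.
parsed.map(xs => xs.length)(Encoders.scalaInt).show()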