
I am loading a DataFrame from an external source with the following schema:

 |-- A: string (nullable = true)
 |-- B: timestamp (nullable = true)
 |-- C: long (nullable = true)
 |-- METADATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- M_1: integer (nullable = true)
 |    |    |-- M_2: string (nullable = true)
 |    |    |-- M_3: string (nullable = true)
 |    |    |-- M_4: string (nullable = true)
 |    |    |-- M_5: double (nullable = true)
 |    |    |-- M_6: string (nullable = true)
 |    |    |-- M_7: double (nullable = true)
 |    |    |-- M_8: boolean (nullable = true)
 |    |    |-- M_9: boolean (nullable = true)
 |-- E: string (nullable = true)

Now I need to add a new column, METADATA_PARSED, typed as an array of the following case class:

case class META_DATA_COL(M_1: String, M_2: String, M_3: String, M_10: String)

My approach, based on examples I have seen, is to create a UDF and pass in the METADATA column. But since it is a complex type, I am having a lot of trouble parsing it.

On top of that, the "new" field M_10 requires some string manipulation inside the UDF, so I need to access each of the elements in the METADATA column.

What would be the best way to approach this? I attempted to convert the source DataFrame (including METADATA) to a case class, but that did not work: the array was translated back to Spark's WrappedArray type upon entering the UDF.
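To illustrate, here is roughly what I am trying now: handling the array as Seq[Row] inside the UDF and unpacking fields with Row.getAs. The deriveM10 function below is just a placeholder for the real string manipulation.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

case class META_DATA_COL(M_1: String, M_2: String, M_3: String, M_10: String)

// Placeholder for the real string manipulation that produces M_10
def deriveM10(m2: String, m3: String): String = s"$m2/$m3"

// Spark passes an array<struct> column to a UDF as Seq[Row];
// each element's fields are pulled out by name with getAs.
val parseMetadata = udf { rows: Seq[Row] =>
  rows.map { r =>
    val m1 = Option(r.getAs[Any]("M_1")).map(_.toString).orNull // M_1 is an integer in the source
    val m2 = r.getAs[String]("M_2")
    val m3 = r.getAs[String]("M_3")
    META_DATA_COL(m1, m2, m3, deriveM10(m2, m3))
  }
}

// val withParsed = df.withColumn("METADATA_PARSED", parseMetadata(col("METADATA")))
```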

Hi there, can you provide an example of the desired output? – abiratsis

1 Answer


You can use something like this:

import org.apache.spark.sql.functions._

// explode produces one row per element of the METADATA array
val tempdf = df.select(
  explode(col("METADATA")).as("flat")
)

// pull the struct fields out as top-level columns
val processedDf = tempdf.select(col("flat.M_1"), col("flat.M_2"), col("flat.M_3"))
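Note that explode yields one row per array element, so the original array shape is lost. Since the question wants METADATA_PARSED back as an array column, you can re-group with collect_list afterwards. A runnable sketch on invented sample data (column names from the question, values made up), assuming column A uniquely identifies each original row:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("explode-demo").getOrCreate()
import spark.implicits._

// Invented sample: one source row with two metadata entries.
// The cast renames the tuple fields to match the question's schema.
val df = Seq(("row1", Seq((1, "x", "y"), (2, "u", "v"))))
  .toDF("A", "METADATA")
  .withColumn("METADATA", col("METADATA").cast("array<struct<M_1:int,M_2:string,M_3:string>>"))

// One row per array element, keeping the parent key A alongside
val processed = df
  .select(col("A"), explode(col("METADATA")).as("flat"))
  .select(col("A"), col("flat.M_1"), col("flat.M_2"), col("flat.M_3"))

// Re-assemble an array column per original row (assumes A is a unique key)
val regrouped = processed
  .groupBy("A")
  .agg(collect_list(struct(col("M_1"), col("M_2"), col("M_3"))).as("METADATA_PARSED"))
```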

Now write a UDF over those columns:


def processudf = udf((col1: Int, col2: String, col3: String) => /* do the processing */)
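For example, that stub could be filled in like this. Returning the case class makes Spark build a struct column; the concatenation used for M_10 is only a placeholder for whatever string manipulation you actually need, and java.lang.Integer is used so a null M_1 survives the trip into the UDF.

```scala
import org.apache.spark.sql.functions.{col, udf}

case class META_DATA_COL(M_1: String, M_2: String, M_3: String, M_10: String)

// Kept as a plain function so the logic is easy to test on its own.
// The M_10 rule here is a placeholder.
val process: (Integer, String, String) => META_DATA_COL = (m1, m2, m3) =>
  META_DATA_COL(Option(m1).map(_.toString).orNull, m2, m3, s"$m2/$m3")

def processudf = udf(process)

// processedDf.withColumn("parsed", processudf(col("M_1"), col("M_2"), col("M_3")))
```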

This should help. I can provide more help if you share more details on the processing.