
I have the sample data shown below and need to convert the columns ABS and ALT from string to Array[StructType] using Spark Scala code. Any help would be much appreciated.

With the help of a UDF, I was able to convert from string to ArrayType, but I need some help converting these two columns (ABS, ALT) from string to Array[StructType].

VIN         TT  MSG_TYPE ABS                           ALT
MSGXXXXXXXX 1   SIGL     [{"E":1569XXXXXXX,"V":0.0}] 

Current schema:

|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: string (nullable = true)
|-- ALT: string (nullable = true)

Expected schema:


|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: long (nullable = true)
|-- ALT: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)

2 Answers


This also works with the built-in from_json function and an explicit schema:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StructField, StructType}

val schema = ArrayType(StructType(Seq(StructField("E", LongType), StructField("V", DoubleType))))

val final_df = newDF.withColumn("ABS", from_json($"ABS", schema)).withColumn("ALT", from_json($"ALT", schema))

This produces the following schema:


 |-- VIN: string (nullable = true)
 |-- TT: string (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
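
For anyone who wants to try the from_json approach above end to end, here is a minimal sketch. It assumes a local SparkSession and Spark 2.2 or later (where from_json accepts an array schema). The object name FromJsonSketch, the helper parseColumns, and the sample row are hypothetical; in particular the E value is made up, since the question masks the real one.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StructField, StructType}

object FromJsonSketch {
  // Same schema as in the answer: an array of structs with fields E and V.
  val schema: ArrayType =
    ArrayType(StructType(Seq(StructField("E", LongType), StructField("V", DoubleType))))

  // Hypothetical helper: parses the JSON strings in ABS and ALT in place.
  def parseColumns(df: DataFrame): DataFrame =
    df.withColumn("ABS", from_json(col("ABS"), schema))
      .withColumn("ALT", from_json(col("ALT"), schema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample row shaped like the data in the question; ALT is null.
    val newDF = Seq(
      ("MSG0001", 1L, "SIGL", """[{"E":1569000000000,"V":0.0}]""", null: String)
    ).toDF("VIN", "TT", "MSG_TYPE", "ABS", "ALT")

    val parsed = parseColumns(newDF)
    parsed.printSchema()
    parsed.show(truncate = false)

    spark.stop()
  }
}
```

A null input string should simply stay null after from_json, so no special handling for empty ALT values is included here.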

You can use a UDF to parse the JSON and transform it into arrays of structs.

First, define a function that parses the JSON (based on this answer):

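import scala.util.parsing.json.JSON

// Element type of the resulting array: one Data per JSON object.
case class Data(E: String, V: Double)

// Extractor helpers that cast untyped parser output to the expected type.
class CC[T] extends Serializable { def unapply(a: Any): Option[T] = Some(a.asInstanceOf[T]) }
object M extends CC[Map[String, Any]]
object L extends CC[List[Any]]
object S extends CC[String]
object D extends CC[Double]

def toStruct(in: String): Array[Data] = {
  // Null or empty input yields an empty array.
  if (in == null || in.isEmpty) return new Array[Data](0)
  val result = for {
    Some(L(map)) <- List(JSON.parseFull(in))
    M(data) <- map
    S(e) = data("E")
    D(v) = data("V")
  } yield {
    Data(e, v)
  }
  result.toArray
}
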
This function returns an array of Data objects that already have the correct structure. Now we use this function to define a UDF:

val ts: String => Array[Data] = toStruct(_)
import org.apache.spark.sql.functions.udf
val toStructUdf = udf(ts)

Finally, we call the UDF (for example, in a select statement):

val df = ...
val newdf = df.select('VIN, 'TT, 'MSG_TYPE, toStructUdf('ABS).as("ABS"), toStructUdf('ALT).as("ALT"))

The resulting schema is:


 |-- VIN: string (nullable = true)
 |-- TT: string (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: double (nullable = false)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: double (nullable = false)
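
Note that the schema above has E as string because the Data case class declares it that way, whereas the question asks for long. Below is a rough sketch of a variant that produces a long instead. It assumes E arrives as an unquoted JSON number, as in the question's sample; as far as I can tell, scala.util.parsing.json.JSON.parseFull returns JSON numbers as Double, so the value is read as a Double and converted. The names NumericParse, DataNum and toStructNumeric are hypothetical, and plain pattern matching is used in place of the extractor objects.

```scala
import scala.util.parsing.json.JSON

// Hypothetical element type with E as Long, matching the schema the question asks for.
case class DataNum(E: Long, V: Double)

object NumericParse {
  def toStructNumeric(in: String): Array[DataNum] = {
    // Null or empty input yields an empty array, as in toStruct.
    if (in == null || in.isEmpty) return Array.empty[DataNum]
    JSON.parseFull(in) match {
      case Some(items: List[_]) =>
        items.collect {
          case m: Map[_, _] =>
            val fields = m.asInstanceOf[Map[String, Any]]
            // Assumption: both fields are present and numeric (parsed as Double).
            DataNum(fields("E").asInstanceOf[Double].toLong, fields("V").asInstanceOf[Double])
        }.toArray
      // Anything that is not a JSON array yields an empty result.
      case _ => Array.empty[DataNum]
    }
  }
}
```

This function could be wrapped with udf(...) in the same way as toStruct. It does not handle objects with a missing E or V key; those would throw at runtime.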