3
votes

How can I transform an array of structs into an array of structs again using Spark higher-order functions?

The dataset:

case class Foo(thing1:String, thing2:String, thing3:String)
case class Baz(foo:Foo, other:String)
case class Bar(id:Int, bazes:Seq[Baz])
import spark.implicits._
val df = Seq(Bar(1, Seq(Baz(Foo("first", "second", "third"), "other"), Baz(Foo("1", "2", "3"), "else")))).toDF
df.printSchema
df.show(false)

I want to concatenate thing1, thing2, and thing3, but keep the other property for each baz.

A simple:

scala> df.withColumn("cleaned", expr("transform(bazes, x -> x)")).printSchema
root
 |-- id: integer (nullable = false)
 |-- bazes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)
 |-- cleaned: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)

will only copy the elements over unchanged.

The desired concatenate operation:

df.withColumn("cleaned", expr("transform(bazes, x -> concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3))")).show(false)

will, unfortunately, remove all the values from the other column:

+---+----------------------------------------------------+-------------------------------+
|id |bazes                                               |cleaned                        |
+---+----------------------------------------------------+-------------------------------+
|1  |[[[first, second, third], other], [[1, 2, 3], else]]|[first::second::third, 1::2::3]|
+---+----------------------------------------------------+-------------------------------+

How can these be retained? Trying to keep the tuples:

df.withColumn("cleaned", expr("transform(bazes, x -> (concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3), x.other))")).printSchema

fails with:

.AnalysisException: cannot resolve 'named_struct('col1', concat(namedlambdavariable().`foo`.`thing1`, '::', namedlambdavariable().`foo`.`thing2`, '::', namedlambdavariable().`foo`.`thing3`), NamePlaceholder(), namedlambdavariable().`other`)' due to data type mismatch: Only foldable string expressions are allowed to appear at odd position, got: NamePlaceholder; line 1 pos 22;

edit

The desired output:

  • a new column with contents:

    [[first::second::third, other], [1::2::3, else]]

which retains the other field.

1
Can you please add the expected output? other is not directly accessible because foo and other share the same level of the hierarchy. Please add your desired output so that I can answer your question. - Mahesh Gupta
Is it clearer now? - Georg Heiler
Yes, I have added an answer; please check and accept. - Mahesh Gupta
You concat other, but I would like to keep it as a separate column inside the output struct. - Georg Heiler
OK, got your point. - Mahesh Gupta

1 Answer

6
votes

You can achieve the desired output by wrapping the concatenated string and x.other in an explicit struct(...) call inside the lambda. foo and other sit at the same level of the hierarchy, so other cannot be reached through foo; you need to access it separately as x.other.

scala>  df.withColumn("cleaned", expr("transform(bazes, x -> struct(concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3),cast(x.other as string)))")).show(false)
+---+----------------------------------------------------+------------------------------------------------+
|id |bazes                                               |cleaned                                         |
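+---+----------------------------------------------------+------------------------------------------------+
|1  |[[[first, second, third], other], [[1, 2, 3], else]]|[[first::second::third, other], [1::2::3, else]]|
+---+----------------------------------------------------+------------------------------------------------+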

The schema, via printSchema:

scala>  df.withColumn("cleaned", expr("transform(bazes, x -> struct(concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3),cast(x.other as string)))")).printSchema
root
 |-- id: integer (nullable = false)
 |-- bazes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)
 |-- cleaned: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- col1: string (nullable = true)
 |    |    |-- col2: string (nullable = true)
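The struct fields above come out with the default names col1 and col2. If named fields are preferred, one option is named_struct with literal field names, which also satisfies the "foldable string expressions at odd position" requirement from the error message in the question. A minimal sketch, assuming Spark 2.4 or later; the field name joined, the object NamedStructExample, and the helper clean are made up for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

// Same case classes as in the question.
case class Foo(thing1: String, thing2: String, thing3: String)
case class Baz(foo: Foo, other: String)
case class Bar(id: Int, bazes: Seq[Baz])

object NamedStructExample {
  // Hypothetical helper: builds the "cleaned" column with explicit field names.
  // 'joined' and 'other' are string literals, so named_struct accepts them as names.
  def clean(df: DataFrame): DataFrame =
    df.withColumn(
      "cleaned",
      expr(
        """transform(bazes, x -> named_struct(
          |  'joined', concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3),
          |  'other', x.other))""".stripMargin))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("named-struct-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      Bar(1, Seq(
        Baz(Foo("first", "second", "third"), "other"),
        Baz(Foo("1", "2", "3"), "else")))).toDF

    val result = clean(df)
    result.printSchema()   // cleaned elements now have fields `joined` and `other`
    result.select("cleaned").show(false)

    spark.stop()
  }
}
```

The cast(x.other as string) from the snippet above is dropped here because other is already a string in this schema.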

Let me know if you have any further questions about this.