2 votes

I used the Spark SQL functions arrays_zip combined with flatten to transform data from an array of structs of inner arrays of the same length into an array of structs. printSchema shows exactly what I want. However, the DataFrame output loses the original column names and replaces them with generic names "0", "1", "2", etc., whether written in Parquet or Avro format. I would like the output to keep the original column names.

To avoid revealing my company's business, the following are similar but much simplified examples.

scala> c2.printSchema
root
 |-- cal: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- month: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- num: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
scala> c2.show(false)
+----------------------------------------------+
|cal                                           |
+----------------------------------------------+
|[[[Jan, Feb, Mar], [1, 2, 3]], [[April], [4]]]|
+----------------------------------------------+

I would like to transform it to

scala> newC2.show(false)
+------------------------------------------+
|cal                                       |
+------------------------------------------+
|[[Jan, 1], [Feb, 2], [Mar, 3], [April, 4]]|
+------------------------------------------+
with this schema:
scala> newC2.printSchema
root
 |-- cal: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- month: string (nullable = true)
 |    |    |-- num: long (nullable = true)

I know arrays_zip only works well on top-level arrays, so I flattened the nested arrays to the top level. The following code works in this example:

val newC2 = c2
  .withColumn("month", flatten(col("cal.month")))
  .withColumn("num", flatten(col("cal.num")))
  .withColumn("cal", arrays_zip(col("month"), col("num")))
  .drop("month", "num")

It generates exactly the data and schema I want. However, the written output names all the struct fields generically as "0", "1", "2", etc.:

newC2.write.option("header", false).parquet("c2_parquet")
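
Reading the files back confirms the symptom (a sketch; the path matches the write above):

spark.read.parquet("c2_parquet").printSchema()  // the struct fields come back as "0", "1"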

I tried another example that has the month array and num array as top-level columns in the original data. There I can apply arrays_zip without flatten and get the same schema and data as shown above, and in that case the original field names are written out correctly.

I tried adding an alias to the flattened data; that does not work. I even tried manipulating the columns directly, like this (assume the field storing the result of arrays_zip is 'zipped'):

// 'inner' is assumed to be the StructType of the zipped struct element
val columns: Array[Column] = inner.fields.map(_.name).map { x =>
  col("zipped").getField(x).alias(x)
}
val newB3 = newB2.withColumn("b", array(struct(columns: _*))).drop("zipped")

It ends up regenerating the original schema ("month" as an array of strings and "num" as an array of longs).

To reproduce the problem, you can use this JSON input:

"cal":[{"month":["Jan","Feb","Mar"],"num":[1,2,3]},{"month":["April"],"num":[4]}]}

The following JSON is for the top-level arrays_zip case:

{"month":["Jan","Feb","Mar"],"num":[1,2,3]}

How does Spark internally decide what field names to use? How can I get it to work? Please advise.

Got the same problem. Looks like a bug to me; it sometimes uses the column names, depending on their struct. How did you solve it? – trompa

1 Answer

0 votes

Since Spark 2.4, the schema transformation can be achieved using higher-order functions. In Scala, the query can look like this:

import org.apache.spark.sql.functions.{expr, flatten}

val result = df
  .withColumn("cal", flatten(expr(
    "TRANSFORM(cal, x -> zip_with(x.month, x.num, (month, num) -> (month, num)))")))

After applying it to your sample data, I get this schema:

result.printSchema()
root
 |-- cal: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- month: string (nullable = true)
 |    |    |-- num: long (nullable = true)
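
For what it's worth, on Spark 3.0+ the same transformation can be written in the typed Scala DSL, since transform and zip_with were added as DataFrame functions there. This is a sketch, not part of the original answer; result3 is an arbitrary name:

import org.apache.spark.sql.functions.{col, flatten, struct, transform, zip_with}

// Zip each struct's inner arrays, naming the struct fields explicitly
// so the names also survive a Parquet/Avro write.
val result3 = df.withColumn("cal",
  flatten(transform(col("cal"), x =>
    zip_with(x.getField("month"), x.getField("num"),
      (m, n) => struct(m.as("month"), n.as("num"))))))

Naming the fields with .as inside struct is what pins them down, rather than leaving Spark to pick generic names.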