I used spark sql functions arrays_zip combined with flatten to transform data from array of struct of inner array of the same length into array of struct. printSchema shows exactly I want. However, df output lost original column names and replace them with generic column name "0", "1", "2" etc. no matter in Parquet or Avro format. I like to output original column names.
Not to reveal the business of my company. The followings are similar but much simplified examples.
scala> c2.printSchema
root
|-- cal: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- month: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- num: array (nullable = true)
| | | |-- element: long (containsNull = true)
scala> c2.show(false)
+----------------------------------------------+
|cal |
+----------------------------------------------+
|[[[Jan, Feb, Mar], [1, 2, 3]], [[April], [4]]]|
+----------------------------------------------+
I like to transform to
scala> newC2.show(false)
+------------------------------------------+
|cal |
+------------------------------------------+
|[[Jan, 1], [Feb, 2], [Mar, 3], [April, 4]]|
+------------------------------------------+
with
scala> newC2.printSchema
root
|-- cal: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- month: string (nullable = true)
| | |-- num: long (nullable = true)
I know arrays_zip only work well on the top-level arrays. Therefore, I flatten them to top level. The followings codes work in this example
val newC2 = c2.withColumn("month", flatten(col("cal.month"))).withColumn("num", flatten(col("cal.num"))).withColumn("cal", arrays_zip(col("month"), col("num"))).drop("month", "num")
It generates exactly data and schema I want. However, it outputs all columns generically using "0", "1", "2" etc.
newC2.write.option("header", false).parquet("c2_parquet")
I tried another example that has original data of month array and num array at the top level. I can arrays_zip without flatten and get the same schema and data shown. However, it output original field name correctly in this case.
I tried add alias to flatten data. That does not work. I even tried manipulate columns like (assume field store the result of arrays_zip is 'zipped'
val columns: Array[Column] = inner.fields.map(_.name).map{x => col("zipped").getField(x).alias(x)}
val newB3 = newB2.withColumn("b", array(struct(columns:_*))).drop("zipped")
It ends up generate original schema ('month", array of string and "num", array of long).
To duplicate the problem, you can use the json input
"cal":[{"month":["Jan","Feb","Mar"],"num":[1,2,3]},{"month":["April"],"num":[4]}]}
the following json is for top-level arrays_zip
{"month":["Jan","Feb","Mar"],"num":[1,2,3]}
How Spark internally decide what field names to use? How can I get it to work? Please advise.