
I need to flatten a dataframe so that I can join it with another dataframe in Spark (Scala).

My two dataframes have the following schemas:

DF1

root
|-- field1: string (nullable = true)
|-- field2: long (nullable = true)
|-- field3: long (nullable = true)
|-- field4: long (nullable = true)
|-- field5: integer (nullable = true)
|-- field6: timestamp (nullable = true)
|-- field7: long (nullable = true)
|-- field8: long (nullable = true)
|-- field9: long (nullable = true)
|-- field10: integer (nullable = true)

DF2

root
|-- field1: long (nullable = true)
|-- field2: long (nullable = true)
|-- field3: string (nullable = true)
|-- field4: integer (nullable = true)
|-- field5: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- field6: long (nullable = true)
|    |    |-- field7: integer (nullable = true)
|    |    |-- field8: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- field9: string (nullable = true)
|    |    |    |    |-- field10: integer (nullable = true)
|-- field11: timestamp (nullable = true)

I honestly have no clue how to flatten DF2. In the end, I need to join the two dataframes on DF1.field4 = DF2.field9.

I'm using Spark 2.1.0.

My first thought was to use explode, but that is already deprecated in Spark 2.1.0. Does anyone have a clue for me?

I think with Datasets you could use flatMap? – LiMuBei
I guess you have to explode your arrays first, then join. – Raphael Roth
The explode functionality is deprecated in Spark 2.1.0. – Oliviervs
Does DF2 have a case class, or can you share how `df2` is created? – mrsrinivas
My mistake, the explode functionality is still available in Spark 2.1.0 as functions.explode in the org.apache.spark.sql package. – Oliviervs

1 Answer


My mistake: the explode functionality is still available in Spark 2.1.0 as functions.explode in the org.apache.spark.sql package. Only the explode method on Dataset itself is deprecated.

Thanks

You can find the code below:

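import org.apache.spark.sql.functions

// One row per element of the outer array field5
val DF2Exploded1 = DF2.select(DF2("*"), functions.explode(DF2("field5"))
                      .alias("field5_exploded"))

// One row per element of the nested array field8
val DF2Exploded2 = DF2Exploded1.select(DF2Exploded1("*"), functions.explode(DF2Exploded1("field5_exploded.field8"))
                               .alias("field8_exploded"))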
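
The two explode steps still leave field9 inside a struct column, and the question also asks for the join against DF1. The following is a minimal sketch of how both could be finished, not a tested solution for the real data. The object name FlattenAndJoin, the helper names flattenDF2 and joinOnField9, and the df2_ prefix on the flattened columns are all made up for illustration. Casting DF1.field4 to string before comparing is an assumption based on the schemas above, where field4 is a long and field9 is a string.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical helper object; names are illustrative, not from the original post.
object FlattenAndJoin {

  // Explodes field5, then the nested field8, and lifts the struct fields
  // into top-level columns with an assumed "df2_" prefix.
  def flattenDF2(df2: DataFrame): DataFrame = {
    val level1 = df2.select(col("*"), explode(col("field5")).alias("field5_exploded"))
    val level2 = level1.select(
      col("*"),
      explode(col("field5_exploded.field8")).alias("field8_exploded"))

    level2
      .withColumn("df2_field6", col("field5_exploded.field6"))
      .withColumn("df2_field7", col("field5_exploded.field7"))
      .withColumn("df2_field9", col("field8_exploded.field9"))
      .withColumn("df2_field10", col("field8_exploded.field10"))
      // The array and intermediate struct columns are no longer needed.
      .drop("field5", "field5_exploded", "field8_exploded")
  }

  // Inner join on DF1.field4 = DF2.field9.
  // Assumption: field4 (long) is cast to string so both sides have the same type.
  def joinOnField9(df1: DataFrame, flatDf2: DataFrame): DataFrame =
    df1.join(flatDf2, df1("field4").cast("string") === flatDf2("df2_field9"))
}
```

With the dataframes from the question this would be called as FlattenAndJoin.joinOnField9(DF1, FlattenAndJoin.flattenDF2(DF2)). Note that explode drops rows whose array is null or empty, so such DF2 rows will not appear in the result. The joined dataframe also keeps the identically named columns from both sides (field1 to field4), which then have to be referenced through the originating dataframe, for example DF1("field1").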