0
votes

I have two DataFrames:

val df1 = Seq(("1","2","3"),("4","5","6")).toDF("A","B","C")
df1.show
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
+---+---+---+

and

val df2 = Seq(("11","22","33"),("44","55","66")).toDF("D","E","F")
df2.show
+---+---+---+
|  D|  E|  F|
+---+---+---+
| 11| 22| 33|
| 44| 55| 66|
+---+---+---+

I need to combine the ones above to get

val df3 = Seq(("1","2","3","","",""),("4","5","6","","",""),("","","","11","22","33"),("","","","44","55","66"))
.toDF("A","B","C","D","E","F")
df3.show
+---+---+---+---+---+---+
|  A|  B|  C|  D|  E|  F|
+---+---+---+---+---+---+
|  1|  2|  3|   |   |   |
|  4|  5|  6|   |   |   |
|   |   |   | 11| 22| 33|
|   |   |   | 44| 55| 66|
+---+---+---+---+---+---+

Right now I'm manually adding the missing columns to every DataFrame so they share a common structure, and then using a union. That code is specific to these DataFrames and doesn't scale.

I'm looking for a solution that works for x DataFrames with y columns each.

2 Answers

1
votes

You can manually create missing columns in the two data frames and then union them:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Set union: every column name appears once, but in no guaranteed order
val allCols = df1.columns.toSet.union(df2.columns.toSet).toArray

// Add each missing column as an empty string, then select in a fixed order
val createMissingCols = (df: DataFrame, allCols: Array[String]) => allCols.foldLeft(df)(
  (_df, _col) => if (_df.columns.contains(_col)) _df else _df.withColumn(_col, lit(""))
).select(allCols.head, allCols.tail: _*)
// select is needed to make sure the two data frames have the same order of columns

createMissingCols(df1, allCols).union(createMissingCols(df2, allCols)).show
+---+---+---+---+---+---+
|  E|  F|  A|  B|  C|  D|
+---+---+---+---+---+---+
|   |   |  1|  2|  3|   |
|   |   |  4|  5|  6|   |
| 22| 33|   |   |   | 11|
| 55| 66|   |   |   | 44|
+---+---+---+---+---+---+
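
The question asks for something that works with any number of DataFrames, so here is a minimal sketch of how the same idea might be generalized. The helper name unionAll and the third frame df3 are made up for illustration, and the sketch assumes every column is a string (as in the question); padding a numeric column with "" would not union cleanly.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Local session for this sketch only; in spark-shell, skip these two lines
// because `spark` and its implicits are already available.
val spark = SparkSession.builder().master("local[*]").appName("union-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: union any non-empty list of DataFrames,
// padding the columns each one lacks with empty strings.
def unionAll(dfs: Seq[DataFrame]): DataFrame = {
  // Column names in first-seen order, without duplicates
  val allCols = dfs.flatMap(_.columns).distinct
  val aligned = dfs.map { df =>
    val padded = allCols
      .filterNot(df.columns.contains)
      .foldLeft(df)((acc, c) => acc.withColumn(c, lit("")))
    // Same column order everywhere, since union matches by position
    padded.select(allCols.head, allCols.tail: _*)
  }
  aligned.reduce(_ union _)
}

val df1 = Seq(("1","2","3"),("4","5","6")).toDF("A","B","C")
val df2 = Seq(("11","22","33"),("44","55","66")).toDF("D","E","F")
val df3 = Seq(("7","8")).toDF("A","G") // illustrative frame that shares column A

val combined = unionAll(Seq(df1, df2, df3))
combined.show()
```

Using distinct on a Seq instead of a Set keeps the columns in the order they first appear. On Spark 3.1 or later, unionByName(other, allowMissingColumns = true) does this alignment itself, filling the gaps with null rather than empty strings.
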
1
votes

A much simpler way is to do a full outer join with the join condition set to false. No rows ever match, so every row from both sides is kept and the other side's columns are null:

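import org.apache.spark.sql.functions.lit

val df1 = Seq(("1","2","3"),("4","5","6")).toDF("A","B","C")
val df2 = Seq(("11","22","33"),("44","55","66")).toDF("D","E","F")

// A join condition that is always false, so no rows are ever matched
val joined = df1.join(df2, lit(false), "full")

joined.show()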

+----+----+----+----+----+----+
|   A|   B|   C|   D|   E|   F|
+----+----+----+----+----+----+
|   1|   2|   3|null|null|null|
|   4|   5|   6|null|null|null|
|null|null|null|  11|  22|  33|
|null|null|null|  44|  55|  66|
+----+----+----+----+----+----+

If you then want to replace the null values with empty strings, add:

val withEmptyString = joined.na.fill("")

withEmptyString.show() 

+---+---+---+---+---+---+
|  A|  B|  C|  D|  E|  F|
+---+---+---+---+---+---+
|  1|  2|  3|   |   |   |
|  4|  5|  6|   |   |   |
|   |   |   | 11| 22| 33|
|   |   |   | 44| 55| 66|
+---+---+---+---+---+---+

So in summary, df1.join(df2, lit(false), "full").na.fill("") should do the trick.
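
For more than two DataFrames, the same join can probably be folded over a list. The following is only a sketch under two assumptions: the frames have no column names in common (otherwise the result would contain duplicate, ambiguous columns), and the frame list is non-empty. The third frame df3 and the name frames are invented for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Local session for this sketch only; in spark-shell, skip these two lines.
val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(("1","2","3"),("4","5","6")).toDF("A","B","C")
val df2 = Seq(("11","22","33"),("44","55","66")).toDF("D","E","F")
val df3 = Seq(("77","88","99")).toDF("G","H","I") // illustrative third frame

// Assumption: column names are disjoint across all frames
val frames = Seq(df1, df2, df3)

// Chain full outer joins that never match, then turn nulls into ""
val combined = frames
  .reduce((left, right) => left.join(right, lit(false), "full"))
  .na.fill("")

combined.show()
```

Note that na.fill("") only touches string columns, so nulls in columns of other types would remain.
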