3 votes

I want to merge several DataFrames that have a few different columns. Suppose:

  • DataFrame A has 3 columns: Column_1, Column_2, Column_3

  • DataFrame B has 3 columns: Column_1, Column_2, Column_4

  • DataFrame C has 3 columns: Column_1, Column_2, Column_5

I want to merge these DataFrames such that I get a DataFrame like:

Column_1, Column_2, Column_3, Column_4, Column_5

The number of DataFrames may increase. Is there any way to do this merge, such that for a particular Column_1/Column_2 combination I get the values for the other three columns in the same row, and if for a particular Column_1/Column_2 combination there is no data in some columns, it shows null there?

DataFrame A:

Column_1 Column_2 Column_3
   1        x        abc
   2        y        def

DataFrame B:

Column_1 Column_2 Column_4
   1        x        xyz
   2        y        www
   3        z        sdf

The merge of A and B :

Column_1 Column_2 Column_3 Column_4
   1        x        abc     xyz
   2        y        def     www
   3        z        null    sdf

2 Answers

6 votes

If I understand your question correctly, you need to perform an outer join using a sequence of columns as the join keys.

I have used the data provided in your question to illustrate how it is done:

scala> val df1 = Seq((1,"x","abc"),(2,"y","def")).toDF("Column_1","Column_2","Column_3")
// df1: org.apache.spark.sql.DataFrame = [Column_1: int, Column_2: string, Column_3: string]

scala> val df2 = Seq((1,"x","xyz"),(2,"y","www"),(3,"z","sdf")).toDF("Column_1","Column_2","Column_4")
// df2: org.apache.spark.sql.DataFrame = [Column_1: int, Column_2: string, Column_4: string]

scala> val df3 = df1.join(df2, Seq("Column_1","Column_2"), "outer")
// df3: org.apache.spark.sql.DataFrame = [Column_1: int, Column_2: string, Column_3: string, Column_4: string]

scala> df3.show
// +--------+--------+--------+--------+                                           
// |Column_1|Column_2|Column_3|Column_4|
// +--------+--------+--------+--------+
// |       1|       x|     abc|     xyz|
// |       2|       y|     def|     www|
// |       3|       z|    null|     sdf|
// +--------+--------+--------+--------+

This is an equi-join with another DataFrame using the given columns. Unlike the other join variants, the join columns appear only once in the output, i.e. similar to SQL's JOIN USING syntax.

Note

Outer equi-joins are available since Spark 1.6.
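Since the question says the number of DataFrames may increase, the pairwise join above generalizes by folding the outer join over a list of frames, e.g. `Seq(df1, df2, df3).reduce((l, r) => l.join(r, Seq("Column_1", "Column_2"), "outer"))`. The sketch below shows the same fold pattern on plain Scala maps so it runs without a Spark session; the `outerJoin` helper and the map-based "frames" are illustrative stand-ins, not Spark API:

```scala
// Each "frame" maps a (Column_1, Column_2) key to its extra column's value.
type Key = (Int, String)
val a: Map[Key, String] = Map((1, "x") -> "abc", (2, "y") -> "def")                    // Column_3
val b: Map[Key, String] = Map((1, "x") -> "xyz", (2, "y") -> "www", (3, "z") -> "sdf") // Column_4

// Outer-merge any number of frames: take the union of all keys, and for each
// key collect every frame's value; a missing value becomes None (Spark's null).
def outerJoin(frames: Seq[Map[Key, String]]): Map[Key, Seq[Option[String]]] = {
  val allKeys = frames.flatMap(_.keys).toSet
  allKeys.map(k => k -> frames.map(_.get(k))).toMap
}

val merged = outerJoin(Seq(a, b))
// merged((3, "z")) == Seq(None, Some("sdf")) -- null for Column_3, as in the question
```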

0 votes

First register all three DataFrames as temporary views, so that SQL queries can be run against them:

DF1.createOrReplaceTempView("df1view")
DF2.createOrReplaceTempView("df2view")
DF3.createOrReplaceTempView("df3view")

Then use join queries to merge them:

val intermediateDF = spark.sql("SELECT a.Column_1, a.Column_2, a.Column_3, b.Column_4 FROM df1view a LEFT JOIN df2view b ON a.Column_1 = b.Column_1 AND a.Column_2 = b.Column_2")

intermediateDF.createOrReplaceTempView("imDFview")

val resultDF = spark.sql("SELECT a.Column_1, a.Column_2, a.Column_3, a.Column_4, b.Column_5 FROM imDFview a LEFT JOIN df3view b ON a.Column_1 = b.Column_1 AND a.Column_2 = b.Column_2")

These joins can also be done together in a single query. Also, since you want all values of Column_1 and Column_2, you can use a full outer join instead of a left join.
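As a sketch of that full-outer variant (assuming the temp views registered above; note that with a full outer join the key columns from side `a` can be null on rows that exist only in `b`, so COALESCE keeps them populated):

```scala
val fullDF = spark.sql("""
  SELECT COALESCE(a.Column_1, b.Column_1) AS Column_1,
         COALESCE(a.Column_2, b.Column_2) AS Column_2,
         a.Column_3,
         b.Column_4
  FROM df1view a
  FULL OUTER JOIN df2view b
    ON a.Column_1 = b.Column_1 AND a.Column_2 = b.Column_2
""")
```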