2
votes

I have two DataFrames, df1 (Employee table) and df2 (Department table), with the following schemas:

df1.columns
// Array(id,name,dept_id)

and

df2.columns
// Array(id,name)

After I join these two tables on df1.dept_id and df2.id:

val joinedData = df1.join(df2,df1("dept_id")===df2("id"))
joinedData.columns
// Array(id,name,dept_id,id,name)

When I save the result to a file,

joinedData.write.csv("<path>")

it fails with this error:

 org.apache.spark.sql.AnalysisException: Duplicate column(s) : "name", "id" found, cannot save to file.;

I have read that passing a Seq of column names to join avoids duplication, but that only covers the columns the join is performed on. I need similar functionality for the non-join columns.

Is there a direct way to embed the table name in each repeated column name so that the result can be saved?

I came up with a solution that compares the columns of both DataFrames and renames the duplicates by adding the table name to the column name. But is there a direct way?

Note: this will be generic code that knows only the columns the join is performed on. The remaining columns are known only at runtime, so we can't hard-code the renames.

4
@Vijay Please check all the answers. - Alper t. Turker
Checked. Still no usable answer. - Vijay

4 Answers

6
votes

I would just keep all columns by making sure they have different names, e.g. by prepending an identifier to the column names:

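import org.apache.spark.sql.functions.col
import spark.implicits._ // for the $"..." column syntax; assumes a SparkSession named spark

val df1Cols = df1.columns
val df2Cols = df2.columns

// prefix every column name with an identifier for its DataFrame
val df1pf = df1.select(df1Cols.map(n => col(n).as("df1_" + n)): _*)
val df2pf = df2.select(df2Cols.map(n => col(n).as("df2_" + n)): _*)

df1pf.join(df2pf, $"df1_dept_id" === $"df2_id")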
1
votes

After further research and input from other developers, it seems clear that there is no direct way. One option is to rename every column, as @Raphael suggests, but I solved my problem by renaming only the duplicated columns:

val commonCols = df1.columns.intersect(df2.columns)
val newDf2 = changeColumnsName(df2, commonCols, "df2")

where changeColumnsName is defined as:

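import scala.annotation.tailrec
import org.apache.spark.sql.DataFrame

// Renames each listed column to "<tableName>_<column>", one column per recursive step.
@tailrec
def changeColumnsName(dataFrame: DataFrame, columns: Array[String], tableName: String): DataFrame = {
  if (columns.isEmpty)
    dataFrame
  else
    changeColumnsName(dataFrame.withColumnRenamed(columns.head, tableName + "_" + columns.head), columns.tail, tableName)
}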

Now perform the join:

val joinedData = df1.join(newDf2,df1("dept_id")===newDf2("df2_id"))
joinedData.columns
// Array(id,name,dept_id,df2_id,df2_name)
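
The renaming rule itself can be expressed without Spark, which makes it easy to check in isolation. The following is only a sketch: prefixDuplicates is a hypothetical helper name, not a Spark API, and it assumes a prefixed name never collides with another existing column.

```scala
// Hypothetical helper (not part of Spark): returns the names of `right`,
// adding "<prefix>_" to every name that also occurs in `left`.
def prefixDuplicates(left: Seq[String], right: Seq[String], prefix: String): Seq[String] = {
  val taken = left.toSet
  right.map(name => if (taken.contains(name)) s"${prefix}_$name" else name)
}

// Column names taken from the question's two tables.
val renamed = prefixDuplicates(Seq("id", "name", "dept_id"), Seq("id", "name"), "df2")

// With Spark available, the new names could be applied in a single call:
//   val newDf2 = df2.toDF(prefixDuplicates(df1.columns.toSeq, df2.columns.toSeq, "df2"): _*)
```

Dataset.toDF(colNames: String*) replaces all column names at once, so this variant avoids one withColumnRenamed call per duplicate.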
1
votes

You could try aliasing each DataFrame:

import spark.implicits._

df1.as("df1")
  .join(df2.alias("df2"),df1("dept_id") === df2("id"))
  .select($"df1.*",$"df2.*").show()
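
Note that selecting $"df1.*" and $"df2.*" keeps the original column names, so the result still contains id and name twice. A possible way to combine the aliases with runtime renaming is sketched below. The sample rows, the local SparkSession, and the "df1_"/"df2_" output prefixes are assumptions made for illustration, and the sketch assumes column names without dots.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session and sample data, assumed purely for illustration.
val spark = SparkSession.builder().master("local[1]").appName("alias-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "Ann", 10)).toDF("id", "name", "dept_id")
val df2 = Seq((10, "Sales")).toDF("id", "name")

// Build the select list at runtime: refer to each column through its
// DataFrame alias and give it a unique output name.
val selected =
  df1.columns.map(c => col(s"df1.$c").as(s"df1_$c")) ++
  df2.columns.map(c => col(s"df2.$c").as(s"df2_$c"))

val joined = df1.as("df1")
  .join(df2.as("df2"), col("df1.dept_id") === col("df2.id"))
  .select(selected: _*)
```

Because every output name is unique, joined can be written with write.csv without the duplicate-column error.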
0
votes
val llist = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23",10)) 
val left = llist.toDF("name","date","duration")
val right = Seq(("alice", 100),("bob", 23)).toDF("name","upload")

val df = left.join(right, left.col("name") === right.col("name")) 

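display(df) // Databricks notebook helper; df still contains two "name" columns

In SparkR, the duplicated column can be dropped after the join:

head(drop(join(left, right, left$name == right$name), left$name))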

https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html