4
votes

I need to append multiple columns to an existing Spark DataFrame. The column names are given in a List, and the values for the new columns are constant. For example, given the following input column names and data:

val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))

After appending both columns, assuming the constant values are "val1" for col1 and "val2" for col2, the output DataFrame should be:

+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+

I have written a function to append the columns:

def appendColumns(cols: List[String], ds: DataFrame): DataFrame = {
  cols match {
    case Nil => ds
    case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
    case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))
  }
}

Is there a better, more functional way to do this?

thanks

2
Just to clarify: in appendColumns the column name is the same as the column value, while in the expected output DataFrame the value for e.g. col1 is val1. Can they be the same (column name and value), or do you want them to be separate? - Shaido
The column name and column value can be the same. - nat
Odd close reason. - thebluephantom
Hi, have you found the answer to your question? Or is something still unclear? - Oli
Thanks Oli, yes approach suggested was very good. - nat

2 Answers

6
votes

Yes, there is a better and simpler way. Your function makes as many calls to withColumn as there are columns. With lots of columns, Catalyst, the engine that optimizes Spark queries, may feel a bit overwhelmed (I've experienced this in the past with a similar use case). I've even seen it cause an OOM on the driver when experimenting with thousands of columns. To avoid stressing Catalyst (and to write less code ;-) ), you can simply use select, as below, to get this done in a single Spark command:

import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._ // needed for toDF; assumes a SparkSession named spark, as in spark-shell

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// Let's assume that we have a map that associates column names with their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Create the new columns from the map: one constant column per (name, value) pair
val newCols = columnMap.map { case (name, value) => lit(value).as(name) }
// Select the old columns plus the new ones
data.select(data.columns.map(col) ++ newCols : _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+
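
For the case in the question, where only a List of names is given and (per the comments) each value can simply equal its column name, the same select idea might be sketched as follows. This is a minimal sketch rather than a definitive implementation: the local SparkSession setup is an assumption made so that it runs standalone, and the function name is reused from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session for experimentation only.
// In spark-shell or a notebook, `spark` already exists and this line can be dropped.
val spark = SparkSession.builder().master("local[*]").appName("append-columns").getOrCreate()
import spark.implicits._

// Same signature as the question's appendColumns, but built with a single
// select instead of one withColumn call per column.
def appendColumns(cols: List[String], ds: DataFrame): DataFrame = {
  val existing = ds.columns.map(col)                  // keep every current column
  val added = cols.map(name => lit(name).as(name))    // constant value equal to the column name
  ds.select(existing ++ added: _*)
}

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF()
val result = appendColumns(List("col1", "col2"), data)
result.show()
```

One difference from withColumn worth keeping in mind: withColumn replaces an existing column that has the same name, whereas select would produce a duplicate column, so this sketch assumes the new names do not clash with existing ones.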

4
votes

Rather than explicit recursion, I think a foldLeft is the more general approach, at least for a limited number of columns. Using a Databricks notebook:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val columnNames = Seq("c3","c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")

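def addCols(df: DataFrame, columns: Seq[String]): DataFrame =
  // Fold over the names, adding one constant column (value = name) per step.
  // The parameter is called name so that it does not shadow functions.col.
  columns.foldLeft(df)((acc, name) => acc.withColumn(name, lit(name)))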

val df2 = addCols(df, columnNames)
df2.show(false)

returns:

+-----+---+---+---+
|c1   |c2 |c3 |c4 |
+-----+---+---+---+
|one  |1  |c3 |c4 |
|two  |2  |c3 |c4 |
|three|3  |c3 |c4 |
|four |4  |c3 |c4 |
+-----+---+---+---+

Please be aware of the following: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 It describes a slightly different context, but the other answer alludes to the same issue by using the select approach.
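
If you are on Spark 3.3 or later, Dataset.withColumns, which takes a Map from column name to Column, is another way to add several columns in one call instead of folding over withColumn. A hedged sketch, assuming that Spark version; addColsAtOnce is a hypothetical name, and the local SparkSession is only there to make the snippet self-contained:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Assumption: a local session for experimentation; notebooks already provide `spark`.
val spark = SparkSession.builder().master("local[*]").appName("with-columns").getOrCreate()
import spark.implicits._

val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")

// Hypothetical helper: one constant column per name, value equal to the name,
// all handed to Spark in a single withColumns call (assumes Spark 3.3+).
def addColsAtOnce(df: DataFrame, columns: Seq[String]): DataFrame = {
  val newCols: Map[String, Column] = columns.map(name => name -> lit(name)).toMap
  df.withColumns(newCols)
}

val df2 = addColsAtOnce(df, Seq("c3", "c4"))
df2.show(false)
```

Because the new columns are passed as a Map, their relative order in the result follows the map's iteration order; use select if a specific order matters.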