0 votes

I'm using spark-sql 2.4.1 with Java 8. I have a scenario where I need to add a when condition on a column only if that column exists in the respective dataframe. How can this be done?

Ex :

val df = ... // contains column abc, plus x or y or both, depending on some business logic

val result_df =  df
                  .withColumn("new_column", when(col("abc") === "a" , concat(col("x"),lit("_"),col("y"))))

// The problem: sometimes df does not contain the "x" column; in that case result_df should get just the "y" value. The statement above throws an error because "x" is not present in df at that point.

So how can I check whether a column (i.e. "x") is present and, if so, use it in concat(), otherwise fall back to the remaining column (i.e. "y")?

The reverse is also possible, i.e. only col("x") is present in the df but not col("y"). When both columns x and y are available in the df, the statement works fine.

Question: how do I add the condition to the when clause only when the column is present in the dataframe?

One correction to the question: if a column is not there, I should not execute that withColumn call at all.

Ex :

If only column x is present:

val result_df =  df
                  .withColumn("new_x", when(col("abc") === "a" , concat(col("x"))))

If only column y is present:

val result_df =  df
                  .withColumn("new_y", when(col("abc") === "a" , concat(col("y"))))

If both columns x and y are present:

val result_df =  df
               .withColumn("new_x", when(col("abc") === "a" , concat(col("x"))))
                .withColumn("new_y", when(col("abc") === "a" , concat(col("y"))))
                  .withColumn("new_x_y", when(col("abc") === "a" , concat(col("x"),lit("_"),col("y"))))

2 Answers

1 vote

You need to do this with your language's native flow control, e.g. in Python/PySpark with if/else statements.

The reason is that Spark DataFrame functions work on columns, so you cannot use a .when() condition to check column names; .when() only looks at the values within the columns and applies the logic/condition row-wise.

E.g. for F.when(F.col(x) == F.col(y)), Spark translates the expression into JVM code that applies that logic row-wise across the two columns.

This also makes sense if you remember that Spark DataFrames are made up of Row objects, so the condition is serialized and applied on the executors to every object (row), which looks like Row(x=2, y=5).
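
Since the question itself is in Scala, here is the same distinction as a minimal Scala sketch (assuming df is the asker's dataframe and that "y" is present whenever "x" is not, as the question describes): the if runs once on the driver; the when() expression runs per row.

import org.apache.spark.sql.functions.{col, when}

// plain Scala if: evaluated once on the driver, before any Spark job runs
val result_df =
  if (df.columns.contains("x"))
    // when(): builds a Column expression that Spark evaluates row-wise
    df.withColumn("new_x", when(col("abc") === "a", col("x")))
  else
    df.withColumn("new_y", when(col("abc") === "a", col("y")))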

from pyspark.sql.functions import col, concat, lit

def check_columns(df, col_x, col_y, concat_name):
    '''
    df: spark dataframe
    col_x & col_y: the two cols to concat if both are present
    concat_name: name for the new concatenated col
    -----------------------------------------------------
    returns: df with the new concatenated col if both the x & y cols
    are present, otherwise the unchanged df with whichever of x or y
    is present
    '''
    cols = [col_x, col_y]
    if all(item in df.columns for item in cols):
        df = df.withColumn(concat_name, concat(col(col_x), lit("_"), col(col_y)))
    return df

You only need to apply the transformation when both x and y are present; if only one is, the function returns the df with the existing x or y column anyway.

I would apply something like the above, saved as a function for reusability.

What you can do with .when() is concatenate values only where a condition is met row-wise; this gives you a column whose values are concatenated wherever the condition holds.

import pyspark.sql.functions as F

df = df.withColumn('concat_col',
                   F.when(F.col('age') < F.lit(18),
                          F.concat(F.col('name'), F.lit('_underAge')))
                    .otherwise(F.concat(F.col('name'), F.lit('_notUnderAge'))))

Hope this helps!

1 vote

You can achieve that by creating the list of columns dynamically using the columns property and a simple Scala/Java if statement. The list includes targetColumn only if it was found in the dataframe schema (Scala code):

import org.apache.spark.sql.functions.{col, concat_ws, when}

// the column we should check for. Change this accordingly to your requirements i.e "y"
val targetColumn = "x"
var concatItems = Seq(col("y"))

// add targetColumn if found in df.columns
if (df.columns.contains(targetColumn))
  concatItems = concatItems :+ col(targetColumn)

df.withColumn("new_column", when(col("abc") === "a", concat_ws("_", concatItems:_*)))

Note that instead of concat we use concat_ws, since it automatically handles whether concatItems contains one item or more and applies the _ separator accordingly.
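
As a quick illustration of that behaviour (a minimal sketch reusing the imports above; the expressions are for demonstration only):

// with a single item the separator is simply not used,
// with two items it is placed between them
val oneItem  = concat_ws("_", Seq(col("y")): _*)              // yields the value of y as-is
val twoItems = concat_ws("_", Seq(col("x"), col("y")): _*)    // yields "<x>_<y>"

// unlike concat, concat_ws also skips null inputs instead of
// returning null for the whole result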

Update:

Here is the new updated code using a select statement:

import org.apache.spark.sql.Column

var selectExpr: Seq[Column] = Seq.empty

if (df.columns.contains("x") && df.columns.contains("y"))
   selectExpr = Seq(
         when(col("abc") === "a", col("x")).as("new_x"),
         when(col("abc") === "a", col("y")).as("new_y"),
         when(col("abc") === "a", concat_ws("_", col("x"), col("y"))).as("new_x_y")
   )
else if (df.columns.contains("y"))
   selectExpr = Seq(when(col("abc") === "a", col("y")).as("new_y"))
else
   selectExpr = Seq(when(col("abc") === "a", col("x")).as("new_x"))

df.select(selectExpr:_*)

Note that we don't need to use withColumn here; select is exactly what you need for this case.
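
If you need this in several places, the same branching can be wrapped in a small helper. A sketch (selectByPresence is a hypothetical name, not an existing API):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, concat_ws, when}

// hypothetical helper: builds the select list from whichever columns exist
def selectByPresence(df: DataFrame): DataFrame = {
  val hasX = df.columns.contains("x")
  val hasY = df.columns.contains("y")

  val exprs: Seq[Column] =
    (if (hasX) Seq(when(col("abc") === "a", col("x")).as("new_x")) else Nil) ++
    (if (hasY) Seq(when(col("abc") === "a", col("y")).as("new_y")) else Nil) ++
    (if (hasX && hasY)
       Seq(when(col("abc") === "a", concat_ws("_", col("x"), col("y"))).as("new_x_y"))
     else Nil)

  df.select(exprs: _*)
}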