
I am using spark-sql 2.4.1 with Java 8. I have a scenario where I need to perform certain operations only if particular columns are present in a given DataFrame's column list.

I have a sample DataFrame as below. Its columns differ depending on the external query executed against the database table.

import spark.implicits._   // needed for toDF on a local List

val data = List(
  ("20", "score", "school", "2018-03-31", 14, 12, 20),
  ("21", "score", "school", "2018-03-31", 13, 13, 21),
  ("22", "rate", "school", "2018-03-31", 11, 14, 22),
  ("21", "rate", "school", "2018-03-31", 13, 12, 23)
 )

// the real column list varies: "column1", "column2", "column3" ... "columnN"
val df = data.toDF("id", "code", "entity", "date", "column1", "column2", "column3")

As shown above, the columns of df are not fixed; they vary and may include any of "column1", "column2", "column3" ... "columnN".

So, depending on which columns are available, I need to perform some operations. For this I am trying to use a "when" clause: when a column is present, I perform a certain operation on that column; otherwise I move on to the next operation.

I tried the following two approaches using a "when" clause:

First way:

 Dataset<Row> resultDs = df.withColumn("column1_avg",
                     when(df.schema().fieldNames().contains(col("column1")), avg(col("column1"))));
 

Second way:

  Dataset<Row> resultDs = df.withColumn("column2_sum",
                     when(df.columns().contains(col("column2")), sum(col("column2"))));

Error:

Cannot invoke contains(Column) on the array type String[]
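The compile error occurs because `df.columns()` (like `df.schema().fieldNames()`) returns a plain `String[]`, and Java arrays have no `contains` method; both attempts also pass a `Column` where the column name (a `String`) is what needs to be compared. A minimal plain-Java sketch of the membership test follows; `ColumnCheck`, `hasColumn`, and the `columns` array are hypothetical, with the array standing in for `df.columns()`:

```java
import java.util.Arrays;

class ColumnCheck {
    // Java arrays have no contains(), so wrap the String[] in a List first
    // and compare against the column *name*, not a Column object.
    static boolean hasColumn(String[] columns, String name) {
        return Arrays.asList(columns).contains(name);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for df.columns()
        String[] columns = {"id", "code", "entity", "date", "column1", "column2"};
        System.out.println(hasColumn(columns, "column1")); // prints true
        System.out.println(hasColumn(columns, "column3")); // prints false
    }
}
```

Because this check runs on the driver, it would go in an ordinary `if` around the `withColumn` call rather than inside `when(...)`, which is evaluated per row.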

So how can I handle this scenario in Java 8?

Please show the expected output. - thebluephantom
@thebluephantom The expected output is dynamic and depends on the columns, i.e. if "column1" exists I will compute avg on column1, if column2 exists I will compute sum on column2, etc. The tricky part is that if the columns do not include "column1", the operation should not fail, hence I need to check whether the column exists. - BdEngineer
The functions sum and avg are aggregate functions. They would normally return a single row. So it's imperative that you edit your question, and show examples of what you expect as the output. You can do two examples - one when column1 exists and one when it doesn't. Show both input ds and output ds. - RealSkeptic

1 Answer


You can create a column holding all of the column names. Then you can check whether a given column is present and process it only if it is available:

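 import org.apache.spark.sql.functions._

 // Store every column name of df as a literal array in each row
 df.withColumn("columns_available", array(df.columns.map(lit): _*))
       // "column1" is listed, so column1_org copies column1
       .withColumn("column1_org",
         when(array_contains(col("columns_available"), "column1"), col("column1")))
       // if "column4" is not listed, x is null in every row
       .withColumn("x",
         when(array_contains(col("columns_available"), "column4"), col("column1")))
       .withColumn("column2_new",
         when(array_contains(col("columns_available"), "column2"), sqrt("column2")))
       .show(false)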
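A Java 8 sketch of the same idea, with one caveat: as far as I know, Spark resolves every column referenced in an expression during analysis, so `col("column1")` inside `when(...)` can still raise an `AnalysisException` when `column1` is absent, even if the condition is false. Deciding on the driver avoids that. Since `avg` and `sum` are aggregates (as pointed out in the comments), the sketch builds only the aggregate expressions whose input column exists. The class name `AggregationPlan` and the column-to-function mapping in `OPS` are hypothetical; the `columns` array in `main` stands in for `df.columns()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AggregationPlan {
    // Hypothetical mapping: which aggregate to run for each optional column.
    static final Map<String, String> OPS = new LinkedHashMap<>();
    static {
        OPS.put("column1", "avg");
        OPS.put("column2", "sum");
    }

    // Builds Spark SQL expression strings only for columns that exist,
    // so no expression ever references a missing column.
    static List<String> buildExprs(String[] columns) {
        Set<String> present = new HashSet<>(Arrays.asList(columns));
        List<String> exprs = new ArrayList<>();
        for (Map.Entry<String, String> e : OPS.entrySet()) {
            String column = e.getKey();
            String fn = e.getValue();
            if (present.contains(column)) {
                exprs.add(fn + "(" + column + ") AS " + column + "_" + fn);
            }
        }
        return exprs;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for df.columns(); column1 is missing here
        String[] columns = {"id", "code", "entity", "date", "column2", "column3"};
        System.out.println(buildExprs(columns)); // prints [sum(column2) AS column2_sum]
    }
}
```

With a real `Dataset<Row> df`, the list could then be passed to `df.selectExpr(exprs.toArray(new String[0]))`, which should return a single row with one aggregate per existing column; an empty list would need to be handled separately.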