1
votes

I wrote the function below:

object AgeClassification {
  def AgeCategory(age:Int) : String = {
    if(age<=30)
      return "Young" 
    else if(age>=65)
      return "Older" 
    else
      return "Mid-age"
  }
}

and I am trying to pass a DataFrame column as its parameter:

val df_new = df
  .withColumn("Age_Category", AgeClassification.AgeCategory(df("age")))

but I get this error:

:33: error: type mismatch;
found : org.apache.spark.sql.Column
required: Int
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df("age")))

How do I pass a column as a parameter? I also tried the following variants, which fail as well:

val df_new = df
  .withColumn("Age_Category",AgeClassification.AgeCategory(df.age.cast(IntegerType)))   

:33: error: value age is not a member of org.apache.spark.sql.DataFrame
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df.age.cast(IntegerType)))

val df_new = df
   .withColumn("Age_Category", AgeClassification.AgeCategory(df("age").cast(Int)))

:33: error: overloaded method value cast with alternatives:
(to: String)org.apache.spark.sql.Column
(to: org.apache.spark.sql.types.DataType)org.apache.spark.sql.Column
cannot be applied to (Int.type)
val df_new = df.withColumn("Age_Category",AgeClassification.AgeCategory(df("age").cast(Int)))


2 Answers

3
votes

You cannot apply Scala functions directly to DataFrame columns with the Spark SQL API. You can only use "column" functions, defined in the Column class or in the functions object (org.apache.spark.sql.functions). They transform columns into columns; the actual computation is handled within Spark.

To illustrate this, you can try this in the REPL:

scala> df("COL1").cast("int")
res6: org.apache.spark.sql.Column = CAST(COL1 AS INT)

The type is Column, not Int, which is why Scala refuses to apply your function (which expects an Int) to such an object.

To use a custom function, you need to wrap it in a UDF like this:

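import org.apache.spark.sql.functions.udf

val ageUDF = udf((age: Int) => AgeClassification.AgeCategory(age))
// or, shorter (use this instead of the line above, not in addition to it):
// val ageUDF = udf(AgeClassification.AgeCategory _)

// Then you may use it this way:
df.withColumn("classif", ageUDF(df("age")))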

Note also that df.age works in PySpark but is not valid in Scala. For a short way to access columns by name, you can import spark.implicits._ and write $"age" or, even shorter, 'age.
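
As a rough illustration of the column-access forms mentioned above, the sketch below selects the same column three ways. The local SparkSession and the three-row sample DataFrame are assumptions made up for the example, not part of the question; in spark-shell a session named spark already exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical local session and sample data, for illustration only.
val spark = SparkSession.builder().master("local[1]").appName("column-access").getOrCreate()
import spark.implicits._ // enables the $"name" syntax and toDF on local collections

val df = Seq(25, 40, 70).toDF("age")

val byApply  = df.select(df("age"))  // column resolved against this DataFrame
val byCol    = df.select(col("age")) // column resolved by name at analysis time
val byDollar = df.select($"age")     // string interpolator from spark.implicits._
```

All three expressions have type Column, so any of them can be passed to a UDF or to the built-in column functions. The symbol form 'age is left out here because symbol literals are deprecated in recent Scala versions.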

-1
votes
import org.apache.spark.sql.Column

def AgeCategory(age:Column) : String
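
A signature that accepts a Column only helps if the function also returns a Column, because withColumn expects a Column as its second argument. A minimal sketch of that idea, using the built-in when/otherwise functions instead of a UDF, could look like this. The object name AgeClassificationColumns, the local SparkSession, and the sample data are assumptions for illustration; the thresholds are copied from the question.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.when

object AgeClassificationColumns {
  // Same thresholds as AgeCategory in the question, expressed on a Column.
  def ageCategory(age: Column): Column =
    when(age <= 30, "Young")
      .when(age >= 65, "Older")
      .otherwise("Mid-age")
}

// Hypothetical local session and sample data, for illustration only.
val spark = SparkSession.builder().master("local[1]").appName("age-category").getOrCreate()
import spark.implicits._

val df = Seq(25, 40, 70).toDF("age")
val dfNew = df.withColumn("Age_Category", AgeClassificationColumns.ageCategory(df("age")))
dfNew.show()
```

Because the whole expression stays within Spark's column functions, Spark can optimize it, which is usually not possible for the body of a UDF.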