
I have a Spark dataframe with string data that I want to map to numeric data, as follows (simplistic version):

+--------------------+-------+----------+-------------------------+
|     participantUUID|001_Age|002_Gender|003_Where did you grow up|
+--------------------+-------+----------+-------------------------+
|010A0550-4324-490...|     23|    Female|                In a town|
|031C5411-FE42-429...|     56|      Male|                In a town|
|038688FF-B5DA-484...|     32|    Female|                In a town|
|05F8E1AF-AFDD-441...|     54|    Female|          Multiple places|
|068B213C-3303-41E...|     23|    Female|                In a town|
|11A9A444-3E93-468...|     39|    Female|                In a town|

There are many columns, so rather than writing the mapping out separately for each column, I want to apply it to every column of the dataframe in one pass.

The mapping from string to numeric varies by column. For example, in one column the strings "poor", "fair", "good", "very good" would score 1, 2, 3, 4; in another column the scores might be 4, 3, 2, 1. So I thought of writing a UDF that takes the column header and the string value as parameters, and then applying it with foldLeft over the dataframe columns, as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

val calculateScore = udf((columnName: String, answerText: String) => (columnName, answerText) match {

      case ("002_Gender", "Female") => 0
      case ("002_Gender", "Male") => 1
      case ("002_Gender", "Other") => 2

      case ("003_Where did you grow up", "In a village") => 0
      case ("003_Where did you grow up", "In a town") => 1
      case ("003_Where did you grow up", "Multiple places") => 2
      case _ => -1
    })

val columnNames = Seq("001_Age", "002_Gender", "003_Where did you grow up")

val newDF: DataFrame = columnNames.foldLeft(baseDF)(
      (baseDF, c) =>
        baseDF.withColumn(c.concat("_numeric"), calculateScore(baseDF(c), baseDF(c)))
    )

However, this does not return the right result: every value comes out as -1, which means the UDF is not matching any case:

+--------------------+----------------+----------+------------------+-------------------------+---------------------------------+
|     participantUUID|assessmentNumber|002_Gender|002_Gender_numeric|003_Where did you grow up|003_Where did you grow up_numeric|
+--------------------+----------------+----------+------------------+-------------------------+---------------------------------+
|010A0550-4324-490...|               0|    Female|                -1|                In a town|                               -1|
|031C5411-FE42-429...|               0|      Male|                -1|                In a town|                               -1|
|038688FF-B5DA-484...|               0|    Female|                -1|                In a town|                               -1|
|05F8E1AF-AFDD-441...|               0|    Female|                -1|          Multiple places|                               -1|
|068B213C-3303-41E...|               0|    Female|                -1|                In a town|                               -1|

I think it's due to the syntax of the calculateScore UDF, which should take the column name as a string and the answer text, and return an Int evaluated row by row within the column. In other words, the foldLeft statement has the form:

val newDF: DataFrame = columnNames.foldLeft[DataFrame](baseDF)(
      (acc, c) =>
        acc.withColumn(c, col(c))
    )

so calculateScore(baseDF(c), baseDF(c)) should return an object of type Column, but clearly something is going wrong.

Any ideas will be really appreciated, thanks!

NB: I have already reviewed "Apply UDF to multiple columns in Spark Dataframe", but I didn't like its use of a var DataFrame, as this seems to me to violate the principles of immutable programming in Scala!
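foldLeft by itself already avoids the var: each step receives the previous result and returns a new value. The sketch below shows this in plain Scala without Spark; modelling one row as an immutable Map, and the encode helper, are assumptions made for illustration, standing in for withColumn and the UDF.

```scala
// Plain-Scala model, no Spark: one "row" is an immutable Map from column name to cell value.
val row: Map[String, String] =
  Map("002_Gender" -> "Female", "003_Where did you grow up" -> "In a town")

val columnNames = Seq("002_Gender", "003_Where did you grow up")

// Hypothetical scoring helper standing in for the UDF body.
def encode(columnName: String, value: String): Int = (columnName, value) match {
  case ("002_Gender", "Female")                   => 0
  case ("002_Gender", "Male")                     => 1
  case ("003_Where did you grow up", "In a town") => 1
  case _                                          => -1
}

// Each step returns a new Map with one extra "<name>_numeric" entry;
// the accumulator is never mutated and no var is needed.
val withNumeric: Map[String, String] =
  columnNames.foldLeft(row) { (acc, c) =>
    acc + (s"${c}_numeric" -> encode(c, acc(c)).toString)
  }
```

The original row still has only its two entries afterwards, just as withColumn leaves the input DataFrame unchanged.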


2 Answers


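You are passing the same column as both arguments to your UDF, so each row's cell value is passed as both columnName and answerText. A pair like ("Female", "Female") matches none of your cases, so every row falls through to the default case _ and returns -1.

Pass lit(c) as the first argument instead, so the UDF receives the column name as a string literal: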

df.show
+----------+-------------------------+
|002_Gender|003_Where did you grow up|
+----------+-------------------------+
|    Female|                In a town|
|      Male|          Multiple places|
+----------+-------------------------+

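import org.apache.spark.sql.functions.lit

// Only the columns present in df; a missing column such as "001_Age" would fail to resolve
val columnNames = Seq("002_Gender", "003_Where did you grow up")

columnNames.foldLeft(df)((acc, c) =>
  acc.withColumn(c.concat("_numeric"), calculateScore(lit(c), acc(c)))
).show(false)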

+----------+-------------------------+------------------+---------------------------------+
|002_Gender|003_Where did you grow up|002_Gender_numeric|003_Where did you grow up_numeric|
+----------+-------------------------+------------------+---------------------------------+
|Female    |In a town                |0                 |1                                |
|Male      |Multiple places          |1                 |2                                |
+----------+-------------------------+------------------+---------------------------------+
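Alternatively, curry the column name into the UDF: calculateScore(columnName) returns a one-argument UDF, so only the answer column is passed per row. A val is enough here, since foldLeft never reassigns baseDF.

import spark.implicits._  // for toDF; assumes a SparkSession named spark, as in spark-shell

val baseDF = Seq(("Female","In a town"),("Male","Multiple places")).toDF("002_Gender","003_Where did you grow up")
baseDF.show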
+----------+-------------------------+
|002_Gender|003_Where did you grow up|
+----------+-------------------------+
|    Female|                In a town|
|      Male|          Multiple places|
+----------+-------------------------+

def calculateScore(columnName: String) = udf((answerText: String) => (columnName, answerText) match {

  case ("002_Gender", "Female") => 0
  case ("002_Gender", "Male") => 1
  case ("002_Gender", "Other") => 2

  case ("003_Where did you grow up", "In a village") => 0 
  case ("003_Where did you grow up", "In a town") => 1
  case ("003_Where did you grow up", "Multiple places") => 2
  case _ => -1
})

val columnNames = Seq("002_Gender", "003_Where did you grow up")

val newDF = columnNames.foldLeft(baseDF)(
  (acc, c) =>
    acc.withColumn(c.concat("_numeric"), calculateScore(c)(acc(c)))
)
newDF.show
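The curried shape also makes per-column scales easy to keep as data, including the reversed 4, 3, 2, 1 scale from the question. The sketch below is plain Scala; the column names q_quality and q_difficulty and the scales map are hypothetical. A function with this shape should be usable in Spark as udf(scoreFor(c)) in place of calculateScore(c), though that wiring is an untested assumption here.

```scala
// Hypothetical per-column mappings; "q_quality" and "q_difficulty" are invented column names.
val scales: Map[String, Map[String, Int]] = Map(
  "002_Gender"   -> Map("Female" -> 0, "Male" -> 1, "Other" -> 2),
  "q_quality"    -> Map("poor" -> 1, "fair" -> 2, "good" -> 3, "very good" -> 4),
  "q_difficulty" -> Map("poor" -> 4, "fair" -> 3, "good" -> 2, "very good" -> 1)
)

// Same shape as calculateScore(columnName) above: fix the column, get back String => Int.
// Unknown columns or answers fall back to -1, like the default case in the match.
def scoreFor(columnName: String): String => Int = {
  val scale = scales.getOrElse(columnName, Map.empty[String, Int])
  answerText => scale.getOrElse(answerText, -1)
}
```

With the mappings held in a Map rather than a match, adding or reversing a column's scale is a data change rather than a new case.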