
My objective is to add columns to an existing DataFrame and populate the columns using transformations from existing columns in the DF.

All of the examples I find use withColumn to add the column and when().otherwise() for the transformations.

I want to use a named function (x: String) with a match expression, which lets me use String methods and apply more complex transformations.

Sample DataFrame

val etldf = Seq(   
            ("Total, 20 to 24 years            "),
            ("Men, 20 to 24 years              "),
            ("Women, 20 to 24 years            ")).toDF("A")

Here is a simple transformation using when().otherwise(). I can nest several of these, but it soon gets messy.

import org.apache.spark.sql.functions.when  // spark-shell does not import functions._ by default

val newcol = when($"A".contains("Men"), "Male").
  otherwise(when($"A".contains("Women"), "Female").
  otherwise("Both"))
val newdf = etldf.withColumn("NewCol", newcol)
newdf.select("A","NewCol").show(100, false)

The output is as follows:

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |Both  |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+
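The nesting can be avoided with the Spark API as it stands: after functions.when starts a conditional expression, Column.when can be chained to add more branches, and conditions are checked in order. A minimal sketch, assuming a local SparkSession; the app name and the trailing-space-free sample strings are only for illustration. Trailing dots are used, as in the question, so the snippet also pastes cleanly into spark-shell.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Assumption: a local session; in spark-shell `spark` already exists and getOrCreate reuses it.
val spark = SparkSession.builder().master("local[*]").appName("when-chain-sketch").getOrCreate()
import spark.implicits._

val etldf = Seq(
  "Total, 20 to 24 years",
  "Men, 20 to 24 years",
  "Women, 20 to 24 years").toDF("A")

// functions.when starts the chain and Column.when adds branches.
// The first condition that is true wins, so no nested otherwise(when(...)) is needed.
val newcol = when($"A".contains("Men"), "Male").
  when($"A".contains("Women"), "Female").
  otherwise("Both")

etldf.withColumn("NewCol", newcol).show(false)
```

This stays a single flat chain no matter how many cases are added, but each condition must still be expressible with Column methods.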

But let's say I wanted a slightly more complex transformation:

val newcol = when($"A".contains("Total") && $"A".contains("years"), $"A".indexOf("to").toString())

This does not compile, because indexOf is a method of String, not of ColumnName.
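For this particular case there is a built-in column function: org.apache.spark.sql.functions.locate(substr, col) plays the role of String.indexOf, except that it is 1-based and returns 0 when the substring is not found. A sketch of the expression above rewritten with it, assuming a local SparkSession and the sample data without trailing spaces; `newcol` is just an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{locate, when}

// Assumption: a local session for a self-contained run.
val spark = SparkSession.builder().master("local[*]").appName("locate-sketch").getOrCreate()
import spark.implicits._

val etldf = Seq(
  "Total, 20 to 24 years",
  "Men, 20 to 24 years",
  "Women, 20 to 24 years").toDF("A")

// locate is 1-based and yields 0 when absent, so subtracting 1 matches
// String.indexOf (including -1 for "not found"); cast to string like .toString().
val newcol = when($"A".contains("Total") && $"A".contains("years"),
  (locate("to", $"A") - 1).cast("string"))

// With no otherwise(), rows that match no condition get null.
etldf.withColumn("NewCol", newcol).show(false)
```

Built-in functions like this keep the logic visible to Spark's optimizer; a UDF is the fallback when no combination of them fits.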

What I really want to do is define a function that can implement very complex transformations and pass that to withColumn():

 def AtoNewCol( A : String): String = A match {
   case a if a.contains("Men") => "Male"
   case a if a.contains("Women") => "Female"
   case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
   case other => "Both"
 }
 AtoNewCol("Total, 20 to 24 years            ")  

This returns "10", the zero-based position of "to".

But I hit a type mismatch: AtoNewCol expects a String, while $"A" is a ColumnName (an expression, not a value), so this does not compile:

scala> val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
<console>:33: error: type mismatch;
found   : org.apache.spark.sql.ColumnName
required: String
val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
                                                    ^

If I change the signature to AtoNewCol(A: org.apache.spark.sql.ColumnName), the mismatch just moves into the body, because Column.contains returns a Column rather than a Boolean:

scala>  def AtoNewCol( A : org.apache.spark.sql.ColumnName): String = A match {
     |     case a if a.contains("Men") => "Male"
     |     case a if a.contains("Women") => "Female"
     |     case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
     |     case other => "Both"
     |   }
<console>:30: error: type mismatch;
found   : org.apache.spark.sql.Column
required: Boolean
       case a if a.contains("Men") => "Male"
                           ^
... (similar errors follow for the remaining cases)

I am hoping that there is a syntax that allows binding the value of the column to the function.

Or maybe there is a function other than withColumn() that lets me define more complex transformations.

Open to all suggestions.

Comment (Ramesh Maharjan): You need a udf function for that.

2 Answers


All you need is a udf (user-defined function):

import org.apache.spark.sql.functions._
def AtoNewCol = udf(( A : String) => A match {
  case a if a.contains("Men") => "Male"
  case a if a.contains("Women") => "Female"
  case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
  case other => "Both"
})

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

And you should get

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |10    |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+

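A udf works row by row: Spark calls your function on each row's value as a plain Scala type (here a String), instead of building a column expression the way the built-in functions do. The trade-off is that Spark's optimizer cannot see inside a udf, so built-in functions are preferable when they can express the logic.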
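One thing to watch with a String parameter: Spark passes a null cell straight into the function, and a.contains would then throw a NullPointerException. A null-safe variant is sketched below, assuming a local SparkSession; the name aToNewCol and the sample rows are illustrative. It also uses a val so the udf is built once, whereas a def rebuilds it on every reference.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: a local session for a self-contained run.
val spark = SparkSession.builder().master("local[*]").appName("null-safe-udf-sketch").getOrCreate()
import spark.implicits._

// `case null` returns null for null input instead of calling contains on it.
val aToNewCol = udf((a: String) => a match {
  case null => null
  case s if s.contains("Men") => "Male"
  case s if s.contains("Women") => "Female"
  case s if s.contains("Total") && s.contains("years") => s.indexOf("to").toString
  case _ => "Both"
})

// Illustrative data that includes a null cell.
val withNull = Seq("Men, 20 to 24 years", null).toDF("A")
withNull.withColumn("NewCol", aToNewCol($"A")).show(false)
```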


You need to create a UDF for that; you can try something like the following. I'm using your function as-is.

def AtoNewCol = udf((A: String) => {
  A match {
    case a if a.contains("Men") => "Male"
    case a if a.contains("Women") => "Female"
    case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString
    case other => "Both"
  }
})

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

//    output
//    +---------------------------------+------+
//    |A                                |NewCol|
//    +---------------------------------+------+
//    |Total, 20 to 24 years            |10    | 
//    |Men, 20 to 24 years              |Male  |
//    |Women, 20 to 24 years            |Female|
//    +---------------------------------+------+
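Since the question's function is already an ordinary String => String method, it can also be reused without copying its body into a lambda: lift it with udf(method _), or skip UDFs and map over a typed Dataset[String]. A sketch under these assumptions: the method lives in a hypothetical serializable object Rules, the session is local, and aToNewColUdf, viaUdf and viaMap are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical holder for the plain method from the question.
object Rules extends Serializable {
  def aToNewCol(a: String): String = a match {
    case s if s.contains("Men") => "Male"
    case s if s.contains("Women") => "Female"
    case s if s.contains("Total") && s.contains("years") => s.indexOf("to").toString
    case _ => "Both"
  }
}

// Assumption: a local session for a self-contained run.
val spark = SparkSession.builder().master("local[*]").appName("reuse-method-sketch").getOrCreate()
import spark.implicits._

val etldf = Seq(
  "Total, 20 to 24 years",
  "Men, 20 to 24 years",
  "Women, 20 to 24 years").toDF("A")

// Option 1: eta-expand the existing method into a String => String udf.
val aToNewColUdf = udf(Rules.aToNewCol _)
val viaUdf = etldf.withColumn("NewCol", aToNewColUdf($"A"))

// Option 2: no udf; view the column as Dataset[String] and map with plain Scala,
// then name the tuple fields back to A and NewCol.
val viaMap = etldf.as[String].map(a => (a, Rules.aToNewCol(a))).toDF("A", "NewCol")

viaUdf.show(false)
viaMap.show(false)
```

The typed map works well when the whole row is being rebuilt; the udf form is handier when only one column is added to a wider DataFrame.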