0 votes

I'm new to Spark Scala. I have implemented a solution for Dataset validation across multiple columns using a UDF, rather than iterating over the individual columns in a for loop. But I don't know why this works faster, and I have to explain why it was the better solution.

The columns for data validation are received at run time, so we cannot hard-code the column names. Also, the comments column needs to be updated with the column name whenever a column value fails validation.

Old code:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._

def doValidate(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
  var validDF: Dataset[Row] = data
  var i: Int = 0
  for (s <- columnArray) {
    // Valid values for this column are comma-separated
    val list = validValueArrays(i).split(",")
    // Flag the row when the value is NOT in the valid list, appending the column name to comments
    validDF = validDF.withColumn("comments",
      when(!validDF.col(s).isin(list: _*),
        concat(col("comments"), lit(" Error: Invalid Records in: "), lit(s)))
        .otherwise(col("comments")))
    i = i + 1
  }
  validDF
}

New code:

def validateColumnValues(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
  // The UDF receives all columns under validation packed into one struct, plus the current comment
  val checkValues = udf((row: Row, comment: String) => {
    var newComment = comment
    for (s <- 0 until row.length) {
      // Compare as String so the lookup matches the comma-separated valid values
      val value = String.valueOf(row.get(s))
      val list = validValueArrays(s).split(",")
      if (!list.contains(value)) {
        newComment = newComment + " Error: Invalid Records in: " + columnArray(s) + ";"
      }
    }
    newComment
  })
  data.withColumn("comments", checkValues(struct(columnArray.head, columnArray.tail: _*), col("comments")))
}

columnArray --> the list of columns to validate.

validValueArrays --> the valid values corresponding to each column by position; multiple valid values are comma-separated.
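
For illustration, here is a minimal, hypothetical invocation of the new method; the session name, column names, and valid values below are made up for the example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("validation-example").getOrCreate()
import spark.implicits._

val data = Seq(
  ("A", "X", ""),   // both values valid: comments stays empty
  ("B", "Z", "")    // "Z" is not a valid value for col2, so comments gets an error
).toDF("col1", "col2", "comments")

val columnArray      = Array("col1", "col2")
val validValueArrays = Array("A,B,C", "X,Y")  // valid values per column, comma-separated

val result = validateColumnValues(data, columnArray, validValueArrays)
result.show(false)
// Expected: the second row's comments contains " Error: Invalid Records in: col2;"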

I want to know which one is better, or whether there is a better approach altogether. When I tested, the new code looked better. Also, what is the difference between these two pieces of logic? I have read that a UDF is a black box for Spark; will the UDF hurt performance in this case?


1 Answer

0 votes

I had to correct some closing brackets before running it: one '}' has to be removed where you return the validDF. I still get an analysis error at runtime.

It is better to avoid UDFs, as a UDF forces Spark to deserialize the data, process it in plain Scala, and then reserialize it. However, if your requirement cannot be achieved using the built-in SQL functions, then you have to go for a UDF, but you should review the Spark UI to check the performance and the execution plan.
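
For a simple membership check like this one, the same result can be expressed with built-in functions only, which keeps Catalyst's optimizer in play. A minimal sketch, assuming the same columnArray/validValueArrays inputs as in your question (validateWithBuiltins is just a name I chose for the example):

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._

// Sketch: fold the validation rules into a chain of when/otherwise expressions,
// so Spark can optimize the whole plan with no serialization boundary from a UDF.
def validateWithBuiltins(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] =
  columnArray.zip(validValueArrays).foldLeft(data) { case (df, (name, valid)) =>
    val list = valid.split(",")
    df.withColumn("comments",
      when(!col(name).isin(list: _*),
        concat(col("comments"), lit(s" Error: Invalid Records in: $name;")))
        .otherwise(col("comments")))
  }

You can compare the two approaches with df.explain(): the built-in version shows the conditions inside the optimized plan, while the UDF version shows an opaque UDF call that Spark cannot push down or optimize.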