Spark has very useful classes to get metrics from binary or multiclass classification. But they are available for the RDD based api version. So, doing a little bit of code and playing around with dataframes and rdd it can be possible. A ful example could be like the following:
object TestMetrics {
def main(args: Array[String]) : Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("Example")
.master("local[1]")
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// Test data with your schema
val someData = Seq(
Row("a",0.8, true),
Row("a",0.7, true),
Row("a",0.2, true),
Row("b",0.9, true),
Row("b",0.3, true),
Row("b",0.1, true)
)
// Set your threshold to get a positive or negative
val threshold : Double = 0.5
import org.apache.spark.sql.functions._
// First udf to convert probability in positives or negatives
def _thresholdUdf(threshold: Double) : Double => Double = prob => if(prob > threshold) 1.0 else 0.0
// Cast boolean to double
val thresholdUdf = udf { _thresholdUdf(threshold)}
val castToDouUdf = udf { (label: Boolean) => if(label) 1.0 else 0.0 }
// Schema to build the dataframe
val schema = List(StructField("problem", StringType), StructField("score", DoubleType), StructField("label", BooleanType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(schema))
// Apply first trans to get the double representation of all fields
val df0 = df.withColumn("binarypredict", thresholdUdf('score)).withColumn("labelDouble", castToDouUdf('label))
// First loop to get the 'problems list'. Maybe it would be possible to do all in one cycle
val pbl = df0.select("problem").distinct().as[String].collect()
// Get the RDD from dataframe and build the Array[(string, BinaryClassificationMetrics)]
val dfList = pbl.map(a => (a, new BinaryClassificationMetrics(df0.select("problem", "binarypredict", "labelDouble").as[(String, Double, Double)]
.filter(el => el._1 == a).map{ case (_, predict, label) => (predict, label)}.rdd)))
// And the metrics for each 'problem' are available
val results = dfList.toMap.mapValues(metrics =>
Seq(metrics.areaUnderROC(),
metrics.areaUnderROC()))
val moreMetrics = dfList.toMap.map((metrics) => (metrics._1, metrics._2.scoreAndLabels))
// Get Metrics by key, in your case the 'problem'
results.foreach(element => println(element))
moreMetrics.foreach(element => element._2.foreach { pr => println(s"${element._1} ${pr}") })
// Score and labels
}
}