
I have a dataframe which stores the scores and labels for the various binary classification problems that I have. For example:

| problem | score | label |
|:--------|:------|-------|
| a       | 0.8   | true  |  
| a       | 0.7   | true  |  
| a       | 0.2   | false |  
| b       | 0.9   | false |  
| b       | 0.3   | true  |  
| b       | 0.1   | false |  
| ...     | ...   | ...   |

Now my goal is to get binary classification evaluation metrics (take areaUnderROC for example; see https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html#binary-classification) for each problem, with the end result being something like:

| problem | areaUnderROC |
|:--------|:-------------|
| a       | 0.83         |
| b       | 0.68         |
| ...     | ...          |

I thought about doing something like:

df.groupBy("problem").agg(getMetrics)

but then I am not sure how to write getMetrics in terms of Aggregators (see https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html). Any suggestions?


2 Answers


There's a class built just for binary classification metrics, BinaryClassificationMetrics; see it in the Python docs.

This code should work:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# BinaryClassificationMetrics expects an RDD of (score, label) pairs
# with the label as a double, so convert the boolean label on the way in
score_and_labels_a = df.filter("problem = 'a'") \
    .select("score", "label") \
    .rdd.map(lambda row: (row.score, 1.0 if row.label else 0.0))
metrics_a = BinaryClassificationMetrics(score_and_labels_a)
print(metrics_a.areaUnderROC)
print(metrics_a.areaUnderPR)

score_and_labels_b = df.filter("problem = 'b'") \
    .select("score", "label") \
    .rdd.map(lambda row: (row.score, 1.0 if row.label else 0.0))
metrics_b = BinaryClassificationMetrics(score_and_labels_b)
print(metrics_b.areaUnderROC)
print(metrics_b.areaUnderPR)

... and so on for the other problems

This seems to me to be the easiest way :)
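
If there are more than a couple of problems, the same pattern can be wrapped in a loop over the distinct problem values instead of repeating the block by hand. A minimal sketch, assuming df is the dataframe from the question:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Loop over every distinct problem and print its metrics
problems = [row.problem for row in df.select("problem").distinct().collect()]
for p in problems:
    score_and_labels = df.filter(df.problem == p) \
        .select("score", "label") \
        .rdd.map(lambda row: (row.score, 1.0 if row.label else 0.0))
    metrics = BinaryClassificationMetrics(score_and_labels)
    print(p, metrics.areaUnderROC, metrics.areaUnderPR)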


Spark has very useful classes to get metrics from binary or multiclass classification, but they are only available in the RDD-based API. So, with a little bit of code to move between dataframes and RDDs, it is possible. A full example could look like the following:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object TestMetrics {

  def main(args: Array[String]) : Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("Example")
        .master("local[1]")
        .getOrCreate()

    import spark.implicits._

    val sc = spark.sparkContext

    // Test data with your schema

    val someData = Seq(
      Row("a", 0.8, true),
      Row("a", 0.7, true),
      Row("a", 0.2, false),
      Row("b", 0.9, false),
      Row("b", 0.3, true),
      Row("b", 0.1, false)
    )

    // Set your threshold to get a positive or negative
    val threshold : Double = 0.5

    import org.apache.spark.sql.functions._

    // First udf to convert the probability into a positive (1.0) or negative (0.0) prediction
    def _thresholdUdf(threshold: Double) : Double => Double = prob => if(prob > threshold) 1.0 else 0.0

    // Cast boolean to double
    val thresholdUdf = udf { _thresholdUdf(threshold)}
    val castToDouUdf = udf { (label: Boolean) => if(label) 1.0 else 0.0 }

    // Schema to build the dataframe
    val schema = List(StructField("problem", StringType), StructField("score", DoubleType), StructField("label", BooleanType))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(schema))

    // Apply first trans to get the double representation of all fields
    val df0 = df.withColumn("binarypredict", thresholdUdf('score)).withColumn("labelDouble", castToDouUdf('label))

    // First pass to get the list of problems. Maybe it would be possible to do everything in one cycle
    val pbl = df0.select("problem").distinct().as[String].collect()

    // Get the RDD from dataframe and build the Array[(string, BinaryClassificationMetrics)]
    val dfList = pbl.map(a => (a, new BinaryClassificationMetrics(df0.select("problem", "binarypredict", "labelDouble").as[(String, Double, Double)]
                 .filter(el => el._1 == a).map{ case (_, predict, label) => (predict, label)}.rdd)))

    // And the metrics for each 'problem' are available
    val results = dfList.toMap.mapValues(metrics =>
      Seq(metrics.areaUnderROC(),
          metrics.areaUnderPR()))

    val moreMetrics = dfList.toMap.map((metrics) => (metrics._1, metrics._2.scoreAndLabels))

    // Get Metrics by key, in your case the 'problem'
    results.foreach(element => println(element))

    moreMetrics.foreach(element => element._2.foreach { pr => println(s"${element._1} ${pr}") })
    // Score and labels
  }
}